Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 68 additions & 84 deletions notebooks/examples.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=1, flux_executor=flux_exe) as exe:\n",
" future = exe.submit(sum, [1, 1])\n",
" print(future.result())"
"with Executor(max_cores=1, backend=\"flux\") as exe:\n",
" future = exe.submit(sum, [1, 1])\n",
" print(future.result())"
]
},
{
Expand All @@ -98,28 +96,26 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc(*args):\n",
" return sum(*args)\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" fs_1 = exe.submit(calc, [2, 1])\n",
" fs_2 = exe.submit(calc, [2, 2])\n",
" fs_3 = exe.submit(calc, [2, 3])\n",
" fs_4 = exe.submit(calc, [2, 4])\n",
" print(\n",
" [\n",
" fs_1.result(),\n",
" fs_2.result(),\n",
" fs_3.result(),\n",
" fs_4.result(),\n",
" ]\n",
" )"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" fs_1 = exe.submit(calc, [2, 1])\n",
" fs_2 = exe.submit(calc, [2, 2])\n",
" fs_3 = exe.submit(calc, [2, 3])\n",
" fs_4 = exe.submit(calc, [2, 4])\n",
" print(\n",
" [\n",
" fs_1.result(),\n",
" fs_2.result(),\n",
" fs_3.result(),\n",
" fs_4.result(),\n",
" ]\n",
" )"
]
},
{
Expand Down Expand Up @@ -156,17 +152,15 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc(*args):\n",
" return sum(*args)\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
]
},
{
Expand Down Expand Up @@ -272,27 +266,25 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc_function(parameter_a, parameter_b):\n",
" return parameter_a + parameter_b\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" # Resource definition on the executor level\n",
" max_cores=2, # total number of cores available to the Executor\n",
" block_allocation=True, # reuse python processes\n",
" flux_executor=flux_exe,\n",
" ) as exe:\n",
" future_obj = exe.submit(\n",
" calc_function,\n",
" 1, # parameter_a\n",
" parameter_b=2,\n",
" )\n",
" print(future_obj.result())"
"with Executor(\n",
" # Resource definition on the executor level\n",
" max_cores=2, # total number of cores available to the Executor\n",
" block_allocation=True, # reuse python processes\n",
" backend=\"flux\",\n",
") as exe:\n",
" future_obj = exe.submit(\n",
" calc_function,\n",
" 1, # parameter_a\n",
" parameter_b=2,\n",
" )\n",
" print(future_obj.result())"
]
},
{
Expand Down Expand Up @@ -326,7 +318,6 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
Expand All @@ -338,15 +329,14 @@
" return {\"j\": 4, \"k\": 3, \"l\": 2}\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_cores=1,\n",
" init_function=init_function,\n",
" flux_executor=flux_exe,\n",
" block_allocation=True,\n",
" ) as exe:\n",
" fs = exe.submit(calc, 2, j=5)\n",
" print(fs.result())"
"with Executor(\n",
" max_cores=1,\n",
" init_function=init_function,\n",
" backend=\"flux\",\n",
" block_allocation=True,\n",
") as exe:\n",
" fs = exe.submit(calc, 2, j=5)\n",
" print(fs.result())"
]
},
{
Expand Down Expand Up @@ -458,7 +448,6 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
Expand All @@ -470,15 +459,14 @@
" return i, size, rank\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_cores=2,\n",
" resource_dict={\"cores\": 2},\n",
" flux_executor=flux_exe,\n",
" flux_executor_pmi_mode=\"pmix\",\n",
" ) as exe:\n",
" fs = exe.submit(calc, 3)\n",
" print(fs.result())"
"with Executor(\n",
" max_cores=2,\n",
" resource_dict={\"cores\": 2},\n",
" backend=\"flux\",\n",
" flux_executor_pmi_mode=\"pmix\",\n",
") as exe:\n",
" fs = exe.submit(calc, 3)\n",
" print(fs.result())"
]
},
{
Expand Down Expand Up @@ -518,7 +506,6 @@
"source": [
"```\n",
"import socket\n",
"import flux.job\n",
"from executorlib import Executor\n",
"from tensorflow.python.client import device_lib\n",
"\n",
Expand All @@ -529,15 +516,14 @@
" for x in local_device_protos if x.device_type == 'GPU'\n",
" ]\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_workers=2, \n",
" gpus_per_worker=1,\n",
" executor=flux_exe,\n",
" ) as exe:\n",
" fs_1 = exe.submit(get_available_gpus)\n",
" fs_2 = exe.submit(get_available_gpus)\n",
" print(fs_1.result(), fs_2.result())\n",
"with Executor(\n",
" max_workers=2, \n",
" gpus_per_worker=1,\n",
" backend=\"flux\",\n",
") as exe:\n",
" fs_1 = exe.submit(get_available_gpus)\n",
" fs_2 = exe.submit(get_available_gpus)\n",
" print(fs_1.result(), fs_2.result())\n",
"```"
]
},
Expand Down Expand Up @@ -634,29 +620,27 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc_function(parameter_a, parameter_b):\n",
" return parameter_a + parameter_b\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" future_1 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=2,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" future_2 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=future_1,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" print(future_2.result())"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" future_1 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=2,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" future_2 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=future_1,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" print(future_2.result())"
]
},
{
Expand Down
Loading