diff --git a/environment.yml b/environment.yml
index df3ccd8..f2712b1 100644
--- a/environment.yml
+++ b/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - datalad
   - pip
   - pip:
-    - pydra
+    - pydra==0.23.0a0
     - jupyter
     - jupyter_contrib_nbextensions
     - jupytext
diff --git a/notebooks/2_intro_functiontask.md b/notebooks/2_intro_functiontask.md
index 0760312..ace8f2d 100644
--- a/notebooks/2_intro_functiontask.md
+++ b/notebooks/2_intro_functiontask.md
@@ -4,16 +4,16 @@ jupytext:
     extension: .md
     format_name: myst
     format_version: 0.13
-    jupytext_version: 1.14.0
+    jupytext_version: 1.15.0
 kernelspec:
-  display_name: Python 3
+  display_name: Python 3 (ipykernel)
   language: python
   name: python3
 ---
 
 # FunctionTask
 
-```{code-cell}
+```{code-cell} ipython3
 ---
 jupyter:
   outputs_hidden: false
@@ -29,10 +29,9 @@ nest_asyncio.apply()
 
 A `FunctionTask` is a `Task` that can be created from any *Python* function by using the *pydra* decorator `pydra.mark.task`:
 
-```{code-cell}
+```{code-cell} ipython3
 import pydra
 
-
 @pydra.mark.task
 def add_var(a, b):
     return a + b
@@ -40,54 +39,151 @@ def add_var(a, b):
 
 Once we decorate the function, we can create a pydra `Task` and specify the input:
 
-```{code-cell}
-task1 = add_var(a=4, b=5)
+```{code-cell} ipython3
+task0 = add_var(a=4, b=5)
 ```
 
-We can check the type of `task1`:
+We can check the type of `task0`:
 
-```{code-cell}
-type(task1)
+```{code-cell} ipython3
+type(task0)
 ```
 
and we can check that the task has the correct values of `a` and `b`; they should be saved in the task `inputs`:
 
-```{code-cell}
-print(f'a = {task1.inputs.a}')
-print(f'b = {task1.inputs.b}')
+```{code-cell} ipython3
+print(f'a = {task0.inputs.a}')
+print(f'b = {task0.inputs.b}')
 ```
 
 We can also check the content of the entire `inputs`:
 
-```{code-cell}
-task1.inputs
+```{code-cell} ipython3
+task0.inputs
 ```
 
 As you can see, `task.inputs` also contains information about the function, which is an inseparable part of the `FunctionTask`.
 
 Once the task input is set, we can run the task. Since a `Task` is a "callable object", we can use the syntax:
 
-```{code-cell}
-task1()
+```{code-cell} ipython3
+task0()
 ```
 
 As you can see, the result was returned right away, but we can also access it later:
 
-```{code-cell}
-task1.result()
+```{code-cell} ipython3
+task0.result()
 ```
 
 `Result` contains more than just the output, so if we want to get the task output, we can type:
 
-```{code-cell}
-result = task1.result()
+```{code-cell} ipython3
+result = task0.result()
 result.output.out
 ```
 
 And if we want to see the input that was used in the task, we can set an optional argument, `return_inputs`, to `True`.
 
-```{code-cell}
-task1.result(return_inputs=True)
+```{code-cell} ipython3
+task0.result(return_inputs=True)
+```
+
+## Type-checking
+
++++
+
+### What is Type-checking?
+
+Type-checking is the process of verifying the type of a value at compile time or at run time. It ensures that operations or assignments to variables are semantically meaningful and can be executed without type errors, enhancing code reliability and maintainability.
+
++++
+
+### Why Use Type-checking?
+
+1. **Error Prevention**: Type-checking helps catch type mismatches early, preventing potential runtime errors.
+2. **Improved Readability**: Type annotations make it more straightforward to understand what types of values a function expects and returns.
+3. **Better Documentation**: Explicitly stating expected types acts as inline documentation, simplifying code collaboration and review.
+4. **Optimized Performance**: Type-related optimizations can be made during compilation when types are explicitly specified.
+
++++
+
+### How is Type-checking Implemented in Pydra?
+
++++
+
+#### Static Type-Checking
+Static type-checking is done using Python's type annotations. You annotate the types of your function arguments and the return type, and then use a tool like `mypy` to check statically whether you are using the function according to those annotations.
+
+```{code-cell} ipython3
+@pydra.mark.task
+def add(a: int, b: int) -> int:
+    return a + b
+```
+
+```{code-cell} ipython3
+# This usage is correct according to static type hints:
+task1a = add(a=5, b=3)
+task1a()
+```
+
+```{code-cell} ipython3
+:tags: [raises-exception]
+# This usage is incorrect according to static type hints:
+task1b = add(a="hello", b="world")
+task1b()
+```
+
+#### Dynamic Type-Checking
+
+Dynamic type-checking happens at runtime. Add dynamic type checks if you want to enforce types when the function is executed.
+
+```{code-cell} ipython3
+@pydra.mark.task
+def add(a, b):
+    if not (isinstance(a, int) and isinstance(b, int)):
+        raise TypeError("Both inputs should be integers.")
+    return a + b
+```
+
+```{code-cell} ipython3
+# This usage is correct and will not raise a runtime error:
+task1c = add(a=5, b=3)
+task1c()
+```
+
+```{code-cell} ipython3
+:tags: [raises-exception]
+# This usage is incorrect and will raise a runtime TypeError:
+task1d = add(a="hello", b="world")
+task1d()
+```
+
+#### Checking Complex Types
+
+For more complex types like lists, dictionaries, or custom objects, we can use type hints combined with dynamic checks.
+
+```{code-cell} ipython3
+from typing import List, Tuple
+
+@pydra.mark.task
+def sum_of_pairs(pairs: List[Tuple[int, int]]) -> List[int]:
+    # check that every element really is a 2-tuple of integers,
+    # as promised by the error message below
+    if not all(
+        isinstance(pair, tuple)
+        and len(pair) == 2
+        and all(isinstance(i, int) for i in pair)
+        for pair in pairs
+    ):
+        raise ValueError("Input should be a list of pairs (tuples with 2 integers each).")
+    return [sum(pair) for pair in pairs]
+```
+
+```{code-cell} ipython3
+# Correct usage
+task1e = sum_of_pairs(pairs=[(1, 2), (3, 4)])
+task1e()
+```
+
+```{code-cell} ipython3
+:tags: [raises-exception]
+# This will raise an error, because "4" is not an integer
+task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])
+task1f()
 ```
 
 ## Customizing output names
@@ -95,46 +191,44 @@ Note, that "out" is the default name for the task output, but we can always cust
 
 Let's start from the function annotation:
 
-```{code-cell}
+```{code-cell} ipython3
 import typing as ty
 
-
 @pydra.mark.task
-def add_var_an(a, b) -> {'sum_a_b': int}:
+def add_var_an(a: int, b: int) -> {'sum_a_b': int}:
     return a + b
 
-task1a = add_var_an(a=4, b=5)
-task1a()
+task2a = add_var_an(a=4, b=5)
+task2a()
 ```
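+
+The customized name then replaces the default `out` on the result object. A quick check (an added cell; it assumes `task2a` has been run as above):
+
+```{code-cell} ipython3
+# the output field uses the customized name from the annotation
+task2a.result().output.sum_a_b
+```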
 
 The annotation might be very useful to specify the output names when the function returns multiple values:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
-def modf_an(a) -> {'fractional': ty.Any, 'integer': ty.Any}:
+def modf_an(a: float) -> {'fractional': ty.Any, 'integer': ty.Any}:
     import math
 
     return math.modf(a)
 
-task2 = modf_an(a=3.5)
-task2()
+task2b = modf_an(a=3.5)
+task2b()
 ```
 
 The second way of customizing the output requires another decorator - `pydra.mark.annotate`:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
 @pydra.mark.annotate({'return': {'fractional': ty.Any, 'integer': ty.Any}})
-def modf(a):
+def modf(a: float):
     import math
 
     return math.modf(a)
 
-
-task2a = modf(a=3.5)
-task2a()
+task2c = modf(a=3.5)
+task2c()
 ```
 
 **Note, that the order of the pydra decorators is important: `pydra.mark.task` has to come first (i.e. be the outermost decorator), so that it is applied to the already-annotated function!**
@@ -145,7 +239,7 @@ task2a()
 
 ### Setting the input
 
 We don't have to provide the input when we create a task; we can always set it later:
 
-```{code-cell}
+```{code-cell} ipython3
 task3 = add_var()
 task3.inputs.a = 4
 task3.inputs.b = 5
@@ -154,7 +248,7 @@ task3()
 
 If we don't specify the input, `attr.NOTHING` will be used as the default value.
 
-```{code-cell}
+```{code-cell} ipython3
 task3a = add_var()
 task3a.inputs.a = 4
@@ -166,7 +260,7 @@ task3a.inputs.b == attr.NOTHING
 
 And if we try to run the task, an error will be raised:
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [raises-exception]
 
 task3a()
@@ -176,62 +270,61 @@ task3a()
 
 ### Output directory and caching the results
 
 After running the task, we can check where the output directory with the results was created:
 
-```{code-cell}
+```{code-cell} ipython3
 task3.output_dir
 ```
 
 Within the directory you can find the file with the results: `_result.pklz`.
 
-```{code-cell}
+```{code-cell} ipython3
 import os
 ```
 
-```{code-cell}
+```{code-cell} ipython3
 os.listdir(task3.output_dir)
 ```
 
 But we can also provide the path where we want to store the results. If a path is provided for the cache directory, then pydra will use the cached results of a node instead of recomputing them. Let's create a temporary directory and a specific subdirectory "task4":
 
-```{code-cell}
+```{code-cell} ipython3
 from tempfile import mkdtemp
 from pathlib import Path
 ```
 
-```{code-cell}
+```{code-cell} ipython3
 cache_dir_tmp = Path(mkdtemp()) / 'task4'
 print(cache_dir_tmp)
 ```
 
 Now we can pass this path to the `FunctionTask` argument `cache_dir`. To observe the execution time, we specify a function that sleeps for 5s:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
-def add_var_wait(a, b):
+def add_var_wait(a: int, b: int):
     import time
 
     time.sleep(5)
     return a + b
 
-
 task4 = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
 ```
 
 If you're running the cell for the first time, it should take around 5s.
 
-```{code-cell}
+```{code-cell} ipython3
 task4()
 task4.result()
 ```
 
 We can check the `output_dir` of our task; it should contain the path of `cache_dir_tmp`, and its last part contains the name of the task class, `FunctionTask`, and the task checksum:
 
-```{code-cell}
+```{code-cell} ipython3
 task4.output_dir
 ```
 
 Let's see what happens when we define an identical task again with the same `cache_dir`:
 
-```{code-cell}
+```{code-cell} ipython3
 task4a = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
 task4a()
 ```
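+
+Since the new task has the same function and the same inputs, its checksum is identical, so it points to the very same `output_dir`; this is how the cached result is found. A quick check (an added cell, assuming both tasks were run as above):
+
+```{code-cell} ipython3
+# identical checksum -> identical cache location
+task4.output_dir == task4a.output_dir
+```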
@@ -240,7 +333,7 @@ This time the result should be ready right away! *pydra* uses available results
 
 *pydra* not only checks for the results in `cache_dir`; you can also provide a list of other locations that should be checked. Let's create another directory that will be used as the `cache_dir`, while the previous working directory will be used in `cache_locations`.
 
-```{code-cell}
+```{code-cell} ipython3
 cache_dir_tmp_new = Path(mkdtemp()) / 'task4b'
 
 task4b = add_var_wait(
@@ -251,13 +344,13 @@ task4b()
 
 This time the results should also be returned quickly! And we can check that `task4b.output_dir` was not created:
 
-```{code-cell}
+```{code-cell} ipython3
 task4b.output_dir.exists()
 ```
 
 If you want to rerun the task even though the results already exist, you can set `rerun` to `True`. The task will take several seconds, and a new `output_dir` will be created:
 
-```{code-cell}
+```{code-cell} ipython3
 cache_dir_tmp_new = Path(mkdtemp()) / 'task4c'
 
 task4c = add_var_wait(
@@ -270,7 +363,7 @@ task4c.output_dir.exists()
 
 If we update the input of the task and run it again, a new directory will be created and the task will be recomputed:
 
-```{code-cell}
+```{code-cell} ipython3
 task4b.inputs.a = 1
 print(task4b())
 print(task4b.output_dir.exists())
@@ -278,7 +371,7 @@ print(task4b.output_dir.exists())
 
 and when we check the `output_dir`, we can see that it's different from last time:
 
-```{code-cell}
+```{code-cell} ipython3
 task4b.output_dir
 ```
@@ -289,23 +382,22 @@ This is because, the checksum changes when we change either input or function.
 ### Exercise 1
 Create a task that takes a list of numbers as input and returns two fields: `mean` with the mean value and `std` with the standard deviation value.
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [hide-cell]
 
 @pydra.mark.task
 @pydra.mark.annotate({'return': {'mean': ty.Any, 'std': ty.Any}})
-def mean_dev(my_list):
+def mean_dev(my_list: List):
     import statistics as st
 
     return st.mean(my_list), st.stdev(my_list)
 
-
 my_task = mean_dev(my_list=[2, 2, 2])
 my_task()
 my_task.result()
 ```
 
-```{code-cell}
+```{code-cell} ipython3
 # write your solution here (you can use statistics module)
 ```
@@ -315,7 +407,7 @@ my_task.result()
 
 `AuditFlag.RESOURCE` allows you to monitor resource usage for the `Task`, while `AuditFlag.PROV` tracks the provenance of the `Task`.
 
-```{code-cell}
+```{code-cell} ipython3
 from pydra.utils.messenger import AuditFlag, PrintMessenger
 
 task5 = add_var(a=4, b=5, audit_flags=AuditFlag.RESOURCE)
@@ -325,7 +417,7 @@ task5.result()
 
 One can turn on both audit flags using `AuditFlag.ALL`, and print the messages on the terminal using the `PrintMessenger`.
 
-```{code-cell}
+```{code-cell} ipython3
 task5 = add_var(
     a=4, b=5, audit_flags=AuditFlag.ALL, messengers=PrintMessenger()
 )
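 ```
 
+The messages do not have to go to the terminal; *pydra* also ships a `FileMessenger` in the same module. A minimal sketch (an added cell; it assumes `FileMessenger` can be passed to `messengers` just like `PrintMessenger`):
+
+```{code-cell} ipython3
+from pydra.utils.messenger import FileMessenger
+
+# collect the audit messages as files instead of printing them
+task5a = add_var(
+    a=4, b=5, audit_flags=AuditFlag.ALL, messengers=FileMessenger()
+)
+task5a()
+task5a.result()
+```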
diff --git a/notebooks/3_intro_functiontask_state.md b/notebooks/3_intro_functiontask_state.md
index 6db829a..3bf2d2b 100644
--- a/notebooks/3_intro_functiontask_state.md
+++ b/notebooks/3_intro_functiontask_state.md
@@ -4,9 +4,9 @@ jupytext:
     extension: .md
     format_name: myst
     format_version: 0.13
-    jupytext_version: 1.14.0
+    jupytext_version: 1.15.0
 kernelspec:
-  display_name: Python 3
+  display_name: Python 3 (ipykernel)
   language: python
   name: python3
 ---
@@ -17,7 +17,7 @@ Task might be run for a single set of input values or we can generate multiple s
 
 Let's start from a simple `FunctionTask` that takes a list as an input:
 
-```{code-cell}
+```{code-cell} ipython3
 ---
 jupyter:
   outputs_hidden: false
@@ -31,7 +31,7 @@ import nest_asyncio
 nest_asyncio.apply()
 ```
 
-```{code-cell}
+```{code-cell} ipython3
 import pydra
@@ -45,31 +45,31 @@ task1 = add_two(x=[1, 2, 3])
 
 Before we set any splitter, the task's `state` should be `None`:
 
-```{code-cell}
+```{code-cell} ipython3
 task1.state is None
 ```
 
-Now, we can set the `splitter` by using the `split` method. Since our task has only one input, there is only one option to create a set of inputs, i.e. `splitter="x"`:
+Now, we can set the `splitter` using the `split` method. Since our task has only one input, there is only one option for creating a set of inputs, i.e. `split(splitter='x', x=[1, 2, 3])`. Note that the values of `x` have to be passed to `split` again, just as they were when the task was created:
 
-```{code-cell}
-task1.split('x')
+```{code-cell} ipython3
+task1.split('x', x=[1, 2, 3])
 ```
 
 Now, we can check that our task has a `state`:
 
-```{code-cell}
+```{code-cell} ipython3
 task1.state
 ```
 
 And we can print information about the state:
 
-```{code-cell}
+```{code-cell} ipython3
 print(task1.state)
 ```
 
 Within the `state`, information about the splitter has been stored:
 
-```{code-cell}
+```{code-cell} ipython3
 task1.state.splitter
 ```
@@ -77,20 +77,20 @@ Note, that *pydra* adds name of the function to the name of the input.
 
 Now, we can run the task and check the results:
 
-```{code-cell}
+```{code-cell} ipython3
 task1()
 task1.result()
 ```
 
 We can also return the results together with the values of the input; we just have to set an additional argument, `return_inputs`, to `True` (or `val`):
 
-```{code-cell}
+```{code-cell} ipython3
 task1.result(return_inputs=True)
 ```
 
 If we want to return indices instead of values, we can set `return_inputs` to `ind`:
 
-```{code-cell}
+```{code-cell} ipython3
 task1.result(return_inputs='ind')
 ```
@@ -106,7 +106,7 @@ For tasks with a state *pydra* prepare all sets of inputs and run the task for e
 
 We can also use `State` for functions with multiple inputs:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
 def add_var(a, b):
     return a + b
@@ -114,8 +114,8 @@ def add_var(a, b):
 
 Now we have more options to define the `splitter`; the choice depends on the type of the inputs and on our application. For example, we could have `a` that is a list and `b` that is a single value, and split over the values of `a`:
 
-```{code-cell}
-task2 = add_var(a=[1, 2, 3], b=10).split('a')
+```{code-cell} ipython3
+task2 = add_var(a=[1, 2, 3], b=10).split('a', a=[1, 2, 3])
 task2()
 task2.result()
 ```
@@ -130,7 +130,7 @@ Now we have three results for each element from the `a` list and the value of `b
 
 But we can have lists for both inputs and use both inputs in the splitter. Let's assume that `a` and `b` are two-element lists.
 
-```{code-cell}
+```{code-cell} ipython3
 task3 = add_var(a=[1, 2], b=[10, 100])
 ```
@@ -144,8 +144,8 @@ Now, we have two options to map the input values, we might want to run the task
 
 Let's start with the scalar splitter, which uses parentheses in its syntax:
 
-```{code-cell}
-task3.split(('a', 'b'))
+```{code-cell} ipython3
+task3.split(('a', 'b'), a=[1, 2], b=[10, 100])
 task3()
 task3.result()
 ```
@@ -164,9 +164,9 @@ We can represent the execution by the graph:
 
 For the outer splitter we will use brackets:
 
-```{code-cell}
+```{code-cell} ipython3
 task4 = add_var(a=[1, 2], b=[10, 100])
-task4.split(['a', 'b'])
+task4.split(['a', 'b'], a=[1, 2], b=[10, 100])
 task4()
 task4.result()
 ```
@@ -181,17 +181,17 @@ Now, we have results for all of the combinations of values from `a` and `b`.
 
 Note that once you have set the splitter, you will get an error if you try to set the splitter again. However, you can always set `overwrite` to `True` if you really intend to change the splitter.
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [raises-exception]
 
-task4.split(('a', 'b'))
+task4.split(('a', 'b'), a=[1, 2], b=[10, 100])
 ```
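+
+The `overwrite` flag itself is not demonstrated above, but a call along the following lines should replace the previously set splitter (an added sketch, reusing `task4` and the same input values):
+
+```{code-cell} ipython3
+# explicitly replace the outer splitter with a scalar one
+task4.split(('a', 'b'), a=[1, 2], b=[10, 100], overwrite=True)
+task4()
+task4.result()
+```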
 
 For more inputs we can create more complex splitters, and use scalar and outer splitters together.
 **Note, that the scalar splitter can only work for lists that have the same length, but the outer splitter doesn't have this limitation.**
 
 Let's run one more example that takes four inputs, the `x` and `y` components of two vectors, and calculates all possible sums of the vectors. The `x` components should be kept together with the corresponding `y` components (i.e. scalar splitters `("x1", "y1")` and `("x2", "y2")`), but we should use an outer splitter between the two vectors to get all combinations.
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
 def add_vector(x1, y1, x2, y2):
     return (x1 + x2, y1 + y2)
@@ -205,7 +205,8 @@ task5 = add_vector(
     x2=[10, 20, 30],
     y2=[10, 20, 30],
 )
-task5.split(splitter=[('x1', 'y1'), ('x2', 'y2')])
+task5.split(splitter=[('x1', 'y1'), ('x2', 'y2')],
+            x1=[10, 20], y1=[1, 2], x2=[10, 20, 30], y2=[10, 20, 30])
 task5()
 task5.result()
 ```
@@ -220,9 +221,9 @@ When we use `splitter`, we can also define `combiner`, if we want to combine tog
 
 If we take `task4` as an example and combine all results for each element of the input `b`, we can modify the task as follows:
 
-```{code-cell}
+```{code-cell} ipython3
 task5 = add_var(a=[1, 2], b=[10, 100])
-task5.split(['a', 'b'])
+task5.split(['a', 'b'], a=[1, 2], b=[10, 100])
 # adding combiner
 task5.combine('b')
 task5()
@@ -231,7 +232,7 @@ task5.result()
 
 Now our result contains two elements, each of which is a list. The first one contains results for `a=1` and both values of `b`, and the second contains results for `a=2` and both values of `b`. Let's print the result again using `return_inputs`:
 
-```{code-cell}
+```{code-cell} ipython3
 all_results = task5.result(return_inputs=True)
 print(f'first list, a=1: {all_results[0]}')
 print(f'\n second list, a=2: {all_results[1]}')
@@ -243,9 +244,9 @@ print(f'\n second list, a=2: {all_results[1]}')
 
 But we could also group all elements from the input `a` and have a different combined output:
 
-```{code-cell}
+```{code-cell} ipython3
 task6 = add_var(a=[1, 2], b=[10, 100])
-task6.split(['a', 'b'])
+task6.split(['a', 'b'], a=[1, 2], b=[10, 100])
 # changing the combiner
 task6.combine('a')
 task6()
@@ -254,7 +255,7 @@ task6.result()
 
 We still have two elements in our results, but this time the first element contains results for `b=10` and both values of `a`, and the second contains results for `b=100` and both values of `a`.
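+
+Just like the splitter, the combiner is recorded in the task's `state`. A quick check (an added cell, assuming `state` exposes `combiner` the same way it exposes `splitter`):
+
+```{code-cell} ipython3
+print(task6.state.splitter)
+print(task6.state.combiner)
+```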
 
-```{code-cell}
+```{code-cell} ipython3
 all_results = task6.result(return_inputs=True)
 print(f'first list, b=10: {all_results[0]}')
 print(f'\n second list, b=100: {all_results[1]}')
@@ -266,9 +267,9 @@ print(f'\n second list, b=100: {all_results[1]}')
 
 We can also combine all elements by providing a list of all inputs to the `combiner`:
 
-```{code-cell}
+```{code-cell} ipython3
 task7 = add_var(a=[1, 2], b=[10, 100])
-task7.split(['a', 'b'])
+task7.split(['a', 'b'], a=[1, 2], b=[10, 100])
 # combining all inputs
 task7.combine(['a', 'b'])
 task7()
@@ -287,7 +288,7 @@ This time the output contains one element that is a list of all outputs:
 
 Note that a list can be used as an input even without using any splitter; there are functions that take a list as a single input value:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
 def moment(lst, n):
     return sum([i**n for i in lst]) / len(lst)
@@ -307,7 +308,7 @@ Let's say we want to calculate squares and cubes of integers from 2 to 5, and co
 
 First we will define a function that returns powers:
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [hide-cell]
 
 @pydra.mark.task
@@ -317,17 +318,17 @@ def power(x, n):
 
 Now we can create a task that takes the two lists as its input, uses an outer splitter for `x` and `n`, and combines all `x`:
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [hide-cell]
 
-task_ex1 = power(x=[2, 3, 4, 5], n=[2, 3]).split(['x', 'n']).combine('x')
+task_ex1 = power(x=[2, 3, 4, 5], n=[2, 3]).split(['x', 'n'], x=[2, 3, 4, 5], n=[2, 3]).combine('x')
 task_ex1()
 task_ex1.result()
 ```
 
 The result should contain two lists: the first one is for squares, the second for cubes.
 
-```{code-cell}
+```{code-cell} ipython3
 :tags: [hide-cell]
 
 squares_list = [el.output.out for el in task_ex1.result()[0]]
@@ -340,7 +341,7 @@ print(f'cubes: {cubes_list}')
 
 We ran the task multiple times for multiple sets of inputs, but we haven't talked about the execution time yet.
 Let's create a function that sleeps for a second and run it for four values:
 
-```{code-cell}
+```{code-cell} ipython3
 import time
 
 
@@ -350,7 +351,7 @@ def add_two_sleep(x):
     return x + 2
 
 
-task9 = add_two_sleep(x=[1, 2, 3, 4]).split('x')
+task9 = add_two_sleep(x=[1, 2, 3, 4]).split('x', x=[1, 2, 3, 4])
 t0 = time.time()
 task9()
 print(f'total time: {time.time() - t0}')
@@ -363,8 +364,8 @@ If we run `Task` that has a `State`, pydra will automatically create a `Submitte
 
 We could also create a `Submitter` first, and then use it to run the task:
 
-```{code-cell}
-task10 = add_two_sleep(x=[1, 2, 3, 4]).split('x')
+```{code-cell} ipython3
+task10 = add_two_sleep(x=[1, 2, 3, 4]).split('x', x=[1, 2, 3, 4])
 
 t0 = time.time()
 with pydra.Submitter(plugin='cf') as sub:
@@ -375,8 +376,8 @@ print(f'results: {task10.result()}')
 
 or we can provide the name of the plugin:
 
-```{code-cell}
-task11 = add_two_sleep(x=[1, 2, 3, 4]).split('x')
+```{code-cell} ipython3
+task11 = add_two_sleep(x=[1, 2, 3, 4]).split('x', x=[1, 2, 3, 4])
 
 t0 = time.time()
 task11(plugin='cf')
@@ -386,8 +387,8 @@ print(f'results: {task11.result()}')
 
 The last option for running the task is to create a `Submitter` first and run the submitter (a `Submitter` is also a callable object) with the task as a `runnable`:
 
-```{code-cell}
-task12 = add_two_sleep(x=[1, 2, 3, 4]).split('x')
+```{code-cell} ipython3
+task12 = add_two_sleep(x=[1, 2, 3, 4]).split('x', x=[1, 2, 3, 4])
 
 t0 = time.time()
 with pydra.Submitter(plugin='cf') as sub:
@@ -398,8 +399,8 @@ print(f'results: {task12.result()}')
 
 All of the execution times should be similar, since all tasks are run by *pydra* in the same way, i.e. *pydra* creates a submitter with a `ConcurrentFutures` worker; if the number of processors is not provided, `ConcurrentFutures` takes all available processors as `max_workers`. However, if we want to set a specific number of processors, we can do so using `n_procs` when creating a `Submitter`. Let's see how the execution time changes when we use `n_procs=2`.
 
-```{code-cell}
-task13 = add_two_sleep(x=[1, 2, 3, 4]).split('x')
+```{code-cell} ipython3
+task13 = add_two_sleep(x=[1, 2, 3, 4]).split('x', x=[1, 2, 3, 4])
 
 t0 = time.time()
 with pydra.Submitter(plugin='cf', n_procs=2) as sub:
diff --git a/notebooks/4_intro_workflow.md b/notebooks/4_intro_workflow.md
index 0a150ca..1cad004 100644
--- a/notebooks/4_intro_workflow.md
+++ b/notebooks/4_intro_workflow.md
@@ -4,16 +4,16 @@ jupytext:
     extension: .md
     format_name: myst
     format_version: 0.13
-    jupytext_version: 1.14.0
+    jupytext_version: 1.15.0
 kernelspec:
-  display_name: Python 3
+  display_name: Python 3 (ipykernel)
   language: python
   name: python3
 ---
 
 # Workflow
 
-```{code-cell}
+```{code-cell} ipython3
 ---
 jupyter:
   outputs_hidden: false
@@ -27,7 +27,7 @@ import nest_asyncio
 nest_asyncio.apply()
 ```
 
-```{code-cell}
+```{code-cell} ipython3
 import pydra
 
 # functions used later in the notebook:
@@ -52,25 +52,25 @@ In order to run multiple tasks within one pipeline, we use another *pydra* class
 
 Let's start with a workflow that has a single task with one input `x`. When we create a `Workflow`, we have to specify `input_spec`, which contains all of the workflow inputs:
 
-```{code-cell}
+```{code-cell} ipython3
 wf1 = pydra.Workflow(name='wf1', input_spec=['x'], x=3)
 ```
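+
+A `Workflow` is itself a kind of `Task` (as the nested-workflow section below shows), so its input can be inspected in the familiar way. A quick check (an added cell, assuming the `inputs` interface carries over from `FunctionTask`):
+
+```{code-cell} ipython3
+# the workflow input is stored in inputs, just as for a FunctionTask
+wf1.inputs.x
+```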
 
 Now, we can add a task and specify that `x` will be taken from the workflow input by using the so-called *Lazy Input*, `x=wf1.lzin.x`. We should also add a `name` to the task we are using in the `Workflow`.
 
-```{code-cell}
+```{code-cell} ipython3
 wf1.add(add_two(name='sum', x=wf1.lzin.x))
 ```
 
 Now, we can access the task by using the task name:
 
-```{code-cell}
+```{code-cell} ipython3
 wf1.sum
 ```
 
 We also have to specify what the workflow output should be; for this one-task workflow, we simply take the output of `sum` and use a *Lazy Output* to set it to `wf.output.out`:
 
-```{code-cell}
+```{code-cell} ipython3
 wf1.set_output([('out', wf1.sum.lzout.out)])
 ```
@@ -80,7 +80,7 @@ We could also use dictionary to set the output - `wf1.set_output({"out": wf1.sum
 
 Now, we are ready to run the workflow:
 
-```{code-cell}
+```{code-cell} ipython3
 with pydra.Submitter(plugin='cf') as sub:
     sub(wf1)
 
@@ -99,7 +99,7 @@ We could think about the workflow as follows: the workflow has an input `x` that
 
 You can add as many tasks as you want to the workflow and return multiple variables:
 
-```{code-cell}
+```{code-cell} ipython3
 wf2 = pydra.Workflow(name='wf2', input_spec=['x'], x=3)
 wf2.add(add_two(name='add_two', x=wf2.lzin.x))
 wf2.add(power(name='power', a=wf2.lzin.x))
@@ -129,7 +129,7 @@ The previous example showed a workflow with two nodes, but they were not connect
 
 If we want to connect the tasks with each other, we have to set the input of the second task to the output of the first task; here we again use the *Lazy Output* concept:
 
-```{code-cell}
+```{code-cell} ipython3
 wf3 = pydra.Workflow(name='wf3', input_spec=['x'], x=3)
 wf3.add(add_two(name='sum', x=wf3.lzin.x))
 # by setting a=wf3.sum.lzout.out we create a connection
@@ -145,7 +145,7 @@ wf3.result()
 
 Now, we can see that the second task took its input from the first one:
 
-```{code-cell}
+```{code-cell} ipython3
 wf3.power.inputs.a
 ```
@@ -159,7 +159,7 @@ So this time the workflow graph will look like this:
 
 A node can be connected to multiple nodes; we can modify `wf` to add an additional node that uses `mult_var` to multiply the outputs of the two previous tasks:
 
-```{code-cell}
+```{code-cell} ipython3
 wf4 = pydra.Workflow(name='wf4', input_spec=['x'], x=3)
 wf4.add(add_two(name='add_two', x=wf4.lzin.x))
 wf4.add(power(name='power', a=wf4.lzin.x))
@@ -187,7 +187,7 @@ Previously we had workflows that had `Task`s as nodes, but *pydra* treats `Workf
 
 Let's modify the previous workflow, and instead of the `sum` and `power` tasks use `wf2` as the first node:
 
-```{code-cell}
+```{code-cell} ipython3
 wf2a = pydra.Workflow(name='wf2a', input_spec=['x'])
 wf2a.add(add_two(name='add_two', x=wf2a.lzin.x))
 wf2a.add(power(name='power', a=wf2a.lzin.x))
@@ -223,7 +223,7 @@ We should get exactly the same result as previously, but this time we run `wf2a
 
 A workflow, like any other task, can also have a splitter. Let's take one of our previous workflows and add a splitter for the workflow input by setting the `splitter` using the `split` method.
 
-```{code-cell}
+```{code-cell} ipython3
 wf6 = pydra.Workflow(name='wf6', input_spec=['x'])
 # setting a splitter for the entire workflow
 wf6.split('x', x=[3, 5])
@@ -251,7 +251,7 @@ Behind the scene *pydra* expanded two workflows for two values of the workflow i
 
 Let's create a new workflow that has two inputs and a more complicated splitter.
 
-```{code-cell}
+```{code-cell} ipython3
 wf7 = pydra.Workflow(name='wf7', input_spec=['x', 'y'])
 wf7.split(['x', 'y'], x=[3, 5], y=[2, 3])
 wf7.add(add_two(name='sum', x=wf7.lzin.x))
@@ -278,7 +278,7 @@ We should have four results for four sets of inputs, and the graph should look l
 
 In the same way as we did for `Task`, we can add a `combiner` to the entire workflow:
 
-```{code-cell}
+```{code-cell} ipython3
 wf7.combine('x')
 
 with pydra.Submitter(plugin='cf') as sub:
@@ -301,7 +301,7 @@ We presented how to set a `splitter` and a `combiner` for entire workflow, but w
 
 Let's create a workflow that takes a list as an input and passes it to two nodes. One node takes the entire list as its input, and the second node splits the input:
 
-```{code-cell}
+```{code-cell} ipython3
 @pydra.mark.task
 def mean(x_list):
     return sum(x_list) / len(x_list)
 
 
 wf8 = pydra.Workflow(name='wf8', input_spec=['x'], x=[3, 5, 7])
 wf8.add(mean(name='mean', x_list=wf8.lzin.x))
 # adding a task that has its own splitter
-wf8.add(power(name='power', a=wf8.lzin.x).split('a'))
+wf8.add(power(name='power', a=wf8.lzin.x).split('a', a=wf8.lzin.x))
 
 wf8.set_output([('out_m', wf8.mean.lzout.out), ('out_p', wf8.power.lzout.out)])
diff --git a/notebooks/6_firstlevel_glm_nilearn.md b/notebooks/6_firstlevel_glm_nilearn.md
index 3ea6aef..8717481 100644
--- a/notebooks/6_firstlevel_glm_nilearn.md
+++ b/notebooks/6_firstlevel_glm_nilearn.md
@@ -4,15 +4,13 @@ jupytext:
     extension: .md
     format_name: myst
     format_version: 0.13
-    jupytext_version: 1.13.8
+    jupytext_version: 1.15.0
 kernelspec:
   display_name: Python 3 (ipykernel)
   language: python
   name: python3
 ---
 
-+++ {"tags": []}
-
 # First Level GLM (from Nilearn)
 
 +++
@@ -26,8 +24,6 @@ import nest_asyncio
 nest_asyncio.apply()
 ```
 
-+++ {"tags": []}
-
 ## Preparation
 
 Import packages that will be used globally and set up output directory
@@ -76,7 +72,9 @@ workflow_out_dir = workflow_dir / '6_glm'
 os.makedirs(workflow_out_dir, exist_ok=True)
 ```
 
-+++ {"tags": []}
+```{code-cell} ipython3
+workflow_out_dir
+```
 
 ## Create tasks
@@ -214,6 +212,7 @@ What we are doing here is:
 1. use the design matrix to fit the first level model
 2. compute the contrast
 3. save the z_map and masker for further use
+4. generate a GLM report (an HTML file)
 
 ```{code-cell} ipython3
 @pydra.mark.task
 @pydra.mark.annotate(
     {
         'imgs': ty.Any,
         'dm_path': ty.Any,
         'contrast': str,
-        'return': {'model': ty.Any, 'z_map_path': str, 'masker': ty.Any},
+        'return': {'model': ty.Any, 'z_map_path': str, 'masker': ty.Any, 'glm_report_file': str},
     }
 )
 def model_fit(model, imgs, dm_path, contrast):
@@ -234,12 +233,15 @@ def model_fit(model, imgs, dm_path, contrast):
     z_map.to_filename(z_map_path)
     masker_path = os.path.join(workflow_out_dir, 'firstlevel_masker.nii.gz')
     masker = model.masker_
-    return model, z_map_path, masker
+    glm_report_file = os.path.join(workflow_out_dir, 'glm_report.html')
+    report = make_glm_report(model, contrast)
+    report.save_as_html(glm_report_file)
+    return model, z_map_path, masker, glm_report_file
 ```
 
-### Get cluster table and glm report
+### Get cluster table
 
-For publication purposes, we obtain a cluster table and a summary report.
+For publication purposes, we obtain a cluster table.
```{code-cell} ipython3
 @pydra.mark.task
 @pydra.mark.annotate(
@@ -252,18 +254,6 @@ def cluster_table(z_map_path):
     )
     df.to_csv(output_file, index=None)
     return output_file
-
-
-# get glm report
-@pydra.mark.task
-@pydra.mark.annotate(
-    {'model': ty.Any, 'contrasts': str, 'return': {'output_file': str}}
-)
-def glm_report(model, contrasts):
-    output_file = os.path.join(workflow_out_dir, 'glm_report.html')
-    report = make_glm_report(model, contrasts)
-    report.save_as_html(output_file)
-    return output_file
 ```
 
 ### Make plots
@@ -342,7 +332,9 @@ def plots(data_dir,dm_path,z_map_path,contrast,subject,masker):
     )
     old = os.path.join(workflow_out_dir, '0000.png')
     new = os.path.join(workflow_out_dir, 'nilearn_fsl_comp.jpg')
-    output_file3 = os.rename(old, new)
+    os.rename(old, new)
+    output_file3 = new
 
     # plot and save design matrix contrast
     design_matrix = pd.read_csv(dm_path)
@@ -351,8 +343,6 @@ def plots(data_dir,dm_path,z_map_path,contrast,subject,masker):
     return output_file1, output_file2, output_file3, output_file4
 ```
 
-+++ {"tags": []}
-
 ## Make a workflow from tasks
 
 Now we have created all the tasks we need for this first-level analysis, and there are two choices for our next step.
@@ -419,14 +409,6 @@ wf_firstlevel.add(
         z_map_path=wf_firstlevel.l1estimation.lzout.z_map_path,
     )
 )
-# add task - glm_report
-wf_firstlevel.add(
-    glm_report(
-        name='glm_report',
-        model=wf_firstlevel.l1estimation.lzout.model,
-        contrasts=wf_firstlevel.lzin.contrast,
-    )
-)
 # specify output
 wf_firstlevel.set_output(
     [
@@ -435,13 +417,11 @@ wf_firstlevel.set_output(
         ('subject', wf_firstlevel.get_info_from_bids.lzout.subject),
         ('dm_path', wf_firstlevel.get_designmatrix.lzout.dm_path),
         ('cluster_table', wf_firstlevel.cluster_table.lzout.output_file),
-        ('glm_report', wf_firstlevel.glm_report.lzout.output_file),
+        ('glm_report', wf_firstlevel.l1estimation.lzout.glm_report_file),
     ]
 )
 ```
 
-+++ {"tags": []}
-
 ## The overarching workflow
 
 Connect the other tasks and the above workflow into one
@@ -523,8 +503,6 @@ results = wf.result()
 print(results)
 ```
 
-+++ {"tags": []}
-
 ## Visualization
 
 +++
@@ -581,8 +559,6 @@ Image(filename='../outputs/6_glm/fsl_z_map.jpg')
 Image(filename='../outputs/6_glm/nilearn_fsl_comp.jpg')
 ```
 
-+++ {"tags": []}
-
 ## Exercise
 
 +++
diff --git a/notebooks/7_twolevel_glm_nilearn.md b/notebooks/7_twolevel_glm_nilearn.md
index 4888fda..b5647d8 100644
--- a/notebooks/7_twolevel_glm_nilearn.md
+++ b/notebooks/7_twolevel_glm_nilearn.md
@@ -5,7 +5,7 @@ jupytext:
     extension: .md
     format_name: myst
     format_version: 0.13
-    jupytext_version: 1.13.8
+    jupytext_version: 1.15.0
 kernelspec:
   display_name: Python 3 (ipykernel)
   language: python
@@ -66,7 +66,7 @@ pydra_tutorial_dir = os.path.dirname(os.getcwd())
 
 # set up output directory
 workflow_dir = Path(pydra_tutorial_dir) / 'outputs'
-workflow_out_dir = workflow_dir / '7_glm' /'results'
+workflow_out_dir = workflow_dir / '7_glm' / 'results'
 
 # create folders if not exist
 os.makedirs(workflow_out_dir, exist_ok=True)
@@ -108,8 +108,6 @@ We need to get four types of data from two folders:
 4. confounds: `*desc-confounds_timeseries.tsv` from `fmriprep_path` (this is implicitly needed by `load_confounds_strategy`)
 
 ```{code-cell} ipython3
-:tags: []
-
 @pydra.mark.task
 @pydra.mark.annotate(
     {
@@ -275,8 +273,6 @@ def firstlevel_estimation(subj_id, run_id, subj_imgs, subj_masks, smoothing_fwhm
 
 This workflow includes the GLM for each run.
```{code-cell} ipython3
-:tags: []
-
 # initiate the first-level GLM workflow
 wf_firstlevel = Workflow(
     name='wf_firstlevel',
@@ -293,7 +289,7 @@ wf_firstlevel = Workflow(
     ],
 )
 
-wf_firstlevel.split('run_id')
+wf_firstlevel.split('run_id', run_id=wf_firstlevel.lzin.run_id)
 # add task - get_firstlevel_dm
 wf_firstlevel.add(
     get_firstlevel_dm(
@@ -360,7 +356,7 @@ Before we move to the second(group) level, we need to average results from all t
     }
 )
 def get_fixed_effcts(subj_id, subj_masks, contrasts, effect_size_path_dict_list, effect_variance_path_dict_list):
-
+    print(f'contrasts: {contrasts}')
     print(f'Compute fixed effects for subject-{subj_id}...')
     # average mask across three runs
     mean_mask = math_img('np.mean(img, axis=-1)', img=subj_masks)
@@ -404,7 +400,7 @@ wf_fixed_effect = Workflow(
     ],
 )
 
-wf_fixed_effect.split('subj_id')
+wf_fixed_effect.split('subj_id', subj_id=wf_fixed_effect.lzin.subj_id)
 # add task - get_subj_file
 wf_fixed_effect.add(
     get_subjdata(
@@ -444,6 +440,8 @@ wf_fixed_effect.set_output(
         ('fx_t_test_list', wf_fixed_effect.get_fixed_effcts.lzout.fixed_fx_ttest_path_dict),
     ]
 )
 ```
 
 ## Second-Level GLM
@@ -453,7 +451,7 @@ The second level GLM, as known as the group level, averages results across subje
 - fit the second-level GLM
 - perform statistical testing
 
-+++ {"tags": []}
++++
 
 ### Get the second level design matrix
@@ -739,11 +737,11 @@ wf = Workflow(
     input_spec=['n_subj'],
 )
 
-wf.inputs.n_subj = 5
+wf.inputs.n_subj = 2
 # randomly choose subjects
 wf_fixed_effect.inputs.subj_id = random.sample(range(1,17), wf.inputs.n_subj)
-wf_fixed_effect.inputs.run_id =[1,2,3]
+wf_fixed_effect.inputs.run_id = [1, 2]
 wf_fixed_effect.inputs.tr = 2.3
 wf_fixed_effect.inputs.n_scans = 300
 wf_fixed_effect.inputs.hrf_model = 'glover'