Best practices for linking outputs of nested workflows into parent workflows? #2423


Closed
dPys opened this issue Feb 1, 2018 · 9 comments
dPys commented Feb 1, 2018

What is the preferable way to link outputs of a nested workflow back into its parent workflow? Does anyone have a really good example (e.g. from fmriprep) where this is done?

Specifically, how might the workflow_selector function of this workflow be restructured so as to eliminate lines 421-434, where the results of a nested workflow ("sub_func_wf"), run within the "imp_est" node of the main workflow, are queried to retrieve the variable "est_path"?
...After "est_path" is fetched, the main workflow compiles lists of 8 iterfields, including est_path from the nested workflow (right now this is done explicitly with a function rather than with pure nipype, for various reasons outside the scope of this issue), and then sends those iterfields to the "extractnetstats" node of the main workflow, and so forth...


mgxd commented Feb 2, 2018

@dpisner453

You may want to make a meta workflow that takes in either sub_func_wf or sub_struct_wf and adds a new Function Node wrapping pynets.utils.compile_iterfields - I've attached a little example below:

from nipype import Node, Workflow, Function
from pynets import utils

# ensure sub_func_wf and sub_struct_wf are defined
base_wf = sub_func_wf if sub_func_wf else sub_struct_wf

# here we define a workflow that will contain base_wf
meta_wf = Workflow(name='meta')
meta_wf.add_nodes([base_wf])

# you'll need these imports for the function to run
imports = ['import sys',
           'import os',
           'import nibabel as nib',
           'import numpy as np',
           'from pynets import utils']

comp_iter = Node(Function(function=utils.compile_iterfields, imports=imports),
                 name='compile_iterfields')
# there are a lot of parameters...
comp_iter.inputs.<<INPUTS>> = <<INPUT>>
meta_wf.connect(base_wf, "thresh_and_fit_node.est_path", comp_iter, "est_path")

eg = meta_wf.run(plugin="MultiProc")
outputs = [x for x in eg.nodes() if x.name == 'compile_iterfields'][0].result.outputs

return (outputs.thr, outputs.est_path, outputs.ID, ...)

This is scalable - you could continue adding extra nodes and workflows to meta_wf.


dPys commented Feb 3, 2018

With a few other tweaks, that worked well (and added an additional layer of flexibility to the pipeline) Nice work @mgxd ! https://github.com/dpisner453/PyNets/blob/development/pynets/pynets_run.py#L422

One more thing -- given this setup, the pipeline still has to call wf.run() twice: once in the workflow_selector function to start the meta-workflow, and once more on line #861, where the parent workflow (i.e. the beginning of all workflows) is triggered. What would now be the most preferable way to connect the meta-workflow to the parent workflow (here called 'wf')?


dPys commented Jul 19, 2018

Hi @mgxd ,

Just an update that I have made a lot of progress towards tackling this problem, and I'm curious if you have any thoughts on how I might resolve what may be the last step...

Basically, all the iterfields that I was previously compiling manually are now being aggregated using a JoinNode 'outputnode' that looks like this:

outputnode = pe.JoinNode(
    interface=niu.IdentityInterface(fields=['est_path', 'thr', 'network', 'conn_model', 'node_size']),
    name='outputnode',
    joinfield=['est_path', 'thr', 'network', 'conn_model', 'node_size'],
    joinsource='thresh_func_node')

The trouble is that, because some of these iterables live upstream in the workflow, I end up with multiple of these outputnodes (I intended for this to happen and don't expect it to change). So what I need to do next is create another special node in my meta-wf that collects the list outputs from each of these JoinNode 'outputnodes', but only after all of them have finished running, and then merges them into flat lists. Then I can connect the outputs of this new node to the parent workflow so that I only have to run wf.run() once. Right now, I am working around this issue by performing the following:

meta_wf = Workflow(name="%s%s" % ('Meta_wf_', str(ID)))
meta_wf.add_nodes([base_wf])
egg = meta_wf.run(plugin='MultiProc')
conn_model_iters = []
est_path_iters = []
network_iters = []
node_size_iters = []
thr_iters = []
out_nodes = [x for x in egg.nodes() if x.name == 'outputnode']
for node in out_nodes:
    conn_model_iters.append(node.result.outputs.conn_model)
    est_path_iters.append(node.result.outputs.est_path)
    network_iters.append(node.result.outputs.network)
    node_size_iters.append(node.result.outputs.node_size)
    thr_iters.append(node.result.outputs.thr)
conn_model_iterlist = [item for sublist in conn_model_iters for item in sublist]
est_path_iterlist = [item for sublist in est_path_iters for item in sublist]
network_iterlist = [item for sublist in network_iters for item in sublist]
node_size_iterlist = [item for sublist in node_size_iters for item in sublist]
thr_iterlist = [item for sublist in thr_iters for item in sublist]
prune_iterlist = [prune] * len(est_path_iterlist)
ID_iterlist = [str(ID)] * len(est_path_iterlist)
mask_iterlist = [mask] * len(est_path_iterlist)

Does nipype have a type of Node that can replace the above? I tried using niu.merge instead, but the problem is that the workflow proceeds too early without waiting for all of the other outputnodes to populate first. To see this all in more detail, the meta-wf now lives here: https://github.com/dPys/PyNets/blob/master/pynets/pynets_run.py#L629 and the workflow being added as a node to the meta-wf lives here: https://github.com/dPys/PyNets/blob/master/pynets/workflows.py#L155

Curious to hear what you think!
derek;


mgxd commented Jul 23, 2018

@dPys hmm... I'm not sure Merge can handle this functionality (maybe @effigies or @satra might know), but your workaround should work fine in this case, since it will run once the workflow (and all downstream "outputnodes") has finished. However, you should be able to cut down some redundancy if you aggregate all outputs into a dictionary:

meta_wf_outputs = {}
outputs = [n.result.outputs.get() for n in egg.nodes() if n.name == 'outputnode']
for out in outputs:
    for k, v in out.items():
        if k in meta_wf_outputs:
            meta_wf_outputs[k].extend(v)  # since you want to flatten across workflows
        else:
            meta_wf_outputs[k] = list(v)  # copy, so later extends don't mutate the node results
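To illustrate the aggregation above, here is a standalone sketch with made-up paths and fields standing in for the actual `outputnode` results; the loop flattens the per-node lists field by field:

```python
# Hypothetical stand-ins for [n.result.outputs.get() for n in egg.nodes()
# if n.name == 'outputnode'] -- two outputnodes, each with list outputs.
outputs = [
    {'est_path': ['/tmp/a.npy'], 'thr': [0.1]},
    {'est_path': ['/tmp/b.npy', '/tmp/c.npy'], 'thr': [0.2, 0.3]},
]

meta_wf_outputs = {}
for out in outputs:
    for k, v in out.items():
        if k in meta_wf_outputs:
            meta_wf_outputs[k].extend(v)  # flatten across workflows
        else:
            meta_wf_outputs[k] = list(v)  # copy, so the node results stay untouched

print(meta_wf_outputs['est_path'])  # ['/tmp/a.npy', '/tmp/b.npy', '/tmp/c.npy']
print(meta_wf_outputs['thr'])       # [0.1, 0.2, 0.3]
```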

HTH,
Mathias


dPys commented Jul 25, 2018

Nice work on cutting that down @mgxd . I doubt that Merge will be able to handle it either. It seems like we'd need another JoinNode that can assemble the outputs of multiple JoinNodes, but one that also knows how to pause and wait for a predefined iterfield to populate before proceeding. Or does something like this already exist in nipype? @effigies @satra ? And if I am missing something obvious here, please let me know, because I'd like to restructure accordingly.

dPys closed this as completed Aug 6, 2018
dPys reopened this Aug 6, 2018

dPys commented Aug 6, 2018

Hi @satra,

Just checking in again in case this issue got stuck in limbo...

To rephrase: is it currently possible to create a JoinNode that returns its joinfield as a single list? If not, where should I edit or begin a pull request?

Here's another example of what I'm trying to do (but doesn't presently work):

collect_pd_list_net_pickles = pe.JoinNode(niu.IdentityInterface(fields=['out_file']),
                                          name='collect_pd_list_net_pickles',
                                          joinsource=export_to_pandas_node,
                                          joinfield=['out_file'])

Basically, I'd like the JoinNode to assemble a list of all of the file paths received by the joinfield out_file (e.g. ['/Users/PSYC-dap3463/Downloads/002/coords_power_2011/002_est_sps_0.1_2.npy', '/Users/PSYC-dap3463/Downloads/002/coords_power_2011/002_est_sps_0.09_2.npy', '/Users/PSYC-dap3463/Downloads/002/coords_power_2011/002_est_sps_0.11_2.npy']) so that I can assign that list to a variable in a subsequent node.

-Derek


satra commented Aug 6, 2018

@dPys - IdentityInterfaces get removed before a workflow is run, so that's probably the key issue. Can you try replacing the IdentityInterface with a Function interface, even if all the function does is a passthrough?

see the joinnode example here:

https://miykael.github.io/nipype_tutorial/notebooks/introduction_quickstart_non-neuroimaging.html
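A minimal sketch of the passthrough idea (the node and joinsource names below are assumptions modeled on the earlier snippet, not the actual PyNets code):

```python
# A passthrough function: it does nothing but return its input. Wrapping it
# in a Function interface (instead of an IdentityInterface, which nipype
# removes from the graph before running) keeps the node alive, so a JoinNode
# built on it can return its joinfield as a single list.
def passthrough(out_file):
    return out_file

# With nipype available, the JoinNode would be built roughly like this
# (names are hypothetical):
#
# from nipype.pipeline import engine as pe
# from nipype.interfaces.utility import Function
# collect_pd_list_net_pickles = pe.JoinNode(
#     Function(input_names=['out_file'], output_names=['out_file'],
#              function=passthrough),
#     name='collect_pd_list_net_pickles',
#     joinsource='export_to_pandas_node',
#     joinfield=['out_file'])
```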


dPys commented Aug 7, 2018

@satra

I don't know why it is that whenever I get to ask you a question, the answer is just something obvious that I'm missing! haha.

And the Node didn't even need to be a JoinNode after all, just a pass-through Function Node. The pynets workflow now relies on pure nipype for iters, with the only exception being lines 768-784 of pynets_run.py, which actually motivated this issue originally... but it sounds like that will probably need to wait for nipype 2.0 :)

thanks for all the help,
derek;


dPys commented Aug 9, 2018

@satra and @mgxd ,

May not have to wait for nipype 2.0 after all... this issue is now resolved. The solution was to, very carefully, create a 'meta-meta' workflow combined with the passthrough FunctionNode approach mentioned in my last post:
See lines 706-1047

Pynets is now a single nipype workflow. Thanks all for the help and for creating such an amazingly flexible piece of software.

derek;

oesteban closed this as completed Aug 9, 2018