diff --git a/CHANGES b/CHANGES
index 7235b27f51..3571b5f1d3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,7 @@
 Next release
 ============
 
+* ENH: [Interfaced|Graft]Workflows (https://github.com/nipy/nipype/pull/882)
 * TST: Cache APT in CircleCI (https://github.com/nipy/nipype/pull/1333)
 * ENH: Add new flags to the BRAINSABC for new features (https://github.com/nipy/nipype/pull/1322)
 * ENH: Provides a Nipype wrapper for ANTs DenoiseImage (https://github.com/nipy/nipype/pull/1291)
diff --git a/doc/users/graft_workflow.rst b/doc/users/graft_workflow.rst
new file mode 100644
index 0000000000..ba844b20a3
--- /dev/null
+++ b/doc/users/graft_workflow.rst
@@ -0,0 +1,87 @@
+.. _graft_workflow:
+
+=====================================================
+Interfaced workflows and GraftWorkflow (experimental)
+=====================================================
+
+:class:`nipype.pipeline.engine.InterfacedWorkflow` provides automatic generation of
+input/output nodes, along with other conveniences such as fast connection (avoiding
+the need to specify the connecting fields).
+
+:class:`nipype.pipeline.engine.GraftWorkflow` is intended for evaluation workflows,
+where several different methods share the same inputs and are compared, stacking
+their outputs in lists.
+
+
+Interfaced workflows
+--------------------
+
+:class:`~nipype.pipeline.engine.InterfacedWorkflow` generates workflows with default
+``inputnode`` and ``outputnode``. It also exposes the fields without the ``inputnode.`` and
+``outputnode.`` prefix.
+
+Let's create a very simple workflow with a segmentation node. Please note the
+fundamental differences from a standard :class:`~nipype.pipeline.engine.Workflow`:
+1) no need for ``inputnode`` and ``outputnode``; 2) fast connection of fields.
+::
+
+    import nipype.pipeline.engine as pe
+    from nipype.interfaces import fsl
+    segm0 = pe.Node(fsl.FAST(number_classes=3, probability_maps=True),
+                    name='FSLFAST')
+    ifwf0 = pe.InterfacedWorkflow(name='testname0', input_names=['in_t1w'],
+                                  output_names=['out_tpm'])
+    ifwf0.connect([
+        ('in', segm0, [('in_t1w', 'in_files')]),
+        (segm0, 'out', [('probability_maps', 'out_tpm')])
+    ])
+
+
+We can connect an input to this workflow as usual
+::
+
+    import os
+    import nipype.interfaces.io as nio
+    ds = pe.Node(nio.DataGrabber(base_directory=os.getcwd(), template='t1.nii'),
+                 name='DataSource')
+    mywf = pe.Workflow(name='FullWorkflow')
+    mywf.connect(ds, 't1', ifwf0, 'inputnode.in_t1w')
+
+
+The :class:`~nipype.pipeline.engine.InterfacedWorkflow` is useful to create several
+segmentation alternatives that always take one input named ``in_t1w`` and return one
+output named ``out_tpm``. On their own,
+:class:`InterfacedWorkflows <nipype.pipeline.engine.InterfacedWorkflow>` do not add
+much to conventional :class:`Workflows <nipype.pipeline.engine.Workflow>`, but they
+are interesting as units inside
+:class:`GraftWorkflows <nipype.pipeline.engine.GraftWorkflow>`.
+
+
+Workflows to run cross-comparisons of methods
+---------------------------------------------
+
+Say we want to compare segmentation algorithms: FAST from FSL, and Atropos from ANTS.
+We want all methods under comparison to have the same number of inputs and outputs,
+with the same names.
+
+We first create the :class:`~nipype.pipeline.engine.GraftWorkflow`, using an existing
+workflow as a reference; the ``fields_from`` argument copies the input and output
+names of that reference workflow.
+
+::
+
+    compare_wf = pe.GraftWorkflow(name='Comparison', fields_from=ifwf0)
+
+We create the alternative segmentation workflow::
+
+    from nipype.interfaces import ants
+    segm1 = pe.Node(ants.Atropos(dimension=3, number_of_tissue_classes=3),
+                    name='Atropos')
+    ifwf1 = pe.InterfacedWorkflow(name='testname1', input_names=['in_t1w'],
+                                  output_names=['out_tpm'])
+    ifwf1.connect([
+        ('in', segm1, [('in_t1w', 'intensity_images')]),
+        (segm1, 'out', [('posteriors', 'out_tpm')])
+    ])
+
+Finally, each workflow under comparison is inserted into the
+:class:`~nipype.pipeline.engine.GraftWorkflow` using the ``insert()`` method, which
+returns the 0-based index assigned to the workflow::
+
+    compare_wf.insert(ifwf0)
+    compare_wf.insert(ifwf1)
+
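+The outputs of the :class:`~nipype.pipeline.engine.GraftWorkflow` are stacked in
+lists, with one entry per inserted workflow, in insertion order. As a sketch of one
+possible use (not prescribed by this patch), a
+:class:`~nipype.interfaces.utility.MultipleSelectInterface` node could demultiplex a
+single method back out of the stacked outputs::
+
+    import nipype.interfaces.utility as niu
+    pick_fast = pe.Node(niu.MultipleSelectInterface(fields=['out_tpm'], index=0),
+                        name='PickFAST')
+    mywf.connect(compare_wf, 'outputnode.out_tpm', pick_fast, 'out_tpm')
+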
diff --git a/doc/users/index.rst b/doc/users/index.rst
index 3a432135a6..7e74cd6b4c 100644
--- a/doc/users/index.rst
+++ b/doc/users/index.rst
@@ -35,6 +35,7 @@
    joinnode_and_itersource
    model_specification
    saving_workflows
+   graft_workflow
    spmmcr
    mipav
    nipypecmd
diff --git a/nipype/interfaces/tests/test_auto_CollateInterface.py b/nipype/interfaces/tests/test_auto_CollateInterface.py
new file mode 100644
index 0000000000..f32cc23031
--- /dev/null
+++ b/nipype/interfaces/tests/test_auto_CollateInterface.py
@@ -0,0 +1,25 @@
+# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT
+from nipype.testing import assert_equal
+from nipype.interfaces.utility import CollateInterface
+
+def test_CollateInterface_inputs():
+    input_map = dict(_outputs=dict(usedefault=True,
+    ),
+    ignore_exception=dict(nohash=True,
+    usedefault=True,
+    ),
+    )
+    inputs = CollateInterface.input_spec()
+
+    for key, metadata in input_map.items():
+        for metakey, value in metadata.items():
+            yield assert_equal, getattr(inputs.traits()[key], metakey), value
+
+def test_CollateInterface_outputs():
+    output_map = dict()
+    outputs = CollateInterface.output_spec()
+
+    for key, metadata in output_map.items():
+        for metakey, value in metadata.items():
+            yield assert_equal, getattr(outputs.traits()[key], metakey), value
+
diff --git a/nipype/interfaces/tests/test_auto_MultipleSelectInterface.py b/nipype/interfaces/tests/test_auto_MultipleSelectInterface.py
new file mode 100644
index 0000000000..0df8228ef4
--- /dev/null
+++ b/nipype/interfaces/tests/test_auto_MultipleSelectInterface.py
@@ -0,0 +1,22 @@
+# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT
+from nipype.testing import assert_equal
+from nipype.interfaces.utility import MultipleSelectInterface
+
+def test_MultipleSelectInterface_inputs():
+    input_map = dict(index=dict(mandatory=True,
+    ),
+    )
+    inputs = MultipleSelectInterface.input_spec()
+
+    for key, metadata in input_map.items():
+        for metakey, value in metadata.items():
+            yield assert_equal, getattr(inputs.traits()[key], metakey), value
+
+def test_MultipleSelectInterface_outputs():
+    output_map = dict()
+    outputs = MultipleSelectInterface.output_spec()
+
+    for key, metadata in output_map.items():
+        for metakey, value in metadata.items():
+            yield assert_equal, getattr(outputs.traits()[key], metakey), value
+
diff --git a/nipype/interfaces/utility.py b/nipype/interfaces/utility.py
index 37883d4e5c..fe6c6885b5 100644
--- a/nipype/interfaces/utility.py
+++ b/nipype/interfaces/utility.py
@@ -488,6 +488,163 @@ def _run_interface(self, runtime):
         return runtime
 
 
+class CollateInterfaceInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    _outputs = traits.Dict(traits.Any, value={}, usedefault=True)
+
+    def __setattr__(self, key, value):
+        if key not in self.copyable_trait_names():
+            if not isdefined(value):
+                super(CollateInterfaceInputSpec, self).__setattr__(key, value)
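+            # Any attribute that is not a predefined trait is treated as a
+            # dynamic input channel: record it in _outputs so that
+            # _list_outputs() can collate it later.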
+            self._outputs[key] = value
+        else:
+            if key in self._outputs:
+                self._outputs[key] = value
+            super(CollateInterfaceInputSpec, self).__setattr__(key, value)
+
+
+class CollateInterface(IOBase):
+    """
+    A simple interface to multiplex inputs through a single output node.
+    Channels are defined by the prefixes of the input fields. In order to
+    avoid inconsistencies, output fields should be defined beforehand at
+    initialization.
+
+    Example
+    -------
+
+    >>> from nipype.interfaces.utility import CollateInterface
+    >>> coll = CollateInterface(fields=['file','miscdata'])
+    >>> coll.inputs.src1_file = 'exfile1.csv'
+    >>> coll.inputs.src2_file = 'exfile2.csv'
+    >>> coll.inputs.src1_miscdata = 1.0
+    >>> coll.inputs.src2_miscdata = 2.0
+    >>> res = coll.run()
+    >>> print(res.outputs.file)
+    ['exfile1.csv', 'exfile2.csv']
+    >>> print(res.outputs.miscdata)
+    [1.0, 2.0]
+    """
+    input_spec = CollateInterfaceInputSpec
+    output_spec = DynamicTraitedSpec
+
+    def __init__(self, fields=None, fill_missing=False, **kwargs):
+        super(CollateInterface, self).__init__(**kwargs)
+
+        if fields is None or not fields:
+            raise ValueError('CollateInterface fields must be a non-empty list')
+        # Each input must be in the fields.
+        self._fields = fields
+        self._fill_missing = fill_missing
+
+    def _add_output_traits(self, base):
+        undefined_traits = {}
+        for key in self._fields:
+            base.add_trait(key, traits.Any)
+            undefined_traits[key] = Undefined
+        base.trait_set(trait_change_notify=False, **undefined_traits)
+        return base
+
+    def _list_outputs(self):
+        # manual mandatory inputs check
+        valuedict = dict((key, {}) for key in self._fields)
+        nodekeys = []
+
+        for inputkey, inputval in self.inputs._outputs.items():
+            for key in self._fields:
+                if inputkey.endswith(key):
+                    # Strip the field name to recover the channel prefix
+                    nodekey = inputkey[::-1].replace(key[::-1], '', 1)[::-1]
+                    nodekeys.append(nodekey)
+
+                    if nodekey in valuedict[key].keys():
+                        msg = ('Trying to add a field from an existing node')
+                        raise ValueError(msg)
+                    valuedict[key][nodekey] = inputval
+
+        nodekeys = sorted(set(nodekeys))
+        outputs = self._outputs().get()
+        for key in self._fields:
+            outputs[key] = []
+            for nk in nodekeys:
+                if nk in valuedict[key]:
+                    val = valuedict[key][nk]
+                else:
+                    if self._fill_missing:
+                        val = None
+                    else:
+                        raise RuntimeError('Input missing for a field to collate.')
+                outputs[key].append(val)
+
+        return outputs
+
+
+class MultipleSelectInputSpec(DynamicTraitedSpec):
+    index = InputMultiPath(traits.Int, mandatory=True,
+                           desc='0-based indices of values to choose')
+
+
+class MultipleSelectInterface(IOBase):
+    """
+    Basic interface that demultiplexes lists generated by CollateInterface.
+
+    Example
+    -------
+
+    >>> from nipype.interfaces.utility import MultipleSelectInterface
+    >>> demux = MultipleSelectInterface(fields=['file','miscdata'], index=0)
+    >>> demux.inputs.file = ['exfile1.csv', 'exfile2.csv']
+    >>> demux.inputs.miscdata = [1.0, 2.0]
+    >>> res = demux.run()
+    >>> print(res.outputs.file)
+    exfile1.csv
+    >>> print(res.outputs.miscdata)
+    1.0
+    """
+    input_spec = MultipleSelectInputSpec
+    output_spec = DynamicTraitedSpec
+
+    def __init__(self, fields=None, mandatory_inputs=True, **inputs):
+        super(MultipleSelectInterface, self).__init__(**inputs)
+        if fields is None or not fields:
+            raise ValueError('MultipleSelectInterface fields must be a non-empty list')
+        # Each input must be in the fields.
+        for in_field in inputs:
+            if in_field not in fields and in_field != 'index':
+                raise ValueError('MultipleSelectInterface input is not in the fields: %s' % in_field)
+        self._fields = fields
+        self._mandatory_inputs = mandatory_inputs
+        add_traits(self.inputs, fields)
+        # Adding any traits wipes out all input values set in superclass initialization,
+        # even if the trait is not in the add_traits argument. The work-around is to reset
+        # the values after adding the traits.
+        self.inputs.set(**inputs)
+
+    def _add_output_traits(self, base):
+        undefined_traits = {}
+        for key in self._fields:
+            base.add_trait(key, traits.Any)
+            undefined_traits[key] = Undefined
+        base.trait_set(trait_change_notify=False, **undefined_traits)
+        return base
+
+    def _list_outputs(self):
+        # manual mandatory inputs check
+        if self._fields and self._mandatory_inputs:
+            for key in self._fields:
+                value = getattr(self.inputs, key)
+                if not isdefined(value):
+                    msg = ("%s requires a value for input '%s' because it was "
+                           "listed in 'fields'. You can turn off mandatory inputs "
+                           "checking by passing mandatory_inputs = False to the "
+                           "constructor.") % (self.__class__.__name__, key)
+                    raise ValueError(msg)
+
+        outputs = self._outputs().get()
+        for key in self._fields:
+            val = getattr(self.inputs, key)
+            if isdefined(val):
+                outputs[key] = np.squeeze(np.array(val)[np.array(self.inputs.index)]).tolist()
+        return outputs
+
+
 class CSVReaderInputSpec(DynamicTraitedSpec, TraitedSpec):
     in_file = File(exists=True, mandatory=True, desc='Input comma-seperated value (CSV) file')
     header = traits.Bool(False, usedefault=True, desc='True if the first line is a column header')
@@ -566,3 +723,4 @@ def _list_outputs(self):
             entry = self._parse_line(line)
             outputs = self._append_entry(outputs, entry)
         return outputs
+
diff --git a/nipype/pipeline/__init__.py b/nipype/pipeline/__init__.py
index b7a6afe20e..802052ab36 100644
--- a/nipype/pipeline/__init__.py
+++ b/nipype/pipeline/__init__.py
@@ -7,4 +7,5 @@
 from __future__ import absolute_import
 __docformat__ = 'restructuredtext'
 
-from .engine import Node, MapNode, JoinNode, Workflow
+from .engine import (Node, MapNode, JoinNode, Workflow,
+                     InterfacedWorkflow, GraftWorkflow)
diff --git a/nipype/pipeline/engine/__init__.py b/nipype/pipeline/engine/__init__.py
index e950086307..5d46fe3570 100644
--- a/nipype/pipeline/engine/__init__.py
+++ b/nipype/pipeline/engine/__init__.py
@@ -9,6 +9,6 @@
 from __future__ import absolute_import
 __docformat__ = 'restructuredtext'
 
-from .workflows import Workflow
+from .workflows import Workflow, InterfacedWorkflow, GraftWorkflow
 from .nodes import Node, MapNode, JoinNode
 from .utils import generate_expanded_graph
diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py
index 9f9165e3b2..991b2da7b2 100644
--- a/nipype/pipeline/engine/nodes.py
+++ b/nipype/pipeline/engine/nodes.py
@@ -231,9 +231,9 @@ def output_dir(self):
 
     def set_input(self, parameter, val):
         """ Set interface input value"""
-        logger.debug('setting nodelevel(%s) input %s = %s' % (str(self),
-                                                              parameter,
-                                                              str(val)))
+        logger.debug('Node: setting nodelevel(%s) input %s = %s' % (str(self),
+                                                                    parameter,
+                                                                    str(val)))
         setattr(self.inputs, parameter, deepcopy(val))
 
     def get_output(self, parameter):
@@ -290,7 +290,7 @@ def run(self, updatehash=False):
         else:
             self.config = merge_dict(deepcopy(config._sections), self.config)
         if not self._got_inputs:
-            self._get_inputs()
+            self._get_all_inputs()
             self._got_inputs = True
         outdir = self.output_dir()
         logger.info("Executing node %s in dir: %s" % (self._id, outdir))
@@ -424,7 +424,7 @@ def _parameterization_dir(self, param):
     def _get_hashval(self):
         """Return a hash of the input state"""
         if not self._got_inputs:
-            self._get_inputs()
+            self._get_all_inputs()
             self._got_inputs = True
         hashed_inputs, hashvalue = self.inputs.get_hashval(
             hash_method=self.config['execution']['hash_method'])
@@ -457,14 +457,17 @@ def _save_hashfile(self, hashfile, hashed_inputs):
                          hashfile)
 
     def _get_inputs(self):
+        return self._get_all_inputs()
+
+    def _get_all_inputs(self):
         """Retrieve inputs from pointers to results file
 
         This mechanism can be easily extended/replaced to retrieve data from
         other data sources (e.g., XNAT, HTTP, etc.,.)
         """
-        logger.debug('Setting node inputs')
-        for key, info in list(self.input_source.items()):
-            logger.debug('input: %s' % key)
+        logger.debug('Setting node %s inputs' % self.name)
+        for key, info in self.input_source.items():
+            logger.debug('input: %s, info: %s' % (key, str(info)))
             results_file = info[0]
             logger.debug('results file: %s' % results_file)
             results = loadpkl(results_file)
@@ -982,6 +985,7 @@ def _slot_value(self, field, index):
                           % (self, slot_field, field, index, e))
 
 
+
 class MapNode(Node):
     """Wraps interface objects that need to be iterated on a list of inputs.
 
@@ -1014,9 +1018,9 @@ def __init__(self, interface, iterfield, name, serial=False, nested=False, **kwa
             node specific name
         serial : boolean
             flag to enforce executing the jobs of the mapnode in a serial manner rather than parallel
-        nested : boolea
+        nested : boolean
             support for nested lists, if set the input list will be flattened before running, and the
-            nested list structure of the outputs will be resored
+            nested list structure of the outputs will be restored
 
         See Node docstring for additional keyword arguments.
         """
@@ -1058,9 +1062,9 @@ def set_input(self, parameter, val):
 
         Priority goes to interface.
""" - logger.debug('setting nodelevel(%s) input %s = %s' % (str(self), - parameter, - str(val))) + logger.debug('MapNode: setting nodelevel(%s) input %s = %s' % (str(self), + parameter, + str(val))) self._set_mapnode_input(self.inputs, parameter, deepcopy(val)) def _set_mapnode_input(self, object, name, newvalue): @@ -1075,7 +1079,7 @@ def _set_mapnode_input(self, object, name, newvalue): def _get_hashval(self): """ Compute hash including iterfield lists.""" if not self._got_inputs: - self._get_inputs() + self._get_all_inputs() self._got_inputs = True self._check_iterfield() hashinputs = deepcopy(self._interface.inputs) @@ -1229,7 +1233,7 @@ def write_report(self, report_type=None, cwd=None): def get_subnodes(self): if not self._got_inputs: - self._get_inputs() + self._get_all_inputs() self._got_inputs = True self._check_iterfield() self.write_report(report_type='preexec', cwd=self.output_dir()) @@ -1237,7 +1241,7 @@ def get_subnodes(self): def num_subnodes(self): if not self._got_inputs: - self._get_inputs() + self._get_all_inputs() self._got_inputs = True self._check_iterfield() if self._serial: @@ -1249,11 +1253,14 @@ def num_subnodes(self): return len(filename_to_list(getattr(self.inputs, self.iterfield[0]))) def _get_inputs(self): + return self._get_all_inputs() + + def _get_all_inputs(self): old_inputs = self._inputs.get() self._inputs = self._create_dynamic_traits(self._interface.inputs, fields=self.iterfield) self._inputs.set(**old_inputs) - super(MapNode, self)._get_inputs() + super(MapNode, self)._get_all_inputs() def _check_iterfield(self): """Checks iterfield diff --git a/nipype/pipeline/engine/tests/test_graft.py b/nipype/pipeline/engine/tests/test_graft.py new file mode 100644 index 0000000000..d2e7aa6821 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_graft.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: + +from copy import deepcopy +import os.path as op +from tempfile import mkdtemp +from shutil import rmtree + +from ....testing import (assert_raises, assert_equal, + assert_true, assert_false) +from ... 
import engine as pe +from ....interfaces import base as nib +from ....interfaces import utility as niu + + +ifresult = None + + +class SetInputSpec(nib.TraitedSpec): + val = nib.traits.Any(mandatory=True, desc='input') + + +class SetOutputSpec(nib.TraitedSpec): + out = nib.traits.Any(desc='ouput') + + +class SetInterface(nib.BaseInterface): + input_spec = SetInputSpec + output_spec = SetOutputSpec + _always_run = True + + def _run_interface(self, runtime): + global ifresult + runtime.returncode = 0 + ifresult = self.inputs.val + return runtime + + def _list_outputs(self): + global ifresult + outputs = self._outputs().get() + outputs['out'] = self.inputs.val + return outputs + + +def _base_workflow(name='InterfacedWorkflow'): + wf = pe.InterfacedWorkflow( + name=name, input_names=['input0'], output_names=['output0']) + + mynode = pe.Node(SetInterface(), name='internalnode') + wf.connect('in', 'input0', mynode, 'val') + wf.connect(mynode, 'out', 'out', 'output0') + return wf + + +def _sum_workflow(name='InterfacedSumWorkflow', b=0): + name += '%02d' % b + + def _sum(a, b): + return a + b + 1 + + wf = pe.InterfacedWorkflow( + name=name, input_names=['input0'], + output_names=['output0']) + sum0 = pe.Node(niu.Function( + input_names=['a', 'b'], output_names=['out'], function=_sum), + name='testnode') + sum0.inputs.b = b + + # test connections + wf.connect('in', 'input0', sum0, 'a') + wf.connect(sum0, 'out', 'out', 'output0') + return wf + + +def test_interfaced_workflow(): + global ifresult + + x = lambda: pe.InterfacedWorkflow(name='ShouldRaise') + yield assert_raises, ValueError, x + x = lambda: pe.InterfacedWorkflow(name='ShouldRaise', + input_names=['input0']) + yield assert_raises, ValueError, x + x = lambda: pe.InterfacedWorkflow(name='ShouldRaise', + output_names=['output0']) + yield assert_raises, ValueError, x + + wf = pe.InterfacedWorkflow( + name='InterfacedWorkflow', input_names=['input0'], + output_names=['output0']) + + # Check it doesn't expose inputs/outputs of internal nodes + inputs = wf.inputs.get() + yield assert_equal, inputs, {'input0': nib.Undefined} + + outputs = wf.outputs.get() + yield assert_equal, outputs, {'output0': None} + + # test connections + mynode = pe.Node(SetInterface(), name='internalnode') + wf.connect('in', 'input0', mynode, 'val') + wf.connect(mynode, 'out', 'out', 'output0') + + # test setting input + wf.inputs.input0 = 5 + yield assert_equal, wf.inputs.get(), {'input0': 5} + + wf.run() + yield assert_equal, ifresult, 5 + + # Try to create an outbound connection from an inner node + wf = _base_workflow() + outerwf = pe.Workflow('OuterWorkflow') + outernode = pe.Node(niu.IdentityInterface(fields=['val']), + name='outernode') + x = lambda: outerwf.connect(wf, 'internalnode.out', outernode, 'val') + yield assert_raises, Exception, x + + # Try to create an inbound connection from an outer node + wf = _base_workflow() + outerwf = pe.Workflow('OuterWorkflow') + outernode = pe.Node(niu.IdentityInterface(fields=['val']), + name='outernode') + x = lambda: outerwf.connect(outernode, 'val', wf, 'internalnode.val') + yield assert_raises, Exception, x + + # Try to insert a sub-workflow with an outbound connection + outerwf = pe.Workflow('OuterWorkflow') + outernode = pe.Node(niu.IdentityInterface(fields=['val']), + name='outernode') + + subwf = pe.Workflow('SubWorkflow') + inputnode = pe.Node(niu.IdentityInterface(fields=['in']), name='inputnode') + outputnode = pe.Node(niu.IdentityInterface(fields=['out']), + name='outputnode') + subnode = pe.Node(SetInterface(), 
+    subwf.connect([
+        (inputnode, subnode, [('in', 'val')]),
+        (subnode, outputnode, [('out', 'out')]),
+    ])
+
+    outerwf.connect(subwf, 'internalnode.out', outernode, 'val')
+
+    wf = pe.InterfacedWorkflow(
+        name='InterfacedWorkflow', input_names=['input0'],
+        output_names=['output0'])
+    x = lambda: wf.connect('in', 'input0', subwf, 'inputnode.in')
+    yield assert_raises, Exception, x
+
+    # Try to insert a sub-workflow with an inbound connection
+    outerwf = pe.Workflow('OuterWorkflow')
+    outernode = pe.Node(niu.IdentityInterface(fields=['val']),
+                        name='outernode')
+
+    subwf = pe.Workflow('SubWorkflow')
+    inputnode = pe.Node(niu.IdentityInterface(fields=['in']), name='inputnode')
+    outputnode = pe.Node(niu.IdentityInterface(fields=['out']),
+                         name='outputnode')
+    subnode = pe.Node(SetInterface(), name='internalnode')
+    subwf.connect([
+        (subnode, outputnode, [('out', 'out')]),
+    ])
+
+    outerwf.connect(outernode, 'val', subwf, 'internalnode.val')
+
+    wf = pe.InterfacedWorkflow(
+        name='InterfacedWorkflow', input_names=['input0'],
+        output_names=['output0'])
+    x = lambda: wf.connect('in', 'input0', subwf, 'inputnode.in')
+    yield assert_raises, Exception, x
+
+
+def test_graft_workflow():
+    global ifresult
+    wf1 = _sum_workflow()
+    wf = pe.GraftWorkflow(
+        name='GraftWorkflow', fields_from=wf1)
+    wf.insert(wf1)
+    wf.insert(_sum_workflow(b=2))
+
+    outer = pe.Workflow('OuterWorkflow')
+    mynode = pe.Node(SetInterface(), name='internalnode')
+
+    outer.connect([
+        (wf, mynode, [('outputnode.output0', 'val')])
+    ])
+
+    wf.inputs.input0 = 3
+
+    ifresult = None
+    outer.run()
+    yield assert_equal, ifresult, [4, 6]
diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py
index b14d73a307..277ce1c26b 100644
--- a/nipype/pipeline/engine/workflows.py
+++ b/nipype/pipeline/engine/workflows.py
@@ -209,10 +209,11 @@ def connect(self, *args, **kwargs):
                 # determine their inputs/outputs depending on
                 # connection settings.  Skip these modules in the check
                 if dest in connected_ports[destnode]:
-                    raise Exception("""
-Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already
-connected.
-""" % (srcnode, source, destnode, dest, dest, destnode))
+                    raise Exception(
+                        'Trying to connect %s:%s to %s:%s but input \'%s\' of '
+                        'node \'%s\' is already connected.' %
+                        (srcnode, source, destnode, dest, dest, destnode))
+
                 if not (hasattr(destnode, '_interface') and
                         '.io' in str(destnode._interface.__class__)):
                     if not destnode._check_inputs(dest):
@@ -714,9 +715,9 @@ def _has_attr(self, parameter, subtype='in'):
         """Checks if a parameter is available as an input or output
         """
         if subtype == 'in':
-            subobject = self.inputs
+            subobject = self._get_all_inputs()
         else:
-            subobject = self.outputs
+            subobject = self._get_all_outputs()
         attrlist = parameter.split('.')
         cur_out = subobject
         for attr in attrlist:
@@ -730,9 +731,9 @@ def _get_parameter_node(self, parameter, subtype='in'):
            output parameter
         """
         if subtype == 'in':
-            subobject = self.inputs
+            subobject = self._get_all_inputs()
         else:
-            subobject = self.outputs
+            subobject = self._get_all_outputs()
         attrlist = parameter.split('.')
         cur_out = subobject
         for attr in attrlist[:-1]:
@@ -746,8 +747,10 @@ def _check_inputs(self, parameter):
         return self._has_attr(parameter, subtype='in')
 
     def _get_inputs(self):
-        """Returns the inputs of a workflow
+        return self._get_all_inputs()
 
+    def _get_all_inputs(self):
+        """Returns the inputs of a workflow
         This function does not return any input ports that are already
         connected
         """
@@ -755,7 +758,7 @@ def _get_inputs(self):
         for node in self._graph.nodes():
             inputdict.add_trait(node.name, traits.Instance(TraitedSpec))
             if isinstance(node, Workflow):
-                setattr(inputdict, node.name, node.inputs)
+                setattr(inputdict, node.name, node._get_all_inputs())
             else:
                 taken_inputs = []
                 for _, _, d in self._graph.in_edges_iter(nbunch=node,
@@ -763,7 +766,7 @@ def _get_inputs(self):
                                                          data=True):
                     for cd in d['connect']:
                         taken_inputs.append(cd[1])
                 unconnectedinputs = TraitedSpec()
-                for key, trait in list(node.inputs.items()):
+                for key, trait in node.inputs.items():
                     if key not in taken_inputs:
                         unconnectedinputs.add_trait(key,
                                                     traits.Trait(trait,
@@ -775,16 +778,19 @@ def _get_inputs(self):
         return inputdict
 
     def _get_outputs(self):
+        return self._get_all_outputs()
+
+    def _get_all_outputs(self):
         """Returns all possible output ports that are not already connected
         """
         outputdict = TraitedSpec()
         for node in self._graph.nodes():
             outputdict.add_trait(node.name, traits.Instance(TraitedSpec))
             if isinstance(node, Workflow):
-                setattr(outputdict, node.name, node.outputs)
+                setattr(outputdict, node.name, node._get_all_outputs())
             elif node.outputs:
                 outputs = TraitedSpec()
-                for key, _ in list(node.outputs.items()):
+                for key, _ in node.outputs.items():
                     outputs.add_trait(key, traits.Any(node=node))
                     setattr(outputs, key, None)
                 setattr(outputdict, node.name, outputs)
@@ -1003,3 +1009,264 @@ def _get_dot(self, prefix=None, hierarchy=None, colored=False,
                                  vname1.replace('.', '_')))
                     logger.debug('cross connection: ' + dotlist[-1])
         return ('\n' + prefix).join(dotlist)
+
+
+class InterfacedWorkflow(Workflow):
+    """
+    A workflow that automatically generates the input and output nodes, to
+    avoid repetitive inputnode.* and outputnode.* connections and allow for
+    agile pipeline chaining.
+    """
+
+    @property
+    def input_names(self):
+        return self._input_names
+
+    @property
+    def output_names(self):
+        return self._output_names
+
+    def __init__(self, name, base_dir=None, input_names=None, output_names=None):
+        """Create a workflow object.
+
+        Parameters
+        ----------
+        name : alphanumeric string
+            unique identifier for the workflow
+        base_dir : string, optional
+            path to workflow storage
+        input_names : list of strings
+            fields exposed by the workflow ``inputnode``
+        output_names : list of strings
+            fields exposed by the workflow ``outputnode``
+
+        """
+        from nipype.interfaces.utility import IdentityInterface
+        super(InterfacedWorkflow, self).__init__(name, base_dir)
+
+        if isinstance(input_names, str):
+            input_names = [input_names]
+
+        if not input_names:
+            raise ValueError('InterfacedWorkflow input_names must be a '
+                             'non-empty list')
+
+        if isinstance(output_names, str):
+            output_names = [output_names]
+
+        if not output_names:
+            raise ValueError('InterfacedWorkflow output_names must be a '
+                             'non-empty list')
+
+        self._input_names = input_names
+        self._output_names = output_names
+
+        self._inputnode = Node(IdentityInterface(fields=input_names),
+                               name='inputnode')
+        self._outputnode = Node(IdentityInterface(fields=output_names),
+                                name='outputnode')
+        self.add_nodes([self._inputnode, self._outputnode])
+
+    def connect(self, *args, **kwargs):
+        """
+        Extends the connect method to accept the 'in' and 'out' void nodes
+        for the workflow inputs and outputs.
+
+        Parameters
+        ----------
+        args : same as superclass
+
+        """
+        if len(args) == 1:
+            conns = args[0]
+        elif len(args) == 4:
+            conns = [(args[0], args[2], [(args[1], args[3])])]
+        else:
+            raise TypeError('connect() takes either 4 arguments, or 1 list of'
+                            ' connection tuples (%d args given)' % len(args))
+
+        disconnect = False
+        if kwargs:
+            disconnect = kwargs.get('disconnect', False)
+
+        if disconnect:
+            self.disconnect(conns)
+            return
+
+        connection_list = []
+        for conn in conns:
+            srcpref = ''
+            dstpref = ''
+
+            if (isinstance(conn[0], str) and conn[0][0:2] == 'in'):
+                srcnode = self._inputnode
+            else:
+                srcnode = conn[0]
+
+            if (isinstance(conn[1], str) and conn[1][0:3] == 'out'):
+                dstnode = self._outputnode
+            else:
+                dstnode = conn[1]
+
+            if isinstance(srcnode, InterfacedWorkflow):
+                srcpref = 'outputnode.'
+
+            if isinstance(dstnode, InterfacedWorkflow):
+                dstpref = 'inputnode.'
+
+            if len(conn) == 2:
+                srcnames = srcnode.outputs.copyable_trait_names()
+                dstnames = dstnode.inputs.copyable_trait_names()
+                for n in srcnames:
+                    if n not in dstnames:
+                        raise RuntimeError('Interface mismatch between '
+                                           'workflows: %s port is not '
+                                           'present in the destination '
+                                           'node' % n)
+                ports = [(srcpref + k, dstpref + k) for k in srcnames]
+            else:
+                ports = []
+                for c in conn[2]:
+                    srcport = c[0]
+                    dstport = c[1]
+                    if len(srcpref) > 0 and '.' not in srcport:
+                        srcport = srcpref + srcport
+
+                    if len(dstpref) > 0 and '.' not in dstport:
+                        dstport = dstpref + dstport
+
+                    ports.append((srcport, dstport))
+
+            connection_list.append((srcnode, dstnode, ports))
+
+        super(InterfacedWorkflow, self).connect(connection_list, **kwargs)
+
+    def _get_inputs(self):
+        """
+        Returns the inputs of a workflow
+
+        This function does not return any input ports that are already
+        connected, nor internal input ports
+        """
+        inputdict = TraitedSpec()
+        node = self._inputnode
+
+        taken_inputs = []
+        for _, _, d in self._graph.in_edges_iter(nbunch=[node],
+                                                 data=True):
+            for cd in d['connect']:
+                taken_inputs.append(cd[1])
+
+        for key, trait in node.inputs.items():
+            if key not in taken_inputs:
+                inputdict.add_trait(key, traits.Trait(trait, node=node))
+                value = getattr(node.inputs, key)
+                setattr(inputdict, key, value)
+
+        inputdict.on_trait_change(self._set_input)
+        return inputdict
+
+    def _get_outputs(self):
+        """
+        Returns all possible output ports of the output node that are not
+        already connected
+        """
+        outputdict = TraitedSpec()
+        node = self._outputnode
+
+        if node.outputs:
+            for key, _ in node.outputs.items():
+                outputdict.add_trait(key, traits.Any(node=node))
+                setattr(outputdict, key, None)
+        return outputdict
+
+
+class GraftWorkflow(InterfacedWorkflow):
+    """
+    A workflow ready to insert several internal workflows that share
+    input/output interfaces and run on the same input data.
+    This workflow produces as many outputs as inserted subworkflows, and
+    an outputnode with all the outputs merged.
+
+    Example
+    -------
+    >>> import nipype.pipeline.engine as npe
+    >>> from nipype.interfaces.fsl import Threshold
+    >>> wf1 = npe.InterfacedWorkflow(name='testname1', input_names=['in_file', \
+                                     'thresh'], output_names=['out_file'])
+    >>> node = npe.Node(Threshold(), name='internalnode')
+    >>> wf1.connect([ ('in', node), (node, 'out') ])
+    >>> wf2 = wf1.clone(name='testname2')
+    >>> wf = npe.GraftWorkflow(name='graft', fields_from=wf1)
+    >>> wf.insert(wf1)
+    0
+    >>> wf.insert(wf2)
+    1
+    >>> wf.inputs.in_file = 'structural.nii'
+    >>> wf.inputs.thresh = 1.0
+    >>> wf.run()  # doctest: +SKIP
+    """
+
+    def __init__(self, name, base_dir=None, fields_from=None,
+                 input_names=None, output_names=None):
+        """
+        Initializes the workflow from an existing InterfacedWorkflow
+        (``fields_from``) or from explicit input/output name lists.
+        """
+        from nipype.interfaces.utility import CollateInterface
+
+        self._children = dict()
+        self._cids = dict()
+        self._outnodes = dict()
+
+        if fields_from is not None:
+            if not isinstance(fields_from, InterfacedWorkflow):
+                raise TypeError('Workflow is not an InterfacedWorkflow.')
+            input_names = fields_from.input_names
+            output_names = fields_from.output_names
+
+        if not input_names or not output_names:
+            raise ValueError('A GraftWorkflow cannot be initialized without '
+                             'specifying either a fields_from workflow or '
+                             'input/output name lists')
+
+        super(GraftWorkflow, self).__init__(name=name, base_dir=base_dir,
+                                            input_names=input_names,
+                                            output_names=output_names)
+        self._outputnode = Node(CollateInterface(fields=output_names),
+                                name='outputnode')
+
+    def get_cid(self, name):
+        return self._cids[name]
+
+    def insert(self, workflow):
+        """
+        Inserts an InterfacedWorkflow into this GraftWorkflow, returning the
+        0-based index (child id) assigned to it.
+        """
+        from nipype.interfaces.utility import IdentityInterface
+
+        if not isinstance(workflow, InterfacedWorkflow):
+            raise TypeError('Only InterfacedWorkflows can be inserted in '
+                            'a GraftWorkflow.')
+
+        # Check that the candidate satisfies the common interface before
+        # registering it
+        if ((workflow.input_names != self.input_names) or
+                (workflow.output_names != self.output_names)):
+            raise RuntimeError('Workflow does not meet the general interface')
+
+        ckey = workflow.name
+        if ckey in self._children.keys():
+            raise RuntimeError('Trying to add an already existing workflow '
+                               'to GraftWorkflow')
+
+        cid = len(self._children)
+        childname = 'out%02d' % cid
+        self._children[ckey] = workflow
+        self._cids[ckey] = cid
+        self._outnodes[ckey] = Node(IdentityInterface(fields=self.output_names),
+                                    name=childname)
+
+        self.connect([
+            ('in', workflow),
+            (workflow, self._outnodes[ckey]),
+            (self._outnodes[ckey], 'out',
+             [(key, '%s_%s' % (childname, key)) for key in self.output_names])
+        ])
+        return cid
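A minimal sketch (not part of the patch, hypothetical file names) of the wiring that
``insert()`` sets up: each child is drained through an IdentityInterface node named
``outNN``, so the CollateInterface outputnode receives its values as ``outNN_<field>``
inputs and stacks them, per field, into lists ordered by child id:

    from nipype.interfaces.utility import CollateInterface

    coll = CollateInterface(fields=['out_tpm'])
    # Channel prefixes mirror the 'out%02d' child names used by insert()
    coll.inputs.out00_out_tpm = 'fast_tpm.nii.gz'
    coll.inputs.out01_out_tpm = 'atropos_tpm.nii.gz'
    res = coll.run()
    # res.outputs.out_tpm == ['fast_tpm.nii.gz', 'atropos_tpm.nii.gz']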