Skip to content

[ENH] Restructure of pipeline engine #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nipype/caching/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from ..interfaces.base import BaseInterface
from ..pipeline.engine import Node
from ..pipeline.utils import modify_paths
from ..pipeline.engine.utils import modify_paths

################################################################################
# PipeFunc object: callable interface to nipype.interface objects
Expand Down
2 changes: 1 addition & 1 deletion nipype/caching/tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from nose.tools import assert_equal

from .. import Memory
from ...pipeline.tests.test_engine import TestInterface
from ...pipeline.engine.tests.test_engine import TestInterface

from ... import config
config.set_default_config()
Expand Down
14 changes: 14 additions & 0 deletions nipype/pipeline/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Package contains modules for generating pipelines using interfaces

"""

from __future__ import absolute_import
__docformat__ = 'restructuredtext'
from .workflows import Workflow
from .nodes import Node, MapNode, JoinNode
from .utils import generate_expanded_graph
125 changes: 125 additions & 0 deletions nipype/pipeline/engine/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Defines functionality for pipelined execution of interfaces

The `EngineBase` class implements the more general view of a task.

.. testsetup::
# Change directory to provide relative paths for doctests
import os
filepath = os.path.dirname(os.path.realpath( __file__ ))
datadir = os.path.realpath(os.path.join(filepath, '../../testing/data'))
os.chdir(datadir)

"""

from future import standard_library
standard_library.install_aliases()
from builtins import object

try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict

from copy import deepcopy
import re
import numpy as np
from ...interfaces.traits_extension import traits, Undefined
from ...interfaces.base import DynamicTraitedSpec
from ...utils.filemanip import loadpkl, savepkl

from ... import logging
logger = logging.getLogger('workflow')


class EngineBase(object):
"""Defines common attributes and functions for workflows and nodes."""

def __init__(self, name=None, base_dir=None):
""" Initialize base parameters of a workflow or node

Parameters
----------
name : string (mandatory)
Name of this node. Name must be alphanumeric and not contain any
special characters (e.g., '.', '@').
base_dir : string
base output directory (will be hashed before creations)
default=None, which results in the use of mkdtemp

"""
self.base_dir = base_dir
self.config = None
self._verify_name(name)
self.name = name
# for compatibility with node expansion using iterables
self._id = self.name
self._hierarchy = None

@property
def inputs(self):
raise NotImplementedError

@property
def outputs(self):
raise NotImplementedError

@property
def fullname(self):
fullname = self.name
if self._hierarchy:
fullname = self._hierarchy + '.' + self.name
return fullname

def clone(self, name):
"""Clone an EngineBase object

Parameters
----------

name : string (mandatory)
A clone of node or workflow must have a new name
"""
if (name is None) or (name == self.name):
raise Exception('Cloning requires a new name')
self._verify_name(name)
clone = deepcopy(self)
clone.name = name
clone._id = name
clone._hierarchy = None
return clone

def _check_outputs(self, parameter):
return hasattr(self.outputs, parameter)

def _check_inputs(self, parameter):
if isinstance(self.inputs, DynamicTraitedSpec):
return True
return hasattr(self.inputs, parameter)

def _verify_name(self, name):
valid_name = bool(re.match('^[\w-]+$', name))
if not valid_name:
raise ValueError('[Workflow|Node] name \'%s\' contains'
' special characters' % name)

def __repr__(self):
if self._hierarchy:
return '.'.join((self._hierarchy, self._id))
else:
return self._id

def save(self, filename=None):
if filename is None:
filename = 'temp.pklz'
savepkl(filename, self)

def load(self, filename):
if '.npz' in filename:
DeprecationWarning(('npz files will be deprecated in the next '
'release. you can use numpy to open them.'))
return np.load(filename)
return loadpkl(filename)
Loading