diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 6391b2ac5a..025037a66c 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -1127,7 +1127,8 @@ def _node_runner(self, nodes, updatehash=False): err = None try: node.run(updatehash=updatehash) - except Exception as err: + except Exception as this_err: + err = this_err # save reference if str2bool(self.config['execution']['stop_on_first_crash']): self._result = node.result raise diff --git a/nipype/pipeline/engine/tests/test_engine.py b/nipype/pipeline/engine/tests/test_engine.py index 3af9c9c564..13f643afaf 100644 --- a/nipype/pipeline/engine/tests/test_engine.py +++ b/nipype/pipeline/engine/tests/test_engine.py @@ -13,6 +13,7 @@ import os from shutil import rmtree from tempfile import mkdtemp +import logging import networkx as nx @@ -44,6 +45,29 @@ def _list_outputs(self): return outputs +class MockLoggingHandler(logging.Handler): + """Mock logging handler to check for expected logs. + + Credit: Gustavo @ http://stackoverflow.com/a/1049375/3903368 + """ + + def __init__(self, *args, **kwargs): + self.reset() + logging.Handler.__init__(self, *args, **kwargs) + + def emit(self, record): + self.messages[record.levelname.lower()].append(record.getMessage()) + + def reset(self): + self.messages = { + 'debug': [], + 'info': [], + 'warning': [], + 'error': [], + 'critical': [], + } + + def test_init(): yield assert_raises, Exception, pe.Workflow pipe = pe.Workflow(name='pipe') @@ -750,3 +774,75 @@ def func1(in1): os.chdir(cwd) rmtree(wd) + + + +def test_mapnode_handles_wrapped_crash(): + cwd = os.getcwd() + wd = mkdtemp() + os.chdir(wd) + from nipype import MapNode, Function, Workflow, logging + + mock_handler = MockLoggingHandler() + logging.loggers['workflow'].addHandler(mock_handler) + + def subworkflow_wrapper(arg): + import os + from nipype.pipeline.engine import Node, Workflow + from nipype.interfaces import Function + from nipype.utils.filemanip import loadpkl + + crash_program = """\ + def crash_program(arg): + raise Exception('Important message, {}'.format(arg)) + """ + sub_wf = Workflow(name='sub_wf') + sub_wf.base_dir = '.' + crash = Node(Function(function_str=crash_program, + input_names=['arg'], + output_names=['res']), + name='crash') + sub_wf.add_nodes([crash]) + + sub_wf.inputs.crash.arg = arg + + sub_wf.run() + + results = loadpkl( + os.path.join(sub_wf.name, crash.name, + 'result_{}.pklz'.format(crash.name))) + return results.inputs['_outputs']['res'] + + wf = Workflow(name='wf') + sub_wf_each = MapNode( + Function(function=subworkflow_wrapper, + input_names=['arg'], + output_names=['res']), + iterfield=['arg'], + name='sub_wf_each') + + wf.add_nodes((sub_wf_each, )) + wf.inputs.sub_wf_each.arg = ['Crash once', 'Crash twice'] + wf.config['execution'] = {'stop_on_first_crash': 'false', + 'local_hash_check': 'true', + 'crashdump_dir': wd, + 'poll_sleep_duration': 2} + + try: + wf.run() + except Exception as e: + error_string = str(e) + error_raised = True + + yield assert_true, error_raised, "Didn't raise an error" + yield assert_true, 'Workflow did not execute' in error_string, \ + 'Raised wrong type of error "%s"' % error_string + + # check for correct logging + for message in wf.inputs.sub_wf_each.arg: + yield assert_true, any(message in m + for m in mock_handler.messages['info']), \ + "Didn't log error correctly" + + os.chdir(cwd) + rmtree(wd)