From 8b3c8df5cdcaab4b9bb4bf1c77ca35b8a13f2119 Mon Sep 17 00:00:00 2001 From: ashgillman Date: Wed, 27 Jul 2016 12:39:48 +1000 Subject: [PATCH 1/2] Use reference to err to avoid deletion in Py3.4 --- nipype/pipeline/engine/nodes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 9442db1b19..6d0cc74d73 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -1154,7 +1154,8 @@ def _node_runner(self, nodes, updatehash=False): err = None try: node.run(updatehash=updatehash) - except Exception as err: + except Exception as this_err: + err = this_err # save reference if str2bool(self.config['execution']['stop_on_first_crash']): self._result = node.result raise From d794f022fc5a89a7b05e53ee9984b115f958f205 Mon Sep 17 00:00:00 2001 From: ashgillman Date: Mon, 24 Oct 2016 17:40:14 +1000 Subject: [PATCH 2/2] Add test case demonstrating py3 failure on wrapped workflow mapnode --- nipype/pipeline/engine/tests/test_engine.py | 96 +++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/nipype/pipeline/engine/tests/test_engine.py b/nipype/pipeline/engine/tests/test_engine.py index 5eaaa81fbf..daa74ab7a9 100644 --- a/nipype/pipeline/engine/tests/test_engine.py +++ b/nipype/pipeline/engine/tests/test_engine.py @@ -9,6 +9,7 @@ import os from shutil import rmtree from tempfile import mkdtemp +import logging import networkx as nx @@ -40,6 +41,29 @@ def _list_outputs(self): return outputs +class MockLoggingHandler(logging.Handler): + """Mock logging handler to check for expected logs. + + Credit: Gustavo @ http://stackoverflow.com/a/1049375/3903368 + """ + + def __init__(self, *args, **kwargs): + self.reset() + logging.Handler.__init__(self, *args, **kwargs) + + def emit(self, record): + self.messages[record.levelname.lower()].append(record.getMessage()) + + def reset(self): + self.messages = { + 'debug': [], + 'info': [], + 'warning': [], + 'error': [], + 'critical': [], + } + + def test_init(): yield assert_raises, Exception, pe.Workflow pipe = pe.Workflow(name='pipe') @@ -746,3 +770,75 @@ def func1(in1): os.chdir(cwd) rmtree(wd) + + + +def test_mapnode_handles_wrapped_crash(): + cwd = os.getcwd() + wd = mkdtemp() + os.chdir(wd) + from nipype import MapNode, Function, Workflow, logging + + mock_handler = MockLoggingHandler() + logging.loggers['workflow'].addHandler(mock_handler) + + def subworkflow_wrapper(arg): + import os + from nipype.pipeline.engine import Node, Workflow + from nipype.interfaces import Function + from nipype.utils.filemanip import loadpkl + + crash_program = """\ + def crash_program(arg): + raise Exception('Important message, {}'.format(arg)) + """ + sub_wf = Workflow(name='sub_wf') + sub_wf.base_dir = '.' + crash = Node(Function(function_str=crash_program, + input_names=['arg'], + output_names=['res']), + name='crash') + sub_wf.add_nodes([crash]) + + sub_wf.inputs.crash.arg = arg + + sub_wf.run() + + results = loadpkl( + os.path.join(sub_wf.name, crash.name, + 'result_{}.pklz'.format(crash.name))) + return results.inputs['_outputs']['res'] + + wf = Workflow(name='wf') + sub_wf_each = MapNode( + Function(function=subworkflow_wrapper, + input_names=['arg'], + output_names=['res']), + iterfield=['arg'], + name='sub_wf_each') + + wf.add_nodes((sub_wf_each, )) + wf.inputs.sub_wf_each.arg = ['Crash once', 'Crash twice'] + wf.config['execution'] = {'stop_on_first_crash': 'false', + 'local_hash_check': 'true', + 'crashdump_dir': wd, + 'poll_sleep_duration': 2} + + try: + wf.run() + except Exception as e: + error_string = str(e) + error_raised = True + + yield assert_true, error_raised, "Didn't raise an error" + yield assert_true, 'Workflow did not execute' in error_string, \ + 'Raised wrong type of error "%s"' % error_string + + # check for correct logging + for message in wf.inputs.sub_wf_each.arg: + yield assert_true, any(message in m + for m in mock_handler.messages['info']), \ + "Didn't log error correctly" + + os.chdir(cwd) + rmtree(wd)