Skip to content

Use reference to err to avoid deletion in Py3.4 #1551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,8 @@ def _node_runner(self, nodes, updatehash=False):
err = None
try:
node.run(updatehash=updatehash)
except Exception as err:
except Exception as this_err:
err = this_err # save reference
if str2bool(self.config['execution']['stop_on_first_crash']):
self._result = node.result
raise
Expand Down
96 changes: 96 additions & 0 deletions nipype/pipeline/engine/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
from shutil import rmtree
from tempfile import mkdtemp
import logging

import networkx as nx

Expand Down Expand Up @@ -44,6 +45,29 @@ def _list_outputs(self):
return outputs


class MockLoggingHandler(logging.Handler):
"""Mock logging handler to check for expected logs.

Credit: Gustavo @ http://stackoverflow.com/a/1049375/3903368
"""

def __init__(self, *args, **kwargs):
self.reset()
logging.Handler.__init__(self, *args, **kwargs)

def emit(self, record):
self.messages[record.levelname.lower()].append(record.getMessage())

def reset(self):
self.messages = {
'debug': [],
'info': [],
'warning': [],
'error': [],
'critical': [],
}


def test_init():
yield assert_raises, Exception, pe.Workflow
pipe = pe.Workflow(name='pipe')
Expand Down Expand Up @@ -750,3 +774,75 @@ def func1(in1):

os.chdir(cwd)
rmtree(wd)



def test_mapnode_handles_wrapped_crash():
cwd = os.getcwd()
wd = mkdtemp()
os.chdir(wd)
from nipype import MapNode, Function, Workflow, logging

mock_handler = MockLoggingHandler()
logging.loggers['workflow'].addHandler(mock_handler)

def subworkflow_wrapper(arg):
import os
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces import Function
from nipype.utils.filemanip import loadpkl

crash_program = """\
def crash_program(arg):
raise Exception('Important message, {}'.format(arg))
"""
sub_wf = Workflow(name='sub_wf')
sub_wf.base_dir = '.'
crash = Node(Function(function_str=crash_program,
input_names=['arg'],
output_names=['res']),
name='crash')
sub_wf.add_nodes([crash])

sub_wf.inputs.crash.arg = arg

sub_wf.run()

results = loadpkl(
os.path.join(sub_wf.name, crash.name,
'result_{}.pklz'.format(crash.name)))
return results.inputs['_outputs']['res']

wf = Workflow(name='wf')
sub_wf_each = MapNode(
Function(function=subworkflow_wrapper,
input_names=['arg'],
output_names=['res']),
iterfield=['arg'],
name='sub_wf_each')

wf.add_nodes((sub_wf_each, ))
wf.inputs.sub_wf_each.arg = ['Crash once', 'Crash twice']
wf.config['execution'] = {'stop_on_first_crash': 'false',
'local_hash_check': 'true',
'crashdump_dir': wd,
'poll_sleep_duration': 2}

try:
wf.run()
except Exception as e:
error_string = str(e)
error_raised = True

yield assert_true, error_raised, "Didn't raise an error"
yield assert_true, 'Workflow did not execute' in error_string, \
'Raised wrong type of error "%s"' % error_string

# check for correct logging
for message in wf.inputs.sub_wf_each.arg:
yield assert_true, any(message in m
for m in mock_handler.messages['info']), \
"Didn't log error correctly"

os.chdir(cwd)
rmtree(wd)