Skip to content

Commit 3bfb3d7

Browse files
committed
enh: added retry_timeout and max_tries options to the SGE plugin, matching the PBS plugin
1 parent 40ec114 commit 3bfb3d7

File tree

2 files changed

+31
-14
lines changed

2 files changed

+31
-14
lines changed

nipype/pipeline/plugins/pbs.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ def __init__(self, **kwargs):
3131
self._retry_timeout = kwargs['plugin_args']['retry_timeout']
3232
if 'max_tries' in kwargs['plugin_args']:
3333
self._max_tries = kwargs['plugin_args']['max_tries']
34-
3534
super(PBSPlugin, self).__init__(template, **kwargs)
3635

3736
def _is_pending(self, taskid):

nipype/pipeline/plugins/sge.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,17 @@ class SGEPlugin(SGELikeBatchManagerBase):
2020
"""
2121

2222
def __init__(self, **kwargs):
23-
template="""#$ -V\n#$ -S /bin/sh\n"""
23+
template="""
24+
#$ -V
25+
#$ -S /bin/sh
26+
"""
27+
self._retry_timeout = 2
28+
self._max_tries = 2
29+
if 'plugin_args' in kwargs and kwargs['plugin_args']:
30+
if 'retry_timeout' in kwargs['plugin_args']:
31+
self._retry_timeout = kwargs['plugin_args']['retry_timeout']
32+
if 'max_tries' in kwargs['plugin_args']:
33+
self._max_tries = kwargs['plugin_args']['max_tries']
2434
super(SGEPlugin, self).__init__(template, **kwargs)
2535

2636
def _is_pending(self, taskid):
@@ -55,16 +65,24 @@ def _submit_batchtask(self, scriptfile, node):
5565
scriptfile)
5666
oldlevel = iflogger.level
5767
iflogger.setLevel(logging.getLevelName('CRITICAL'))
58-
try:
59-
result = cmd.run()
60-
except Exception, e:
61-
iflogger.setLevel(oldlevel)
62-
raise RuntimeError('\n'.join(('Could not submit sge task for node %s'%node._id,
63-
str(e))))
64-
else:
65-
iflogger.setLevel(oldlevel)
66-
# retrieve sge taskid
67-
taskid = int(result.runtime.stdout.split(' ')[2])
68-
self._pending[taskid] = node.output_dir()
69-
logger.debug('submitted sge task: %d for node %s'%(taskid, node._id))
68+
tries = 0
69+
while True:
70+
try:
71+
result = cmd.run()
72+
except Exception, e:
73+
if tries<self._max_tries:
74+
tries += 1
75+
sleep(self._retry_timeout) # wait retry_timeout seconds (default 2), then try again.
76+
else:
77+
iflogger.setLevel(oldlevel)
78+
raise RuntimeError('\n'.join((('Could not submit sge task'
79+
' for node %s') % node._id,
80+
str(e))))
81+
else:
82+
break
83+
iflogger.setLevel(oldlevel)
84+
# retrieve sge taskid
85+
taskid = int(result.runtime.stdout.split(' ')[2])
86+
self._pending[taskid] = node.output_dir()
87+
logger.debug('submitted sge task: %d for node %s'%(taskid, node._id))
7088
return taskid

0 commit comments

Comments
 (0)