
Commit a9f8598

Merge pull request nipy#4 from hanke/enh/masterthread
enh: condor plugin via sge emulation
2 parents: 102f688 + f1b7a31

File tree (2 files changed: +101 -1 lines changed)

  nipype/pipeline/plugins/__init__.py
  nipype/pipeline/plugins/condor.py

nipype/pipeline/plugins/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -6,5 +6,6 @@
 from .ipython import IPythonPlugin
 from .pbs import PBSPlugin
 from .sge import SGEPlugin
+from .condor import CondorPlugin
 from .multiproc import MultiProcPlugin
-from .ipythonxi import IPythonXIPlugin
+from .ipythonxi import IPythonXIPlugin
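
With the import added above, CondorPlugin becomes available alongside the other batch plugins, so a workflow can request it by name. A minimal usage sketch, assuming nipype's usual name-based plugin lookup and a hypothetical workflow object wf (the qsub_args value is a placeholder; retry_timeout and max_tries are the options handled in the plugin's __init__ below):

    # Usage sketch only: 'wf' is a hypothetical nipype Workflow instance and the
    # qsub_args string is a placeholder for whatever condor_qsub should receive.
    wf.run(plugin='Condor',
           plugin_args={'qsub_args': '<extra condor_qsub arguments>',
                        'retry_timeout': 5,   # seconds between failed submission attempts
                        'max_tries': 10})     # submission attempts before giving up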

nipype/pipeline/plugins/condor.py

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+"""Parallel workflow execution via Condor
+"""
+
+import os
+
+from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
+
+from nipype.interfaces.base import CommandLine
+
+class CondorPlugin(SGELikeBatchManagerBase):
+    """Execute using Condor
+
+    This plugin doesn't work with a plain stock-Condor installation, but
+    requires a 'qsub' emulation script for Condor, called 'condor_qsub'.
+    This script is shipped with the Condor package from NeuroDebian, or can be
+    downloaded from its Git repository at
+
+      http://anonscm.debian.org/gitweb/?p=pkg-exppsy/condor.git;a=blob_plain;f=debian/condor_qsub;hb=HEAD
+
+    The plugin_args input to run can be used to control the Condor execution.
+    Currently supported options are:
+
+    - template : template to use for batch job submission. This can be an
+                 SGE-style script with the (limited) set of options supported
+                 by condor_qsub
+    - qsub_args : arguments to be prepended to the job execution script in the
+                  qsub call
+    """
+
+    def __init__(self, **kwargs):
+        template="""
+#$ -V
+#$ -S /bin/sh
+"""
+        self._retry_timeout = 2
+        self._max_tries = 2
+        if 'plugin_args' in kwargs and kwargs['plugin_args']:
+            if 'retry_timeout' in kwargs['plugin_args']:
+                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
+            if 'max_tries' in kwargs['plugin_args']:
+                self._max_tries = kwargs['plugin_args']['max_tries']
+        super(CondorPlugin, self).__init__(template, **kwargs)
+
+    def _is_pending(self, taskid):
+        cmd = CommandLine('condor_q')
+        cmd.inputs.args = '%d'%taskid
+        # check condor cluster
+        oldlevel = iflogger.level
+        iflogger.setLevel(logging.getLevelName('CRITICAL'))
+        result = cmd.run(ignore_exception=True)
+        iflogger.setLevel(oldlevel)
+        if result.runtime.stdout.count('\n%d' % taskid):
+            return True
+        return False
+
+    def _submit_batchtask(self, scriptfile, node):
+        cmd = CommandLine('condor_qsub', environ=os.environ.data)
+        qsubargs = ''
+        if self._qsub_args:
+            qsubargs = self._qsub_args
+        if node._hierarchy:
+            jobname = '.'.join((os.environ.data['LOGNAME'],
+                                node._hierarchy,
+                                node._id))
+        else:
+            jobname = '.'.join((os.environ.data['LOGNAME'],
+                                node._id))
+        jobnameitems = jobname.split('.')
+        jobnameitems.reverse()
+        jobname = '.'.join(jobnameitems)
+        batch_dir = os.path.dirname(scriptfile)
+        cmd.inputs.args = '-e %s -o %s %s -N %s %s'%(batch_dir,
+                                                     batch_dir,
+                                                     qsubargs,
+                                                     jobname,
+                                                     scriptfile)
+        oldlevel = iflogger.level
+        iflogger.setLevel(logging.getLevelName('CRITICAL'))
+        tries = 0
+        while True:
+            try:
+                result = cmd.run()
+            except Exception, e:
+                if tries<self._max_tries:
+                    tries += 1
+                    sleep(self._retry_timeout) # sleep 2 seconds and try again.
+                else:
+                    iflogger.setLevel(oldlevel)
+                    raise RuntimeError('\n'.join((('Could not submit condor cluster'
+                                                   ' for node %s') % node._id,
+                                                  str(e))))
+            else:
+                break
+        iflogger.setLevel(oldlevel)
+        # retrieve condor clusterid
+        taskid = int(result.runtime.stdout.split(' ')[2])
+        self._pending[taskid] = node.output_dir()
+        logger.debug('submitted condor cluster: %d for node %s'%(taskid, node._id))
+        return taskid
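
The submission loop above retries a failed condor_qsub call up to self._max_tries times, waiting self._retry_timeout seconds between attempts. Note that sleep is not imported in the file as shown, so the retry branch would need something like 'from time import sleep' to execute. A standalone sketch of that retry pattern under that assumption (the helper names are illustrative, not part of nipype):

    # Sketch of the retry pattern used in _submit_batchtask; names are made up.
    from time import sleep

    def submit_with_retries(run_cmd, max_tries=2, retry_timeout=2):
        """Call run_cmd() until it succeeds, retrying up to max_tries times."""
        tries = 0
        while True:
            try:
                return run_cmd()          # e.g. running the condor_qsub command line
            except Exception:
                if tries < max_tries:
                    tries += 1
                    sleep(retry_timeout)  # wait before resubmitting
                else:
                    raise                 # out of retries: surface the last error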
