Skip to content

Commit 16808ae

Browse files
committed
Merge pull request #1259 from demianw/oar_support
Added support for the OAR task scheduler http://oar.imag.fr
2 parents f4f31b5 + 0110028 commit 16808ae

File tree

5 files changed

+234
-1
lines changed

5 files changed

+234
-1
lines changed

CHANGES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
Next Release
22
============
3+
4+
* ENH: Added an OAR scheduler plugin (https://github.com/nipy/nipype/pull/1259)
35
* ENH: New ANTs interface: antsBrainExtraction (https://github.com/nipy/nipype/pull/1231)
46
* API: Default model level for the bedpostx workflow has been set to "2" following FSL 5.0.9 lead
57
* ENH: New interfaces for interacting with AWS S3: S3DataSink and S3DataGrabber (https://github.com/nipy/nipype/pull/1201)

doc/users/plugins.rst

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ available plugins allow local and distributed execution of workflows and
99
debugging. Each available plugin is described below.
1010

1111
Current plugins are available for Linear, Multiprocessing, IPython_ distributed
12-
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, LSF_, and SLURM_. We
12+
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, LSF_, OAR_, and SLURM_. We
1313
anticipate future plugins for the Soma_ workflow.
1414

1515
.. note::
@@ -276,6 +276,34 @@ for all nodes could look like this::
276276
wrapper_args=shim_args)
277277
)
278278

279+
OAR
280+
---
281+
282+
In order to use nipype with OAR_ you simply need to call::
283+
284+
workflow.run(plugin='OAR')
285+
286+
Optional arguments::
287+
288+
template: custom template file to use
289+
oarsub_args: any other command line args to be passed to oarsub.
290+
max_jobname_len: maximum length of the job name. Default 15.
291+
292+
For example, the following snippet executes the workflow on myqueue with
293+
a custom template::
294+
295+
workflow.run(plugin='OAR',
296+
plugin_args=dict(template='mytemplate.sh', oarsub_args='-q myqueue')
297+
298+
In addition to overall workflow configuration, you can use node level
299+
configuration for OAR::
300+
301+
node.plugin_args = {'oarsub_args': '-l "nodes=1/cores=3"'}
302+
303+
this would apply only to the node and is useful in situations, where a
304+
particular node might use more resources than other nodes in a workflow.
305+
306+
279307
``qsub`` emulation
280308
~~~~~~~~~~~~~~~~~~
281309

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .linear import LinearPlugin
66
from .ipythonx import IPythonXPlugin
77
from .pbs import PBSPlugin
8+
from .oar import OARPlugin
89
from .sge import SGEPlugin
910
from .condor import CondorPlugin
1011
from .dagman import CondorDAGManPlugin

nipype/pipeline/plugins/oar.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""Parallel workflow execution via OAR http://oar.imag.fr
2+
"""
3+
4+
import os
5+
import stat
6+
from time import sleep
7+
import subprocess
8+
import json
9+
10+
from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
11+
12+
from nipype.interfaces.base import CommandLine
13+
14+
15+
class OARPlugin(SGELikeBatchManagerBase):
    """Execute using OAR

    The plugin_args input to run can be used to control the OAR execution.
    Currently supported options are:

    - template : template to use for batch job submission
    - oarsub_args : arguments to be prepended to the job execution
      script in the oarsub call
    - max_jobname_len: maximum length of the job name. Default 15.

    """

    # Additional class variables (defaults; overridable via plugin_args)
    _max_jobname_len = 15
    _oarsub_args = ''

    def __init__(self, **kwargs):
        # Minimal batch template: the real submission command line is
        # assembled per node in _submit_batchtask.
        template = """
# oarsub -J
"""
        self._retry_timeout = 2
        self._max_tries = 2
        if 'plugin_args' in kwargs and kwargs['plugin_args']:
            if 'retry_timeout' in kwargs['plugin_args']:
                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
            if 'max_tries' in kwargs['plugin_args']:
                self._max_tries = kwargs['plugin_args']['max_tries']
            if 'max_jobname_len' in kwargs['plugin_args']:
                # BUGFIX: this previously assigned the misspelled
                # attribute default `_max_jobname_length`, which nothing
                # read; `_max_jobname_len` is the attribute actually used
                # when truncating job names below.
                self._max_jobname_len = \
                    kwargs['plugin_args']['max_jobname_len']
        super(OARPlugin, self).__init__(template, **kwargs)

    def _is_pending(self, taskid):
        """Return True while the OAR job `taskid` is still queued/running.

        Queries ``oarstat -J -s -j <taskid>`` (JSON output) and treats any
        state other than an error or 'terminated' as pending.
        """
        # subprocess.Popen requires taskid to be a string
        proc = subprocess.Popen(
            ['oarstat', '-J', '-s',
             '-j', taskid],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        o, e = proc.communicate()
        parsed_result = json.loads(o)[taskid].lower()
        is_pending = (
            ('error' not in parsed_result) and
            ('terminated' not in parsed_result)
        )
        return is_pending

    def _submit_batchtask(self, scriptfile, node):
        """Submit `scriptfile` for `node` through ``oarsub``.

        Builds the oarsub argument string (honoring per-node
        ``plugin_args['oarsub_args']``, with optional ``overwrite``),
        retries transient submission failures, then parses the JSON
        block that ``oarsub -J`` prints to recover the job id.

        Returns the OAR task id and records it in ``self._pending``.
        """
        cmd = CommandLine('oarsub', environ=os.environ.data,
                          terminal_output='allatonce')
        path = os.path.dirname(scriptfile)
        oarsubargs = ''
        if self._oarsub_args:
            oarsubargs = self._oarsub_args
        if 'oarsub_args' in node.plugin_args:
            if (
                'overwrite' in node.plugin_args and
                node.plugin_args['overwrite']
            ):
                # Node-level args replace the plugin-level ones entirely
                oarsubargs = node.plugin_args['oarsub_args']
            else:
                oarsubargs += (" " + node.plugin_args['oarsub_args'])

        if node._hierarchy:
            jobname = '.'.join((os.environ.data['LOGNAME'],
                                node._hierarchy,
                                node._id))
        else:
            jobname = '.'.join((os.environ.data['LOGNAME'],
                                node._id))
        # Reverse the dotted components so the most specific part (node id)
        # survives truncation to _max_jobname_len characters.
        jobnameitems = jobname.split('.')
        jobnameitems.reverse()
        jobname = '.'.join(jobnameitems)
        jobname = jobname[0:self._max_jobname_len]

        # Default stdout/stderr next to the script unless the user already
        # passed -O/-E; always request JSON output (-J) for id parsing.
        if '-O' not in oarsubargs:
            oarsubargs = '%s -O %s' % (
                oarsubargs,
                os.path.join(path, jobname + '.stdout')
            )
        if '-E' not in oarsubargs:
            oarsubargs = '%s -E %s' % (
                oarsubargs,
                os.path.join(path, jobname + '.stderr')
            )
        if '-J' not in oarsubargs:
            oarsubargs = '%s -J' % (oarsubargs)

        os.chmod(scriptfile, stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
        cmd.inputs.args = '%s -n %s -S %s' % (
            oarsubargs,
            jobname,
            scriptfile
        )

        # Silence interface logging during the (possibly retried) submission
        oldlevel = iflogger.level
        iflogger.setLevel(logging.getLevelName('CRITICAL'))
        tries = 0
        while True:
            try:
                result = cmd.run()
            except Exception as e:
                if tries < self._max_tries:
                    tries += 1
                    sleep(self._retry_timeout)
                    # sleep and try again.
                else:
                    iflogger.setLevel(oldlevel)
                    raise RuntimeError('\n'.join((('Could not submit OAR task'
                                                   ' for node %s') % node._id,
                                                  str(e))))
            else:
                break
        iflogger.setLevel(oldlevel)

        # retrieve OAR taskid: oarsub may print extra text, so collect only
        # the JSON object delimited by the '{' ... '}' lines.
        o = ''
        add = False
        for line in result.runtime.stdout.splitlines():
            if line.strip().startswith('{'):
                add = True
            if add:
                o += line + '\n'
            if line.strip().startswith('}'):
                break
        taskid = json.loads(o)['job_id']
        self._pending[taskid] = node.output_dir()
        logger.debug('submitted OAR task: %s for node %s' % (taskid, node._id))
        return taskid
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import os
2+
from shutil import rmtree
3+
from tempfile import mkdtemp
4+
5+
import nipype.interfaces.base as nib
6+
from nipype.testing import assert_equal, skipif
7+
import nipype.pipeline.engine as pe
8+
9+
10+
class InputSpec(nib.TraitedSpec):
    # Two integer inputs for the throwaway test interface below; only
    # input1 is actually consumed by TestInterface._list_outputs.
    input1 = nib.traits.Int(desc='a random int')
    input2 = nib.traits.Int(desc='a random int')
13+
14+
15+
class OutputSpec(nib.TraitedSpec):
    # Single list-of-int output produced by TestInterface._list_outputs
    output1 = nib.traits.List(nib.traits.Int, desc='outputs')
17+
18+
19+
class TestInterface(nib.BaseInterface):
    """Trivial interface used to exercise the OAR plugin: does no work
    and reports output1 = [1, input1]."""
    input_spec = InputSpec
    output_spec = OutputSpec

    def _run_interface(self, runtime):
        # No actual computation; report success so the node completes.
        runtime.returncode = 0
        return runtime

    def _list_outputs(self):
        outputs = self._outputs().get()
        # Echo input1 back so the test below can assert on [1, input1]
        outputs['output1'] = [1, self.inputs.input1]
        return outputs
31+
32+
33+
@skipif(True)
def test_run_oar():
    """Smoke-test workflow execution through the OAR plugin.

    Unconditionally skipped (``skipif(True)``) because it requires a
    live OAR cluster to submit jobs to.
    """
    start_dir = os.getcwd()
    work_dir = mkdtemp(prefix='test_engine_', dir=os.getcwd())
    os.chdir(work_dir)

    wf = pe.Workflow(name='pipe')
    node1 = pe.Node(interface=TestInterface(), name='mod1')
    node2 = pe.MapNode(interface=TestInterface(),
                       iterfield=['input1'],
                       name='mod2')
    wf.connect([(node1, node2, [('output1', 'input1')])])
    wf.base_dir = os.getcwd()
    node1.inputs.input1 = 1
    execgraph = wf.run(plugin="OAR")
    names = ['.'.join((node._hierarchy, node.name))
             for node in execgraph.nodes()]
    target = execgraph.nodes()[names.index('pipe.mod1')]
    result = target.get_output('output1')
    yield assert_equal, result, [1, 1]
    os.chdir(start_dir)
    rmtree(work_dir)

0 commit comments

Comments
 (0)