Added support for the OAR task scheduler http://oar.imag.fr #1259

Merged (13 commits), Nov 13, 2015
2 changes: 2 additions & 0 deletions CHANGES
@@ -1,5 +1,7 @@
Next Release
============

* ENH: Added an OAR scheduler plugin (https://github.com/nipy/nipype/pull/1259)
* ENH: New ANTs interface: antsBrainExtraction (https://github.com/nipy/nipype/pull/1231)
* API: Default model level for the bedpostx workflow has been set to "2" following FSL 5.0.9 lead
* ENH: New interfaces for interacting with AWS S3: S3DataSink and S3DataGrabber (https://github.com/nipy/nipype/pull/1201)
30 changes: 29 additions & 1 deletion doc/users/plugins.rst
@@ -9,7 +9,7 @@ available plugins allow local and distributed execution of workflows and
debugging. Each available plugin is described below.

Current plugins are available for Linear, Multiprocessing, IPython_ distributed
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, LSF_, and SLURM_. We
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, LSF_, OAR_, and SLURM_. We
anticipate future plugins for the Soma_ workflow.

.. note::
@@ -276,6 +276,34 @@ for all nodes could look like this::
wrapper_args=shim_args)
)

OAR
---

In order to use nipype with OAR_ you simply need to call::

workflow.run(plugin='OAR')

Optional arguments::

    template: custom template file to use
    oarsub_args: any other command line args to be passed to oarsub.
    max_jobname_len: maximum length of the job name. Default 15.
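
The template file, if given, is prepended to the command in each generated
batch script. A minimal sketch of what ``mytemplate.sh`` might contain
(illustrative only, assuming OAR's ``#OAR`` directive syntax)::

    #!/bin/bash
    #OAR -l /nodes=1/core=1,walltime=01:00:00
    # environment setup shared by every submitted job
    source $HOME/.bashrc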

For example, the following snippet executes the workflow on myqueue with
a custom template::

    workflow.run(plugin='OAR',
                 plugin_args=dict(template='mytemplate.sh',
                                  oarsub_args='-q myqueue'))

In addition to overall workflow configuration, you can use node level
configuration for OAR::

node.plugin_args = {'oarsub_args': '-l "nodes=1/cores=3"'}

This would apply only to that node and is useful in situations where a
particular node might use more resources than other nodes in a workflow.
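
By default a node's ``oarsub_args`` are appended to any workflow-level
``oarsub_args``; adding an ``overwrite`` key replaces them instead. A
sketch, assuming a node named ``realign``::

    realign.plugin_args = {'oarsub_args': '-q express', 'overwrite': True}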


``qsub`` emulation
~~~~~~~~~~~~~~~~~~

1 change: 1 addition & 0 deletions nipype/pipeline/plugins/__init__.py
@@ -5,6 +5,7 @@
from .linear import LinearPlugin
from .ipythonx import IPythonXPlugin
from .pbs import PBSPlugin
from .oar import OARPlugin
from .sge import SGEPlugin
from .condor import CondorPlugin
from .dagman import CondorDAGManPlugin
146 changes: 146 additions & 0 deletions nipype/pipeline/plugins/oar.py
@@ -0,0 +1,146 @@
"""Parallel workflow execution via OAR http://oar.imag.fr
"""

import os
import stat
from time import sleep
import subprocess
import json

from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)

from nipype.interfaces.base import CommandLine


class OARPlugin(SGELikeBatchManagerBase):
"""Execute using OAR

The plugin_args input to run can be used to control the OAR execution.
Currently supported options are:

- template : template to use for batch job submission
- oarsub_args : arguments to be prepended to the job execution
script in the oarsub call
- max_jobname_len: maximum length of the job name. Default 15.

"""

    # Additional class variables
_max_jobname_len = 15
_oarsub_args = ''

def __init__(self, **kwargs):
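        # minimal default template: job options are passed on the oarsub
        # command line (-O, -E, -J, -n, -S) rather than as #OAR directives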
template = """
# oarsub -J
"""
        self._retry_timeout = 2
        self._max_tries = 2
        self._max_jobname_len = 15
if 'plugin_args' in kwargs and kwargs['plugin_args']:
if 'retry_timeout' in kwargs['plugin_args']:
self._retry_timeout = kwargs['plugin_args']['retry_timeout']
if 'max_tries' in kwargs['plugin_args']:
self._max_tries = kwargs['plugin_args']['max_tries']
if 'max_jobname_len' in kwargs['plugin_args']:
self._max_jobname_len = \
kwargs['plugin_args']['max_jobname_len']
super(OARPlugin, self).__init__(template, **kwargs)

def _is_pending(self, taskid):
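        # `oarstat -J -s -j <id>` prints a JSON object mapping the job id
        # to its state string; treat the job as pending unless that state
        # mentions 'error' or 'terminated'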
# subprocess.Popen requires taskid to be a string
proc = subprocess.Popen(
['oarstat', '-J', '-s',
'-j', taskid],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
o, e = proc.communicate()
parsed_result = json.loads(o)[taskid].lower()
is_pending = (
('error' not in parsed_result) and
('terminated' not in parsed_result)
)
return is_pending

def _submit_batchtask(self, scriptfile, node):
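        # build the oarsub command line for the generated batch script,
        # submit it, and record the returned job id as pending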
        cmd = CommandLine('oarsub', environ=dict(os.environ),
                          terminal_output='allatonce')
path = os.path.dirname(scriptfile)
oarsubargs = ''
if self._oarsub_args:
oarsubargs = self._oarsub_args
if 'oarsub_args' in node.plugin_args:
if (
'overwrite' in node.plugin_args and
node.plugin_args['overwrite']
):
oarsubargs = node.plugin_args['oarsub_args']
else:
oarsubargs += (" " + node.plugin_args['oarsub_args'])

        if node._hierarchy:
            jobname = '.'.join((os.environ['LOGNAME'],
                                node._hierarchy,
                                node._id))
        else:
            jobname = '.'.join((os.environ['LOGNAME'],
                                node._id))
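        # reverse the name components so the most specific part (the node
        # id) survives truncation to max_jobname_len characters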
jobnameitems = jobname.split('.')
jobnameitems.reverse()
jobname = '.'.join(jobnameitems)
jobname = jobname[0:self._max_jobname_len]

        # default stdout (-O) / stderr (-E) to files next to the script and
        # force JSON output (-J) so the job id can be parsed from stdout
        if '-O' not in oarsubargs:
oarsubargs = '%s -O %s' % (
oarsubargs,
os.path.join(path, jobname + '.stdout')
)
if '-E' not in oarsubargs:
oarsubargs = '%s -E %s' % (
oarsubargs,
os.path.join(path, jobname + '.stderr')
)
if '-J' not in oarsubargs:
oarsubargs = '%s -J' % (oarsubargs)

        # make the script executable, since oarsub runs it directly (-S)
        os.chmod(scriptfile, stat.S_IEXEC | stat.S_IREAD | stat.S_IWRITE)
cmd.inputs.args = '%s -n %s -S %s' % (
oarsubargs,
jobname,
scriptfile
)

        # temporarily silence interface logging so failed submission
        # attempts are not logged while we retry
        oldlevel = iflogger.level
        iflogger.setLevel(logging.getLevelName('CRITICAL'))
tries = 0
while True:
try:
result = cmd.run()
except Exception as e:
if tries < self._max_tries:
tries += 1
                    # wait retry_timeout seconds (default 2) and try again
                    sleep(self._retry_timeout)
else:
iflogger.setLevel(oldlevel)
raise RuntimeError('\n'.join((('Could not submit OAR task'
' for node %s') % node._id,
str(e))))
else:
break
iflogger.setLevel(oldlevel)
        # retrieve the OAR task id: oarsub's stdout may contain extra lines
        # before the JSON object, so keep only the lines from the opening
        # '{' through the closing '}'
        o = ''
        add = False
for line in result.runtime.stdout.splitlines():
if line.strip().startswith('{'):
add = True
if add:
o += line + '\n'
if line.strip().startswith('}'):
break
        # job_id is numeric in the JSON; cast to str for oarstat and for
        # use as a JSON key in _is_pending
        taskid = str(json.loads(o)['job_id'])
self._pending[taskid] = node.output_dir()
logger.debug('submitted OAR task: %s for node %s' % (taskid, node._id))
return taskid
56 changes: 56 additions & 0 deletions nipype/pipeline/plugins/tests/test_oar.py
@@ -0,0 +1,56 @@
import os
from shutil import rmtree
from tempfile import mkdtemp

import nipype.interfaces.base as nib
from nipype.testing import assert_equal, skipif
import nipype.pipeline.engine as pe


class InputSpec(nib.TraitedSpec):
input1 = nib.traits.Int(desc='a random int')
input2 = nib.traits.Int(desc='a random int')


class OutputSpec(nib.TraitedSpec):
output1 = nib.traits.List(nib.traits.Int, desc='outputs')


class TestInterface(nib.BaseInterface):
input_spec = InputSpec
output_spec = OutputSpec

def _run_interface(self, runtime):
runtime.returncode = 0
return runtime

def _list_outputs(self):
outputs = self._outputs().get()
outputs['output1'] = [1, self.inputs.input1]
return outputs


# skipped by default: running this test requires a live OAR installation
@skipif(True)
def test_run_oar():
cur_dir = os.getcwd()
temp_dir = mkdtemp(prefix='test_engine_', dir=os.getcwd())
os.chdir(temp_dir)

pipe = pe.Workflow(name='pipe')
mod1 = pe.Node(interface=TestInterface(), name='mod1')
mod2 = pe.MapNode(interface=TestInterface(),
iterfield=['input1'],
name='mod2')
pipe.connect([(mod1, mod2, [('output1', 'input1')])])
pipe.base_dir = os.getcwd()
mod1.inputs.input1 = 1
execgraph = pipe.run(plugin="OAR")
names = [
'.'.join((node._hierarchy, node.name))
for node in execgraph.nodes()
]
node = execgraph.nodes()[names.index('pipe.mod1')]
result = node.get_output('output1')
yield assert_equal, result, [1, 1]
os.chdir(cur_dir)
rmtree(temp_dir)