
Commit 70897b2

add ResourceMultiProcPlugin

2 parents: 11111cd + 827d2c2

8 files changed: +658 -6 lines

nipype/interfaces/base.py

Lines changed: 2 additions & 0 deletions

@@ -750,6 +750,8 @@ def __init__(self, **inputs):
             raise Exception('No input_spec in class: %s' %
                             self.__class__.__name__)
         self.inputs = self.input_spec(**inputs)
+        self.memory = 1
+        self.num_threads = 1
 
     @classmethod
     def help(cls, returnhelp=False):
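
Every interface instance now carries default resource estimates of 1 GB of memory and 1 thread. A resource-heavy node can override these before the workflow runs; a minimal sketch, using an FSL FLIRT node purely as an illustration (the node and the values are hypothetical, not part of this commit):

from nipype.pipeline.engine import Node
from nipype.interfaces.fsl import FLIRT

coreg = Node(FLIRT(), name='coreg')   # hypothetical resource-heavy node
coreg.interface.memory = 8            # expected peak memory use, in GB
coreg.interface.num_threads = 4       # expected number of threads

The scheduling plugin added below reads these attributes when deciding how many jobs fit into the available memory and processor budget.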

nipype/pipeline/plugins/__init__.py

Lines changed: 3 additions & 0 deletions

@@ -9,10 +9,13 @@
 from .condor import CondorPlugin
 from .dagman import CondorDAGManPlugin
 from .multiproc import MultiProcPlugin
+from .multiproc import ResourceMultiProcPlugin
 from .ipython import IPythonPlugin
 from .somaflow import SomaFlowPlugin
 from .pbsgraph import PBSGraphPlugin
 from .sgegraph import SGEGraphPlugin
 from .lsf import LSFPlugin
 from .slurm import SLURMPlugin
 from .slurmgraph import SLURMGraphPlugin
+
+from .callback_log import log_nodes_cb
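
Both new names are re-exported at the package level, so they can be pulled straight from nipype.pipeline.plugins:

from nipype.pipeline.plugins import ResourceMultiProcPlugin, log_nodes_cb

nipype's plugin loader looks string plugin names up in this package, so this re-export is what makes plugin='ResourceMultiProc' resolvable from workflow.run; a full run example appears after the multiproc.py diff below.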

nipype/pipeline/plugins/base.py

Lines changed: 6 additions & 2 deletions

@@ -17,7 +17,6 @@
 import numpy as np
 import scipy.sparse as ssp
 
-
 from ..utils import (nx, dfs_preorder, topological_sort)
 from ..engine import (MapNode, str2bool)
 
@@ -261,10 +260,15 @@ def run(self, graph, config, updatehash=False):
                                             graph=graph)
             else:
                 logger.debug('Not submitting')
-            sleep(float(self._config['execution']['poll_sleep_duration']))
+            self._wait()
         self._remove_node_dirs()
         report_nodes_not_run(notrun)
 
+
+
+    def _wait(self):
+        sleep(float(self._config['execution']['poll_sleep_duration']))
+
     def _get_result(self, taskid):
         raise NotImplementedError
 
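
This refactoring turns the fixed polling sleep into an overridable _wait() hook, so a scheduler subclass can block on an event instead of sleeping for poll_sleep_duration. A minimal sketch of the override pattern with a hypothetical event-based subclass (the real override, shown in multiproc.py below, blocks on a shared semaphore):

import threading
from nipype.pipeline.plugins.base import DistributedPluginBase

class EventDrivenPlugin(DistributedPluginBase):
    # hypothetical subclass: wake up as soon as a worker signals completion
    def __init__(self, plugin_args=None):
        super(EventDrivenPlugin, self).__init__(plugin_args=plugin_args)
        self._done_event = threading.Event()

    def _wait(self):
        self._done_event.wait()   # set by a worker completion callback
        self._done_event.clear()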

nipype/pipeline/plugins/callback_log.py

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+import datetime
+import logging
+
+def log_nodes_cb(node, status):
+    print 'status', status
+    logger = logging.getLogger('callback')
+    if status == 'start':
+        message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
+                  node._id + '"' + ',"start":' + '"' + str(datetime.datetime.now()) +\
+                  '"' + ',"memory":' + str(node._interface.memory) + ',"num_threads":' \
+                  + str(node._interface.num_threads) + '}'
+
+        logger.debug(message)
+
+    elif status == 'end':
+        message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
+                  node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
+                  '"' + ',"memory":' + str(node._interface.memory) + ',"num_threads":' \
+                  + str(node._interface.num_threads) + '}'
+
+        logger.debug(message)
+
+    else:
+        message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
+                  node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
+                  '"' + ',"memory":' + str(node._interface.memory) + ',"num_threads":' \
+                  + str(node._interface.num_threads) + ',"error":"True"}'
+
+        logger.debug(message)
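
log_nodes_cb emits one JSON-like line per node event on a logger named 'callback' at DEBUG level, so nothing is recorded until that logger is given a handler. A minimal setup sketch using the standard logging module (the log file name is arbitrary):

import logging

callback_logger = logging.getLogger('callback')
callback_logger.setLevel(logging.DEBUG)
callback_logger.addHandler(logging.FileHandler('run_stats.log'))  # any writable path

The callback itself is handed to the plugin through plugin_args, as shown in the run example after the multiproc.py diff.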

nipype/pipeline/plugins/multiproc.py

Lines changed: 170 additions & 2 deletions

@@ -12,6 +12,7 @@
 
 from .base import (DistributedPluginBase, report_crash)
 
+
 def run_node(node, updatehash):
     result = dict(result=None, traceback=None)
     try:
@@ -22,6 +23,7 @@ def run_node(node, updatehash):
         result['result'] = node.result
     return result
 
+
 class NonDaemonProcess(Process):
     """A non-daemon process to support internal multiprocessing.
     """
@@ -66,6 +68,7 @@ def __init__(self, plugin_args=None):
         else:
             self.pool = Pool(processes=n_procs)
 
+
     def _get_result(self, taskid):
         if taskid not in self._taskresult:
             raise RuntimeError('Multiproc task %d not found'%taskid)
@@ -80,8 +83,7 @@ def _submit_job(self, node, updatehash=False):
                 node.inputs.terminal_output = 'allatonce'
         except:
             pass
-        self._taskresult[self._taskid] = self.pool.apply_async(run_node,
-                                                               (node,
+        self._taskresult[self._taskid] = self.pool.apply_async(run_node, (node,
                                                                 updatehash,))
         return self._taskid
 
@@ -96,3 +98,169 @@ def _report_crash(self, node, result=None):
 
     def _clear_task(self, taskid):
         del self._taskresult[taskid]
+
+
+
+import numpy as np
+from copy import deepcopy
+from ..engine import (MapNode, str2bool)
+import datetime
+import psutil
+from ... import logging
+import semaphore_singleton
+logger = logging.getLogger('workflow')
+
+def release_lock(args):
+    semaphore_singleton.semaphore.release()
+
+class ResourceMultiProcPlugin(MultiProcPlugin):
+    """Execute workflow with multiprocessing, not sending more jobs at once
+    than the system can support.
+
+    The plugin_args input to run can be used to control the multiprocessing
+    execution and define the maximum amount of memory and threads that
+    should be used. When those parameters are not specified,
+    the number of processors and amount of memory of the system are used.
+
+    Resource-consuming nodes should be tagged:
+    memory_consuming_node.interface.memory = 8 #Gb
+    thread_consuming_node.interface.num_threads = 16
+
+    The default number of threads and memory for a node is 1.
+
+    Currently supported options are:
+
+    - n_procs: maximum number of threads to be executed in parallel
+    - memory: maximum memory that can be used at once.
+
+    """
+
+    def __init__(self, plugin_args=None):
+        super(ResourceMultiProcPlugin, self).__init__(plugin_args=plugin_args)
+        self.plugin_args = plugin_args
+        self.processors = cpu_count()
+        memory = psutil.virtual_memory()
+        self.memory = memory.total / (1024*1024*1024)
+        if self.plugin_args:
+            if 'n_procs' in self.plugin_args:
+                self.processors = self.plugin_args['n_procs']
+            if 'memory' in self.plugin_args:
+                self.memory = self.plugin_args['memory']
+
+    def _wait(self):
+        if len(self.pending_tasks) > 0:
+            semaphore_singleton.semaphore.acquire()
+        semaphore_singleton.semaphore.release()
+
+
+    def _submit_job(self, node, updatehash=False):
+        self._taskid += 1
+        try:
+            if node.inputs.terminal_output == 'stream':
+                node.inputs.terminal_output = 'allatonce'
+        except:
+            pass
+        self._taskresult[self._taskid] = self.pool.apply_async(run_node, (node, updatehash,),
+                                                               callback=release_lock)
+        return self._taskid
+
+    def _send_procs_to_workers(self, updatehash=False, graph=None):
+        """ Sends jobs to workers when system resources are available.
+            Check memory (gb) and cores usage before running jobs.
+        """
+        executing_now = []
+
+        # Check to see if a job is available
+        jobids = np.flatnonzero((self.proc_pending == True) & (self.depidx.sum(axis=0) == 0).__array__())
+
+        # check available system resources by summing all threads and memory used
+        busy_memory = 0
+        busy_processors = 0
+        for jobid in jobids:
+            busy_memory += self.procs[jobid]._interface.memory
+            busy_processors += self.procs[jobid]._interface.num_threads
+
+        free_memory = self.memory - busy_memory
+        free_processors = self.processors - busy_processors
+
+        # check all jobs without dependency not run
+        jobids = np.flatnonzero((self.proc_done == False) & (self.depidx.sum(axis=0) == 0).__array__())
+
+        # sort jobs ready to run first by memory and then by number of threads
+        # The most resource-consuming jobs run first
+        jobids = sorted(jobids, key=lambda item: (self.procs[item]._interface.memory, self.procs[item]._interface.num_threads))
+
+        logger.debug('Free memory: %d, Free processors: %d', free_memory, free_processors)
+
+        # while there are enough memory and processors for the first job,
+        # submit the first job on the list
+        for jobid in jobids:
+            logger.debug('Next Job: %d, memory: %d, threads: %d' % (jobid, self.procs[jobid]._interface.memory, self.procs[jobid]._interface.num_threads))
+
+            if self.procs[jobid]._interface.memory <= free_memory and self.procs[jobid]._interface.num_threads <= free_processors:
+                logger.info('Executing: %s ID: %d' % (self.procs[jobid]._id, jobid))
+                executing_now.append(self.procs[jobid])
+
+                if isinstance(self.procs[jobid], MapNode):
+                    try:
+                        num_subnodes = self.procs[jobid].num_subnodes()
+                    except Exception:
+                        self._clean_queue(jobid, graph)
+                        self.proc_pending[jobid] = False
+                        continue
+                    if num_subnodes > 1:
+                        submit = self._submit_mapnode(jobid)
+                        if not submit:
+                            continue
+
+                # change job status in appropriate queues
+                self.proc_done[jobid] = True
+                self.proc_pending[jobid] = True
+
+                free_memory -= self.procs[jobid]._interface.memory
+                free_processors -= self.procs[jobid]._interface.num_threads
+
+                # Send job to task manager and add to pending tasks
+                if self._status_callback:
+                    self._status_callback(self.procs[jobid], 'start')
+
+                if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
+                    logger.debug('checking hash locally')
+                    try:
+                        hash_exists, _, _, _ = self.procs[jobid].hash_exists()
+                        logger.debug('Hash exists %s' % str(hash_exists))
+                        if (hash_exists and (self.procs[jobid].overwrite == False or (self.procs[jobid].overwrite == None and not self.procs[jobid]._interface.always_run))):
+                            self._task_finished_cb(jobid)
+                            self._remove_node_dirs()
+                            continue
+                    except Exception:
+                        self._clean_queue(jobid, graph)
+                        self.proc_pending[jobid] = False
+                        continue
+                    logger.debug('Finished checking hash')
+
+                if self.procs[jobid].run_without_submitting:
+                    logger.debug('Running node %s on master thread' % self.procs[jobid])
+                    try:
+                        self.procs[jobid].run()
+                    except Exception:
+                        self._clean_queue(jobid, graph)
+                    self._task_finished_cb(jobid)
+                    self._remove_node_dirs()
+
+                else:
+                    logger.debug('submitting %s' % str(jobid))
+                    tid = self._submit_job(deepcopy(self.procs[jobid]), updatehash=updatehash)
+                    if tid is None:
+                        self.proc_done[jobid] = False
+                        self.proc_pending[jobid] = False
+                    else:
+                        self.pending_tasks.insert(0, (tid, jobid))
+            else:
+                break
+
+        logger.debug('No jobs waiting to execute')
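
Putting the pieces together, a hedged usage sketch: the workflow object wf and the resource caps are hypothetical, and the plugin is selected by the string 'ResourceMultiProc', which nipype resolves to the ResourceMultiProcPlugin class exported above.

from nipype.pipeline.plugins import log_nodes_cb

# 'wf' is a previously built nipype Workflow (not part of this commit)
wf.run(plugin='ResourceMultiProc',
       plugin_args={'n_procs': 16,                  # total threads the scheduler may use
                    'memory': 24,                   # total memory the scheduler may use, in GB
                    'status_callback': log_nodes_cb})

Jobs whose declared memory or num_threads exceed the remaining budget are held back until running jobs finish and free their resources.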

nipype/pipeline/plugins/semaphore_singleton.py

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+import threading
+semaphore = threading.Semaphore(1)
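
The new module holds a single process-wide semaphore shared by the scheduler and the worker callbacks. A minimal sketch of the handshake, assuming it mirrors how release_lock and ResourceMultiProcPlugin._wait use it:

import threading

semaphore = threading.Semaphore(1)

def release_lock(args=None):
    # worker-side callback: wake the scheduler when a task finishes
    semaphore.release()

def wait_for_completion():
    # scheduler side (_wait): block until a worker releases, then release
    # again so the next wait call does not deadlock
    semaphore.acquire()
    semaphore.release()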
