Skip to content

Commit 2bbb9a8

Browse files
committed
Hack to make submit work
1 parent 081d3dc commit 2bbb9a8

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

charm4py/pool.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def startSingleTask(self, func, future, *args):
165165
job.remote = self.workers.runTask_star_th
166166
else:
167167
job.remote = self.workers.runTask_star
168+
#future.set_running_or_notify_cancel()
168169
self.schedule()
169170

170171
def start(self, func, tasks, result, ncores, chunksize):
@@ -218,6 +219,7 @@ def schedule(self):
218219
job = self.job_next
219220
prev = self
220221
while job is not None:
222+
#job.future.set_running_or_notify_cancel()
221223
if len(self.idle_workers) == 0:
222224
return
223225
while True:
@@ -482,6 +484,7 @@ def Task(self, func, args, ret=False, awaitable=False):
482484
f = Future()
483485
# unpack the arguments for sending to allow benefiting from direct copy
484486
#print(f)
487+
#f.set_running_or_notify_cancel()
485488
self.pool_scheduler.startSingleTask(func, f, *args)
486489
return f
487490

@@ -556,13 +559,16 @@ def submit(self, fn, /, *args, **kwargs):
556559
"charm4py.pool.PoolExecutor object has been shut down")
557560

558561
if kwargs is None or len(kwargs) == 0:
559-
return self.pool.Task(fn, args, ret=True)
562+
future = self.pool.Task(fn, args, ret=True)
560563
#return self.pool.map_async(_StarmappedFunction(fn), (args,),
561564
# chunksize=1, ncores=-1)
562565
else:
563566
# Task doesn't support kwargs so this sneaks them in with a tuple
564567
iterable_arg = tuple([tuple([args, frozendict(kwargs)])])
565-
return self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True)
568+
future = self.pool.Task(_WrappedFunction(fn), iterable_arg, ret=True)
569+
570+
future.set_running_or_notify_cancel()
571+
return future
566572

567573
def map(self, func, *iterables, timeout=None, chunksize=1, ncores=-1):
568574
if self.is_shutdown:
@@ -586,7 +592,7 @@ def shutdown(self, wait=True, *, cancel_futures=False):
586592
job.future.cancel()
587593

588594
# Is this necessary?
589-
self.pool.pool_scheduler.schedule()
595+
#self.pool.pool_scheduler.schedule()
590596

591597
if wait:
592598
for job in self.pool.pool_scheduler.jobs:

charm4py/threads.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ def getTargetProxyEntryMethod(self):
106106

107107
def deposit(self, result):
108108
""" Deposit a value for this future. """
109-
110109
retval = False
111110
with self._condition:
112111
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
@@ -283,11 +282,23 @@ def set_running_or_notify_cancel(self):
283282
"""
284283

285284
# How to force this future to start running asynchronously?
285+
# Can't really control which greenlet executes next so
286+
# a greenlet might complete before it is marked to run.
287+
# Need to make Concurrent.wait() work here.
288+
#if self._state == FINISHED:
289+
# return True
286290
retval = super().set_running_or_notify_cancel()
287291
if retval:
288292
#self.waitReady(None)
293+
# Pause this thread (and allow other threads to execute)
294+
threadMgr.pauseThread()
295+
#self.blocked = True
296+
# Resume this thread
289297
#self.resume(threadMgr)
290-
threadMgr.start()
298+
#threadMgr.start()
299+
#self.gr = getcurrent()
300+
#threadMgr.resumeThread(self.gr, self)
301+
#threadMgr.resumeThread(threadMgr.main_gr, self)
291302

292303
return retval
293304

0 commit comments

Comments
 (0)