Skip to content

Commit 62d7c2c

Browse files
Use _interpreters.call().
1 parent b8171d9 commit 62d7c2c

File tree

1 file changed

+40
-67
lines changed

1 file changed

+40
-67
lines changed

Lib/concurrent/futures/interpreter.py

Lines changed: 40 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raise TypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task = (fn, args, kwargs)
52-
data = pickle.dumps(task)
53-
return data
49+
return task
5450

5551
if initializer is not None:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
return cls(initdata, shared)
6662
return create_context, resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def _capture_exc(cls, resultsid):
71-
try:
72-
yield
73-
except BaseException as exc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None, exc))
77-
raise # re-raise
78-
79-
@classmethod
80-
def _send_script_result(cls, resultsid):
81-
_interpqueues.put(resultsid, (None, None))
82-
83-
@classmethod
84-
def _call(cls, func, args, kwargs, resultsid):
85-
with cls._capture_exc(resultsid):
86-
res = func(*args or (), **kwargs or {})
87-
# Send the result back.
88-
with cls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res, None))
90-
91-
@classmethod
92-
def _call_pickled(cls, pickled, resultsid):
93-
with cls._capture_exc(resultsid):
94-
fn, args, kwargs = pickle.loads(pickled)
95-
cls._call(fn, args, kwargs, resultsid)
96-
9764
def __init__(self, initdata, shared=None):
9865
self.initdata = initdata
9966
self.shared = dict(shared) if shared else None
@@ -104,11 +71,42 @@ def __del__(self):
10471
if self.interpid is not None:
10572
self.finalize()
10673

107-
def _exec(self, script):
108-
assert self.interpid is not None
109-
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
74+
def _call(self, fn, args, kwargs):
75+
def do_call(resultsid, func, *args, **kwargs):
76+
try:
77+
return func(*args, **kwargs)
78+
except BaseException as exc:
79+
# Avoid relying on globals.
80+
import _interpqueues
81+
# Send the captured exception out on the results queue,
82+
# but still leave it unhandled for the interpreter to handle.
83+
_interpqueues.put(resultsid, err)
84+
raise # re-raise
85+
86+
args = (self.resultsid, fn, *args)
87+
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
11088
if excinfo is not None:
11189
raise ExecutionFailed(excinfo)
90+
return res
91+
92+
def _get_exception(self):
93+
# Wait for the exception data to show up.
94+
while True:
95+
try:
96+
excdata = _interpqueues.get(self.resultsid)
97+
except _interpqueues.QueueNotFoundError:
98+
raise # re-raise
99+
except _interpqueues.QueueError:
100+
continue
101+
except ModuleNotFoundError:
102+
# interpreters.queues doesn't exist, which means
103+
# QueueEmpty doesn't. Act as though it does.
104+
continue
105+
else:
106+
break
107+
exc, unboundop = excdata
108+
assert unboundop is None, unboundop
109+
return exc
112110

113111
def initialize(self):
114112
assert self.interpid is None, self.interpid
@@ -148,37 +146,12 @@ def finalize(self):
148146
pass
149147

150148
def run(self, task):
151-
data = task
152-
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
153-
149+
fn, args, kwargs = task
154150
try:
155-
self._exec(script)
156-
except ExecutionFailed as exc:
157-
exc_wrapper = exc
158-
else:
159-
exc_wrapper = None
160-
161-
# Return the result, or raise the exception.
162-
while True:
163-
try:
164-
obj = _interpqueues.get(self.resultsid)
165-
except _interpqueues.QueueNotFoundError:
166-
raise # re-raise
167-
except _interpqueues.QueueError:
168-
continue
169-
except ModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res, exc), unboundop = obj
176-
assert unboundop is None, unboundop
177-
if exc is not None:
178-
assert res is None, res
179-
assert exc_wrapper is not None
180-
raise exc from exc_wrapper
181-
return res
151+
return self._call(fn, args, kwargs)
152+
except ExecutionFailed as wrapper:
153+
exc = self._get_exception()
154+
raise exc from wrapper
182155

183156

184157
class BrokenInterpreterPool(_thread.BrokenThreadPool):

0 commit comments

Comments
 (0)