Skip to content

Commit 53735b6

Browse files
Use _PyObject_GetXIDataWithFallback().
1 parent ced8a4e commit 53735b6

File tree

10 files changed

+486
-438
lines changed

10 files changed

+486
-438
lines changed

Lib/concurrent/futures/interpreter.py

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ def __str__(self):
3636
""".strip())
3737

3838

39-
UNBOUND = 2 # error; this should not happen.
40-
41-
4239
class WorkerContext(_thread.WorkerContext):
4340

4441
@classmethod
@@ -47,23 +44,13 @@ def resolve_task(fn, args, kwargs):
4744
if isinstance(fn, str):
4845
# XXX Circle back to this later.
4946
raise TypeError('scripts not supported')
50-
if args or kwargs:
51-
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
52-
data = textwrap.dedent(fn)
53-
kind = 'script'
54-
# Make sure the script compiles.
55-
# Ideally we wouldn't throw away the resulting code
56-
# object. However, there isn't much to be done until
57-
# code objects are shareable and/or we do a better job
58-
# of supporting code objects in _interpreters.exec().
59-
compile(data, '<string>', 'exec')
6047
else:
6148
# Functions defined in the __main__ module can't be pickled,
6249
# so they can't be used here. In the future, we could possibly
6350
# borrow from multiprocessing to work around this.
64-
data = pickle.dumps((fn, args, kwargs))
65-
kind = 'function'
66-
return (data, kind)
51+
task = (fn, args, kwargs)
52+
data = pickle.dumps(task)
53+
return data
6754

6855
if initializer is not None:
6956
try:
@@ -86,24 +73,20 @@ def _capture_exc(cls, resultsid):
8673
except BaseException as exc:
8774
# Send the captured exception out on the results queue,
8875
# but still leave it unhandled for the interpreter to handle.
89-
err = pickle.dumps(exc)
90-
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
76+
_interpqueues.put(resultsid, (None, exc))
9177
raise # re-raise
9278

9379
@classmethod
9480
def _send_script_result(cls, resultsid):
95-
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)
81+
_interpqueues.put(resultsid, (None, None))
9682

9783
@classmethod
9884
def _call(cls, func, args, kwargs, resultsid):
9985
with cls._capture_exc(resultsid):
10086
res = func(*args or (), **kwargs or {})
10187
# Send the result back.
102-
try:
103-
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
104-
except _interpreters.NotShareableError:
105-
res = pickle.dumps(res)
106-
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)
88+
with cls._capture_exc(resultsid):
89+
_interpqueues.put(resultsid, (res, None))
10790

10891
@classmethod
10992
def _call_pickled(cls, pickled, resultsid):
@@ -134,8 +117,7 @@ def initialize(self):
134117
_interpreters.incref(self.interpid)
135118

136119
maxsize = 0
137-
fmt = 0
138-
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
120+
self.resultsid = _interpqueues.create(maxsize)
139121

140122
self._exec(f'from {__name__} import WorkerContext')
141123

@@ -166,17 +148,8 @@ def finalize(self):
166148
pass
167149

168150
def run(self, task):
169-
data, kind = task
170-
if kind == 'script':
171-
raise NotImplementedError('script kind disabled')
172-
script = f"""
173-
with WorkerContext._capture_exc({self.resultsid}):
174-
{textwrap.indent(data, ' ')}
175-
WorkerContext._send_script_result({self.resultsid})"""
176-
elif kind == 'function':
177-
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
178-
else:
179-
raise NotImplementedError(kind)
151+
data = task
152+
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
180153

181154
try:
182155
self._exec(script)
@@ -199,15 +172,13 @@ def run(self, task):
199172
continue
200173
else:
201174
break
202-
(res, excdata), pickled, unboundop = obj
175+
(res, exc), unboundop = obj
203176
assert unboundop is None, unboundop
204-
if excdata is not None:
177+
if exc is not None:
205178
assert res is None, res
206-
assert pickled
207179
assert exc_wrapper is not None
208-
exc = pickle.loads(excdata)
209180
raise exc from exc_wrapper
210-
return pickle.loads(res) if pickled else res
181+
return res
211182

212183

213184
class BrokenInterpreterPool(_thread.BrokenThreadPool):

Lib/test/support/interpreters/channels.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
5555
"""
5656
unbound = _serialize_unbound(unbounditems)
5757
unboundop, = unbound
58-
cid = _channels.create(unboundop)
59-
recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
58+
cid = _channels.create(unboundop, -1)
59+
recv, send = RecvChannel(cid), SendChannel(cid)
60+
send._set_unbound(unboundop, unbounditems)
6061
return recv, send
6162

6263

6364
def list_all():
6465
"""Return a list of (recv, send) for all open channels."""
65-
return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
66-
for cid, unbound in _channels.list_all()]
66+
channels = []
67+
for cid, unboundop, _ in _channels.list_all():
68+
chan = _, send = RecvChannel(cid), SendChannel(cid)
69+
if not hasattr(send, '_unboundop'):
70+
send._set_unbound(unboundop)
71+
else:
72+
assert send._unbound[0] == op
73+
channels.append(chan)
74+
return channels
6775

6876

6977
class _ChannelEnd:
@@ -175,78 +183,95 @@ class SendChannel(_ChannelEnd):
175183

176184
_end = 'send'
177185

178-
def __new__(cls, cid, *, _unbound=None):
179-
if _unbound is None:
180-
try:
181-
op = _channels.get_channel_defaults(cid)
182-
_unbound = (op,)
183-
except ChannelNotFoundError:
184-
_unbound = _serialize_unbound(UNBOUND)
185-
self = super().__new__(cls, cid)
186-
self._unbound = _unbound
187-
return self
186+
# def __new__(cls, cid, *, _unbound=None):
187+
# if _unbound is None:
188+
# try:
189+
# op = _channels.get_channel_defaults(cid)
190+
# _unbound = (op,)
191+
# except ChannelNotFoundError:
192+
# _unbound = _serialize_unbound(UNBOUND)
193+
# self = super().__new__(cls, cid)
194+
# self._unbound = _unbound
195+
# return self
196+
197+
def _set_unbound(self, op, items=None):
198+
assert not hasattr(self, '_unbound')
199+
if items is None:
200+
items = _resolve_unbound(op)
201+
unbound = (op, items)
202+
self._unbound = unbound
203+
return unbound
204+
205+
@property
206+
def unbounditems(self):
207+
try:
208+
_, items = self._unbound
209+
except AttributeError:
210+
op, _ = _channels.get_queue_defaults(self._id)
211+
_, items = self._set_unbound(op)
212+
return items
188213

189214
@property
190215
def is_closed(self):
191216
info = self._info
192217
return info.closed or info.closing
193218

194219
def send(self, obj, timeout=None, *,
195-
unbound=None,
220+
unbounditems=None,
196221
):
197222
"""Send the object (i.e. its data) to the channel's receiving end.
198223
199224
This blocks until the object is received.
200225
"""
201-
if unbound is None:
202-
unboundop, = self._unbound
226+
if unbounditems is None:
227+
unboundop = -1
203228
else:
204-
unboundop, = _serialize_unbound(unbound)
229+
unboundop, = _serialize_unbound(unbounditems)
205230
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
206231

207232
def send_nowait(self, obj, *,
208-
unbound=None,
233+
unbounditems=None,
209234
):
210235
"""Send the object to the channel's receiving end.
211236
212237
If the object is immediately received then return True
213238
(else False). Otherwise this is the same as send().
214239
"""
215-
if unbound is None:
216-
unboundop, = self._unbound
240+
if unbounditems is None:
241+
unboundop = -1
217242
else:
218-
unboundop, = _serialize_unbound(unbound)
243+
unboundop, = _serialize_unbound(unbounditems)
219244
# XXX Note that at the moment channel_send() only ever returns
220245
# None. This should be fixed when channel_send_wait() is added.
221246
# See bpo-32604 and gh-19829.
222247
return _channels.send(self._id, obj, unboundop, blocking=False)
223248

224249
def send_buffer(self, obj, timeout=None, *,
225-
unbound=None,
250+
unbounditems=None,
226251
):
227252
"""Send the object's buffer to the channel's receiving end.
228253
229254
This blocks until the object is received.
230255
"""
231-
if unbound is None:
232-
unboundop, = self._unbound
256+
if unbounditems is None:
257+
unboundop = -1
233258
else:
234-
unboundop, = _serialize_unbound(unbound)
259+
unboundop, = _serialize_unbound(unbounditems)
235260
_channels.send_buffer(self._id, obj, unboundop,
236261
timeout=timeout, blocking=True)
237262

238263
def send_buffer_nowait(self, obj, *,
239-
unbound=None,
264+
unbounditems=None,
240265
):
241266
"""Send the object's buffer to the channel's receiving end.
242267
243268
If the object is immediately received then return True
244269
(else False). Otherwise this is the same as send().
245270
"""
246-
if unbound is None:
247-
unboundop, = self._unbound
271+
if unbounditems is None:
272+
unboundop = -1
248273
else:
249-
unboundop, = _serialize_unbound(unbound)
274+
unboundop, = _serialize_unbound(unbounditems)
250275
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
251276

252277
def close(self):

0 commit comments

Comments
 (0)