From c61736198c911f50c4771a7ad03f4e7d0cdda017 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 12:01:42 -0600 Subject: [PATCH 1/7] Implement waiting for _channels.send(). --- Lib/test/support/interpreters.py | 12 ++-- Lib/test/test_interpreters.py | 2 +- Modules/_xxinterpchannelsmodule.c | 114 ++++++++++++++++++++++++------ 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index d61724ca86b66c..4ad9ee47692edd 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -208,11 +208,7 @@ def send(self, obj): This blocks until the object is received. """ - _channels.send(self._id, obj) - # XXX We are missing a low-level channel_send_wait(). - # See bpo-32604 and gh-19829. - # Until that shows up we fake it: - time.sleep(2) + _channels.send(self._id, obj, wait=True) def send_nowait(self, obj): """Send the object to the channel's receiving end. @@ -223,14 +219,14 @@ def send_nowait(self, obj): # XXX Note that at the moment channel_send() only ever returns # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj) + return _channels.send(self._id, obj, wait=False) def send_buffer(self, obj): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj) + _channels.send_buffer(self._id, obj, wait=True) def send_buffer_nowait(self, obj): """Send the object's buffer to the channel's receiving end. @@ -238,7 +234,7 @@ def send_buffer_nowait(self, obj): If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - return _channels.send_buffer(self._id, obj) + return _channels.send_buffer(self._id, obj, wait=False) def close(self): _channels.close(self._id, send=True) diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py index fe7b14de459a7d..0910b51bfe5dbd 100644 --- a/Lib/test/test_interpreters.py +++ b/Lib/test/test_interpreters.py @@ -964,8 +964,8 @@ def f(): orig = b'spam' s.send(orig) - t.join() obj = r.recv() + t.join() self.assertEqual(obj, orig) self.assertIsNot(obj, orig) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index a1531c5c3db34d..88bd5346e40fa9 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -234,6 +234,14 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared, return cls; } +static void +wait_for_lock(PyThread_type_lock mutex) +{ + // XXX Handle eintr, etc. + PyThread_acquire_lock(mutex, WAIT_LOCK); + PyThread_release_lock(mutex); +} + /* Cross-interpreter Buffer Views *******************************************/ @@ -567,6 +575,7 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; + PyThread_type_lock recv_mutex; struct _channelitem *next; } _channelitem; @@ -612,10 +621,11 @@ _channelitem_free_all(_channelitem *item) } static _PyCrossInterpreterData * -_channelitem_popped(_channelitem *item) +_channelitem_popped(_channelitem *item, PyThread_type_lock *recv_mutex) { _PyCrossInterpreterData *data = item->data; item->data = NULL; + *recv_mutex = item->recv_mutex; _channelitem_free(item); return data; } @@ -657,13 +667,15 @@ _channelqueue_free(_channelqueue *queue) } static int -_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) +_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, + PyThread_type_lock recv_mutex) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; + item->recv_mutex = recv_mutex; queue->count += 1; if (queue->first == NULL) { @@ -677,7 +689,7 @@ _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) } static _PyCrossInterpreterData * -_channelqueue_get(_channelqueue *queue) +_channelqueue_get(_channelqueue *queue, PyThread_type_lock *recv_mutex) { _channelitem *item = queue->first; if (item == NULL) { @@ -689,7 +701,7 @@ _channelqueue_get(_channelqueue *queue) } queue->count -= 1; - return _channelitem_popped(item); + return _channelitem_popped(item, recv_mutex); } static void @@ -1006,7 +1018,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data) + _PyCrossInterpreterData *data, PyThread_type_lock recv_mutex) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1020,7 +1032,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data) != 0) { + if (_channelqueue_put(chan->queue, data, recv_mutex) != 0) { goto done; } @@ -1046,12 +1058,17 @@ _channel_next(_PyChannelState *chan, int64_t interp, goto done; } - _PyCrossInterpreterData *data = _channelqueue_get(chan->queue); + PyThread_type_lock recv_mutex = NULL; + _PyCrossInterpreterData *data = _channelqueue_get(chan->queue, &recv_mutex); if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) { chan->open = 0; } *res = data; + if (recv_mutex != NULL) { + PyThread_release_lock(recv_mutex); + } + done: PyThread_release_lock(chan->mutex); if (chan->queue->count == 0) { @@ -1571,7 +1588,8 @@ _channel_destroy(_channels *channels, int64_t id) } static int -_channel_send(_channels *channels, int64_t id, PyObject *obj) +_channel_send(_channels *channels, int64_t id, PyObject *obj, + PyThread_type_lock recv_mutex) { PyInterpreterState *interp = _get_current_interp(); if (interp == NULL) { @@ -1606,7 +1624,8 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj) } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, + recv_mutex); PyThread_release_lock(mutex); if (res != 0) { // We may chain an exception here: @@ -2489,22 +2508,47 @@ receive end."); static PyObject * channel_send(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", NULL}; + // XXX Add a timeout arg. + static char *kwlist[] = {"cid", "obj", "wait", NULL}; int64_t cid; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, - channel_id_converter, &cid_data, &obj)) { + int wait = 1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist, + channel_id_converter, &cid_data, &obj, + &wait)) { return NULL; } cid = cid_data.cid; - int err = _channel_send(&_globals.channels, cid, obj); - if (handle_channel_error(err, self, cid)) { - return NULL; + if (wait) { + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + PyErr_NoMemory(); + return NULL; + } + PyThread_acquire_lock(mutex, WAIT_LOCK); + + /* Queue up the object. */ + int err = _channel_send(&_globals.channels, cid, obj, mutex); + if (handle_channel_error(err, self, cid)) { + PyThread_release_lock(mutex); + return NULL; + } + + /* Wait until the object is received. */ + wait_for_lock(mutex); } + else { + /* Queue up the object. */ + int err = _channel_send(&_globals.channels, cid, obj, NULL); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + } + Py_RETURN_NONE; } @@ -2516,15 +2560,17 @@ Add the object's data to the channel's queue."); static PyObject * channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", NULL}; + static char *kwlist[] = {"cid", "obj", "wait", NULL}; int64_t cid; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; + int wait = 1; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O&O:channel_send_buffer", kwlist, - channel_id_converter, &cid_data, &obj)) { + "O&O|$p:channel_send_buffer", kwlist, + channel_id_converter, &cid_data, &obj, + &wait)) { return NULL; } cid = cid_data.cid; @@ -2534,11 +2580,35 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - int err = _channel_send(&_globals.channels, cid, tempobj); - Py_DECREF(tempobj); - if (handle_channel_error(err, self, cid)) { - return NULL; + if (wait) { + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + Py_DECREF(tempobj); + PyErr_NoMemory(); + return NULL; + } + PyThread_acquire_lock(mutex, WAIT_LOCK); + + /* Queue up the buffer. */ + int err = _channel_send(&_globals.channels, cid, tempobj, mutex); + Py_DECREF(tempobj); + if (handle_channel_error(err, self, cid)) { + PyThread_acquire_lock(mutex, WAIT_LOCK); + return NULL; + } + + /* Wait until the buffer is received. */ + wait_for_lock(mutex); } + else { + /* Queue up the buffer. */ + int err = _channel_send(&_globals.channels, cid, tempobj, NULL); + Py_DECREF(tempobj); + if (handle_channel_error(err, self, cid)) { + return NULL; + } + } + Py_RETURN_NONE; } From 1701ca022b83658f72685d553c5b92638ac0e286 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 9 Oct 2023 06:25:56 -0600 Subject: [PATCH 2/7] Release the GIL while waiting for the send lock. --- Modules/_xxinterpchannelsmodule.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 88bd5346e40fa9..c53a50f02cf0d4 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -237,8 +237,11 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared, static void wait_for_lock(PyThread_type_lock mutex) { + Py_BEGIN_ALLOW_THREADS // XXX Handle eintr, etc. PyThread_acquire_lock(mutex, WAIT_LOCK); + Py_END_ALLOW_THREADS + PyThread_release_lock(mutex); } From b5dbf81cfeb2909d4778cf4d6e83aade70bb1d98 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 9 Oct 2023 06:55:36 -0600 Subject: [PATCH 3/7] Fix tests. --- Lib/test/test__xxinterpchannels.py | 136 ++++++++++++++--------------- 1 file changed, 68 insertions(+), 68 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index cb69f73c4348d4..32549ebf37898d 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -189,7 +189,7 @@ def run_action(cid, action, end, state, *, hideclosed=True): def _run_action(cid, action, end, state): if action == 'use': if end == 'send': - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) return state.incr() elif end == 'recv': if not state.pending: @@ -332,7 +332,7 @@ def test_shareable(self): chan = channels.create() obj = channels.create() - channels.send(chan, obj) + channels.send(chan, obj, wait=False) got = channels.recv(chan) self.assertEqual(got, obj) @@ -390,7 +390,7 @@ def test_channel_list_interpreters_basic(self): """Test basic listing channel interpreters.""" interp0 = interpreters.get_main() cid = channels.create() - channels.send(cid, "send") + channels.send(cid, "send", wait=False) # Test for a channel that has one end associated to an interpreter. send_interps = channels.list_interpreters(cid, send=True) recv_interps = channels.list_interpreters(cid, send=False) @@ -416,10 +416,10 @@ def test_channel_list_interpreters_multiple(self): interp3 = interpreters.create() cid = channels.create() - channels.send(cid, "send") + channels.send(cid, "send", wait=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, "send") + _channels.send({cid}, "send", wait=False) """)) _run_output(interp2, dedent(f""" import _xxinterpchannels as _channels @@ -439,7 +439,7 @@ def test_channel_list_interpreters_destroyed(self): interp0 = interpreters.get_main() interp1 = interpreters.create() cid = channels.create() - channels.send(cid, "send") + channels.send(cid, "send", wait=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) @@ -465,12 +465,12 @@ def test_channel_list_interpreters_released(self): interp1 = interpreters.create() interp2 = interpreters.create() cid = channels.create() - channels.send(cid, "data") + channels.send(cid, "data", wait=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) """)) - channels.send(cid, "data") + channels.send(cid, "data", wait=False) _run_output(interp2, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) @@ -506,7 +506,7 @@ def test_channel_list_interpreters_closed(self): interp1 = interpreters.create() cid = channels.create() # Put something in the channel so that it's not empty. - channels.send(cid, "send") + channels.send(cid, "send", wait=False) # Check initial state. send_interps = channels.list_interpreters(cid, send=True) @@ -528,7 +528,7 @@ def test_channel_list_interpreters_closed_send_end(self): interp1 = interpreters.create() cid = channels.create() # Put something in the channel so that it's not empty. - channels.send(cid, "send") + channels.send(cid, "send", wait=False) # Check initial state. send_interps = channels.list_interpreters(cid, send=True) @@ -562,7 +562,7 @@ def test_channel_list_interpreters_closed_send_end(self): def test_send_recv_main(self): cid = channels.create() orig = b'spam' - channels.send(cid, orig) + channels.send(cid, orig, wait=False) obj = channels.recv(cid) self.assertEqual(obj, orig) @@ -574,7 +574,7 @@ def test_send_recv_same_interpreter(self): import _xxinterpchannels as _channels cid = _channels.create() orig = b'spam' - _channels.send(cid, orig) + _channels.send(cid, orig, wait=False) obj = _channels.recv(cid) assert obj is not orig assert obj == orig @@ -585,7 +585,7 @@ def test_send_recv_different_interpreters(self): id1 = interpreters.create() out = _run_output(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam') + _channels.send({cid}, b'spam', wait=False) """)) obj = channels.recv(cid) @@ -606,8 +606,8 @@ def f(): t.start() channels.send(cid, b'spam') - t.join() obj = channels.recv(cid) + t.join() self.assertEqual(obj, b'spam') @@ -634,8 +634,8 @@ def f(): t.start() channels.send(cid, b'spam') - t.join() obj = channels.recv(cid) + t.join() self.assertEqual(obj, b'eggs') @@ -656,10 +656,10 @@ def test_recv_default(self): default = object() cid = channels.create() obj1 = channels.recv(cid, default) - channels.send(cid, None) - channels.send(cid, 1) - channels.send(cid, b'spam') - channels.send(cid, b'eggs') + channels.send(cid, None, wait=False) + channels.send(cid, 1, wait=False) + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'eggs', wait=False) obj2 = channels.recv(cid, default) obj3 = channels.recv(cid, default) obj4 = channels.recv(cid) @@ -679,7 +679,7 @@ def test_recv_sending_interp_destroyed(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid1}, b'spam') + _channels.send({cid1}, b'spam', wait=False) """)) interpreters.destroy(interp) @@ -692,9 +692,9 @@ def test_recv_sending_interp_destroyed(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid2}, b'spam') + _channels.send({cid2}, b'spam', wait=False) """)) - channels.send(cid2, b'eggs') + channels.send(cid2, b'eggs', wait=False) interpreters.destroy(interp) channels.recv(cid2) @@ -706,7 +706,7 @@ def test_recv_sending_interp_destroyed(self): def test_send_buffer(self): buf = bytearray(b'spamspamspam') cid = channels.create() - channels.send_buffer(cid, buf) + channels.send_buffer(cid, buf, wait=False) obj = channels.recv(cid) self.assertIsNot(obj, buf) @@ -728,7 +728,7 @@ def test_allowed_types(self): ] for obj in objects: with self.subTest(obj): - channels.send(cid, obj) + channels.send(cid, obj, wait=False) got = channels.recv(cid) self.assertEqual(got, obj) @@ -744,7 +744,7 @@ def test_run_string_arg_unresolved(self): out = _run_output(interp, dedent(""" import _xxinterpchannels as _channels print(cid.end) - _channels.send(cid, b'spam') + _channels.send(cid, b'spam', wait=False) """), dict(cid=cid.send)) obj = channels.recv(cid) @@ -764,7 +764,7 @@ def test_run_string_arg_resolved(self): out = _run_output(interp, dedent(""" import _xxinterpchannels as _channels print(chan.id.end) - _channels.send(chan.id, b'spam') + _channels.send(chan.id, b'spam', wait=False) """), dict(chan=cid.send)) obj = channels.recv(cid) @@ -776,7 +776,7 @@ def test_run_string_arg_resolved(self): def test_close_single_user(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.close(cid) @@ -791,7 +791,7 @@ def test_close_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam') + _channels.send({cid}, b'spam', wait=False) """)) interpreters.run_string(id2, dedent(f""" import _xxinterpchannels as _channels @@ -811,7 +811,7 @@ def test_close_multiple_users(self): def test_close_multiple_times(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.close(cid) @@ -828,7 +828,7 @@ def test_close_empty(self): for send, recv in tests: with self.subTest((send, recv)): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.close(cid, send=send, recv=recv) @@ -839,31 +839,31 @@ def test_close_empty(self): def test_close_defaults_with_unused_items(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid) channels.recv(cid) - channels.send(cid, b'eggs') + channels.send(cid, b'eggs', wait=False) def test_close_recv_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid, recv=True) channels.recv(cid) - channels.send(cid, b'eggs') + channels.send(cid, b'eggs', wait=False) channels.recv(cid) channels.recv(cid) channels.close(cid, recv=True) def test_close_send_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) channels.close(cid, send=True) with self.assertRaises(channels.ChannelClosedError): @@ -875,21 +875,21 @@ def test_close_send_with_unused_items_unforced(self): def test_close_both_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid, recv=True, send=True) channels.recv(cid) - channels.send(cid, b'eggs') + channels.send(cid, b'eggs', wait=False) channels.recv(cid) channels.recv(cid) channels.close(cid, recv=True) def test_close_recv_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) channels.close(cid, recv=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -899,8 +899,8 @@ def test_close_recv_with_unused_items_forced(self): def test_close_send_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) channels.close(cid, send=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -910,8 +910,8 @@ def test_close_send_with_unused_items_forced(self): def test_close_both_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) channels.close(cid, send=True, recv=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -930,7 +930,7 @@ def test_close_never_used(self): def test_close_by_unassociated_interp(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels @@ -943,9 +943,9 @@ def test_close_by_unassociated_interp(self): def test_close_used_multiple_times_by_single_user(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'spam') - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.close(cid, force=True) @@ -1017,7 +1017,7 @@ class ChannelReleaseTests(TestBase): def test_single_user(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1032,7 +1032,7 @@ def test_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam') + _channels.send({cid}, b'spam', wait=False) """)) out = _run_output(id2, dedent(f""" import _xxinterpchannels as _channels @@ -1048,7 +1048,7 @@ def test_multiple_users(self): def test_no_kwargs(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.release(cid) @@ -1059,7 +1059,7 @@ def test_no_kwargs(self): def test_multiple_times(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1068,8 +1068,8 @@ def test_multiple_times(self): def test_with_unused_items(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'ham') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'ham', wait=False) channels.release(cid, send=True, recv=True) with self.assertRaises(channels.ChannelClosedError): @@ -1086,7 +1086,7 @@ def test_never_used(self): def test_by_unassociated_interp(self): cid = channels.create() - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels @@ -1105,7 +1105,7 @@ def test_close_if_unassociated(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - obj = _channels.send({cid}, b'spam') + obj = _channels.send({cid}, b'spam', wait=False) _channels.release({cid}) """)) @@ -1115,9 +1115,9 @@ def test_close_if_unassociated(self): def test_partially(self): # XXX Is partial close too weird/confusing? cid = channels.create() - channels.send(cid, None) + channels.send(cid, None, wait=False) channels.recv(cid) - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) channels.release(cid, send=True) obj = channels.recv(cid) @@ -1125,9 +1125,9 @@ def test_partially(self): def test_used_multiple_times_by_single_user(self): cid = channels.create() - channels.send(cid, b'spam') - channels.send(cid, b'spam') - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', wait=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1212,7 +1212,7 @@ def _new_channel(self, creator): cid = _xxsubchannels.create() # We purposefully send back an int to avoid tying the # channel to the other interpreter. - _xxsubchannels.send({ch}, int(cid)) + _xxsubchannels.send({ch}, int(cid), wait=False) del _xxsubinterpreters """) self._cid = channels.recv(ch) @@ -1442,8 +1442,8 @@ def run_action(self, fix, action, *, hideclosed=True): {repr(fix.state)}, hideclosed={hideclosed}, ) - channels.send({_cid}, result.pending.to_bytes(1, 'little')) - channels.send({_cid}, b'X' if result.closed else b'') + channels.send({_cid}, result.pending.to_bytes(1, 'little'), wait=False) + channels.send({_cid}, b'X' if result.closed else b'', wait=False) """) result = ChannelState( pending=int.from_bytes(channels.recv(_cid), 'little'), @@ -1490,7 +1490,7 @@ def _assert_closed_in_interp(self, fix, interp=None): """) run_interp(interp.id, """ with helpers.expect_channel_closed(): - channels.send(cid, b'spam') + channels.send(cid, b'spam', wait=False) """) run_interp(interp.id, """ with helpers.expect_channel_closed(): From d3aefb8486dec63d0d8fe454500f394311f33e18 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 9 Oct 2023 07:02:44 -0600 Subject: [PATCH 4/7] Update the doc strings. --- Modules/_xxinterpchannelsmodule.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index c53a50f02cf0d4..bb9244f5c60e02 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -2556,9 +2556,10 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(channel_send_doc, -"channel_send(cid, obj)\n\ +"channel_send(cid, obj, wait=True)\n\ \n\ -Add the object's data to the channel's queue."); +Add the object's data to the channel's queue.\n\ +By default this waits for the object to be received."); static PyObject * channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) @@ -2616,9 +2617,10 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(channel_send_buffer_doc, -"channel_send_buffer(cid, obj)\n\ +"channel_send_buffer(cid, obj, wait=True)\n\ \n\ -Add the object's buffer to the channel's queue."); +Add the object's buffer to the channel's queue.\n\ +By default this waits for the object to be received."); static PyObject * channel_recv(PyObject *self, PyObject *args, PyObject *kwds) From 5b6574e9fcb25d91d30dd91cecf361de0edf462a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 9 Oct 2023 08:18:13 -0600 Subject: [PATCH 5/7] Wait when receiving from a thread. --- Lib/test/test__xxinterpchannels.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 32549ebf37898d..1f36fe3df0ae6e 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -21,6 +21,14 @@ ################################## # helpers +def recv_wait(cid): + while True: + try: + return channels.recv({cid}) + break + except channels.ChannelEmptyError: + time.sleep(0.1) + #@contextmanager #def run_threaded(id, source, **shared): # def run(): @@ -595,18 +603,13 @@ def test_send_recv_different_threads(self): cid = channels.create() def f(): - while True: - try: - obj = channels.recv(cid) - break - except channels.ChannelEmptyError: - time.sleep(0.1) + obj = recv_wait(cid) channels.send(cid, obj) t = threading.Thread(target=f) t.start() channels.send(cid, b'spam') - obj = channels.recv(cid) + obj = recv_wait(cid) t.join() self.assertEqual(obj, b'spam') @@ -634,7 +637,7 @@ def f(): t.start() channels.send(cid, b'spam') - obj = channels.recv(cid) + obj = recv_wait(cid) t.join() self.assertEqual(obj, b'eggs') From fbd79451ef745e2a0341c29989d265ef6595c0ea Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 10 Oct 2023 02:47:23 -0600 Subject: [PATCH 6/7] wait -> blocking --- Lib/test/support/interpreters.py | 8 +- Lib/test/test__xxinterpchannels.py | 132 ++++++++++++++--------------- Modules/_xxinterpchannelsmodule.c | 20 ++--- 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 4ad9ee47692edd..9ba6862a9ee01a 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -208,7 +208,7 @@ def send(self, obj): This blocks until the object is received. """ - _channels.send(self._id, obj, wait=True) + _channels.send(self._id, obj, blocking=True) def send_nowait(self, obj): """Send the object to the channel's receiving end. @@ -219,14 +219,14 @@ def send_nowait(self, obj): # XXX Note that at the moment channel_send() only ever returns # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj, wait=False) + return _channels.send(self._id, obj, blocking=False) def send_buffer(self, obj): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj, wait=True) + _channels.send_buffer(self._id, obj, blocking=True) def send_buffer_nowait(self, obj): """Send the object's buffer to the channel's receiving end. @@ -234,7 +234,7 @@ def send_buffer_nowait(self, obj): If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - return _channels.send_buffer(self._id, obj, wait=False) + return _channels.send_buffer(self._id, obj, blocking=False) def close(self): _channels.close(self._id, send=True) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 1f36fe3df0ae6e..b356453d141e4e 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -197,7 +197,7 @@ def run_action(cid, action, end, state, *, hideclosed=True): def _run_action(cid, action, end, state): if action == 'use': if end == 'send': - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) return state.incr() elif end == 'recv': if not state.pending: @@ -340,7 +340,7 @@ def test_shareable(self): chan = channels.create() obj = channels.create() - channels.send(chan, obj, wait=False) + channels.send(chan, obj, blocking=False) got = channels.recv(chan) self.assertEqual(got, obj) @@ -398,7 +398,7 @@ def test_channel_list_interpreters_basic(self): """Test basic listing channel interpreters.""" interp0 = interpreters.get_main() cid = channels.create() - channels.send(cid, "send", wait=False) + channels.send(cid, "send", blocking=False) # Test for a channel that has one end associated to an interpreter. send_interps = channels.list_interpreters(cid, send=True) recv_interps = channels.list_interpreters(cid, send=False) @@ -424,10 +424,10 @@ def test_channel_list_interpreters_multiple(self): interp3 = interpreters.create() cid = channels.create() - channels.send(cid, "send", wait=False) + channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, "send", wait=False) + _channels.send({cid}, "send", blocking=False) """)) _run_output(interp2, dedent(f""" import _xxinterpchannels as _channels @@ -447,7 +447,7 @@ def test_channel_list_interpreters_destroyed(self): interp0 = interpreters.get_main() interp1 = interpreters.create() cid = channels.create() - channels.send(cid, "send", wait=False) + channels.send(cid, "send", blocking=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) @@ -473,12 +473,12 @@ def test_channel_list_interpreters_released(self): interp1 = interpreters.create() interp2 = interpreters.create() cid = channels.create() - channels.send(cid, "data", wait=False) + channels.send(cid, "data", blocking=False) _run_output(interp1, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) """)) - channels.send(cid, "data", wait=False) + channels.send(cid, "data", blocking=False) _run_output(interp2, dedent(f""" import _xxinterpchannels as _channels obj = _channels.recv({cid}) @@ -514,7 +514,7 @@ def test_channel_list_interpreters_closed(self): interp1 = interpreters.create() cid = channels.create() # Put something in the channel so that it's not empty. - channels.send(cid, "send", wait=False) + channels.send(cid, "send", blocking=False) # Check initial state. send_interps = channels.list_interpreters(cid, send=True) @@ -536,7 +536,7 @@ def test_channel_list_interpreters_closed_send_end(self): interp1 = interpreters.create() cid = channels.create() # Put something in the channel so that it's not empty. - channels.send(cid, "send", wait=False) + channels.send(cid, "send", blocking=False) # Check initial state. send_interps = channels.list_interpreters(cid, send=True) @@ -570,7 +570,7 @@ def test_channel_list_interpreters_closed_send_end(self): def test_send_recv_main(self): cid = channels.create() orig = b'spam' - channels.send(cid, orig, wait=False) + channels.send(cid, orig, blocking=False) obj = channels.recv(cid) self.assertEqual(obj, orig) @@ -582,7 +582,7 @@ def test_send_recv_same_interpreter(self): import _xxinterpchannels as _channels cid = _channels.create() orig = b'spam' - _channels.send(cid, orig, wait=False) + _channels.send(cid, orig, blocking=False) obj = _channels.recv(cid) assert obj is not orig assert obj == orig @@ -593,7 +593,7 @@ def test_send_recv_different_interpreters(self): id1 = interpreters.create() out = _run_output(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam', wait=False) + _channels.send({cid}, b'spam', blocking=False) """)) obj = channels.recv(cid) @@ -659,10 +659,10 @@ def test_recv_default(self): default = object() cid = channels.create() obj1 = channels.recv(cid, default) - channels.send(cid, None, wait=False) - channels.send(cid, 1, wait=False) - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'eggs', wait=False) + channels.send(cid, None, blocking=False) + channels.send(cid, 1, blocking=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'eggs', blocking=False) obj2 = channels.recv(cid, default) obj3 = channels.recv(cid, default) obj4 = channels.recv(cid) @@ -682,7 +682,7 @@ def test_recv_sending_interp_destroyed(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid1}, b'spam', wait=False) + _channels.send({cid1}, b'spam', blocking=False) """)) interpreters.destroy(interp) @@ -695,9 +695,9 @@ def test_recv_sending_interp_destroyed(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid2}, b'spam', wait=False) + _channels.send({cid2}, b'spam', blocking=False) """)) - channels.send(cid2, b'eggs', wait=False) + channels.send(cid2, b'eggs', blocking=False) interpreters.destroy(interp) channels.recv(cid2) @@ -709,7 +709,7 @@ def test_recv_sending_interp_destroyed(self): def test_send_buffer(self): buf = bytearray(b'spamspamspam') cid = channels.create() - channels.send_buffer(cid, buf, wait=False) + channels.send_buffer(cid, buf, blocking=False) obj = channels.recv(cid) self.assertIsNot(obj, buf) @@ -731,7 +731,7 @@ def test_allowed_types(self): ] for obj in objects: with self.subTest(obj): - channels.send(cid, obj, wait=False) + channels.send(cid, obj, blocking=False) got = channels.recv(cid) self.assertEqual(got, obj) @@ -747,7 +747,7 @@ def test_run_string_arg_unresolved(self): out = _run_output(interp, dedent(""" import _xxinterpchannels as _channels print(cid.end) - _channels.send(cid, b'spam', wait=False) + _channels.send(cid, b'spam', blocking=False) """), dict(cid=cid.send)) obj = channels.recv(cid) @@ -767,7 +767,7 @@ def test_run_string_arg_resolved(self): out = _run_output(interp, dedent(""" import _xxinterpchannels as _channels print(chan.id.end) - _channels.send(chan.id, b'spam', wait=False) + _channels.send(chan.id, b'spam', blocking=False) """), dict(chan=cid.send)) obj = channels.recv(cid) @@ -779,7 +779,7 @@ def test_run_string_arg_resolved(self): def test_close_single_user(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.close(cid) @@ -794,7 +794,7 @@ def test_close_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam', wait=False) + _channels.send({cid}, b'spam', blocking=False) """)) interpreters.run_string(id2, dedent(f""" import _xxinterpchannels as _channels @@ -814,7 +814,7 @@ def test_close_multiple_users(self): def test_close_multiple_times(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.close(cid) @@ -831,7 +831,7 @@ def test_close_empty(self): for send, recv in tests: with self.subTest((send, recv)): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.close(cid, send=send, recv=recv) @@ -842,31 +842,31 @@ def test_close_empty(self): def test_close_defaults_with_unused_items(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid) channels.recv(cid) - channels.send(cid, b'eggs', wait=False) + channels.send(cid, b'eggs', blocking=False) def test_close_recv_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid, recv=True) channels.recv(cid) - channels.send(cid, b'eggs', wait=False) + channels.send(cid, b'eggs', blocking=False) channels.recv(cid) channels.recv(cid) channels.close(cid, recv=True) def test_close_send_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) channels.close(cid, send=True) with self.assertRaises(channels.ChannelClosedError): @@ -878,21 +878,21 @@ def test_close_send_with_unused_items_unforced(self): def test_close_both_with_unused_items_unforced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) with self.assertRaises(channels.ChannelNotEmptyError): channels.close(cid, recv=True, send=True) channels.recv(cid) - channels.send(cid, b'eggs', wait=False) + channels.send(cid, b'eggs', blocking=False) channels.recv(cid) channels.recv(cid) channels.close(cid, recv=True) def test_close_recv_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) channels.close(cid, recv=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -902,8 +902,8 @@ def test_close_recv_with_unused_items_forced(self): def test_close_send_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) channels.close(cid, send=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -913,8 +913,8 @@ def test_close_send_with_unused_items_forced(self): def test_close_both_with_unused_items_forced(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) channels.close(cid, send=True, recv=True, force=True) with self.assertRaises(channels.ChannelClosedError): @@ -933,7 +933,7 @@ def test_close_never_used(self): def test_close_by_unassociated_interp(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels @@ -946,9 +946,9 @@ def test_close_by_unassociated_interp(self): def test_close_used_multiple_times_by_single_user(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.close(cid, force=True) @@ -1020,7 +1020,7 @@ class ChannelReleaseTests(TestBase): def test_single_user(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1035,7 +1035,7 @@ def test_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxinterpchannels as _channels - _channels.send({cid}, b'spam', wait=False) + _channels.send({cid}, b'spam', blocking=False) """)) out = _run_output(id2, dedent(f""" import _xxinterpchannels as _channels @@ -1051,7 +1051,7 @@ def test_multiple_users(self): def test_no_kwargs(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.release(cid) @@ -1062,7 +1062,7 @@ def test_no_kwargs(self): def test_multiple_times(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1071,8 +1071,8 @@ def test_multiple_times(self): def test_with_unused_items(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'ham', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'ham', blocking=False) channels.release(cid, send=True, recv=True) with self.assertRaises(channels.ChannelClosedError): @@ -1089,7 +1089,7 @@ def test_never_used(self): def test_by_unassociated_interp(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels @@ -1108,7 +1108,7 @@ def test_close_if_unassociated(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxinterpchannels as _channels - obj = _channels.send({cid}, b'spam', wait=False) + obj = _channels.send({cid}, b'spam', blocking=False) _channels.release({cid}) """)) @@ -1118,9 +1118,9 @@ def test_close_if_unassociated(self): def test_partially(self): # XXX Is partial close too weird/confusing? cid = channels.create() - channels.send(cid, None, wait=False) + channels.send(cid, None, blocking=False) channels.recv(cid) - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) channels.release(cid, send=True) obj = channels.recv(cid) @@ -1128,9 +1128,9 @@ def test_partially(self): def test_used_multiple_times_by_single_user(self): cid = channels.create() - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'spam', wait=False) - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'spam', blocking=False) + channels.send(cid, b'spam', blocking=False) channels.recv(cid) channels.release(cid, send=True, recv=True) @@ -1215,7 +1215,7 @@ def _new_channel(self, creator): cid = _xxsubchannels.create() # We purposefully send back an int to avoid tying the # channel to the other interpreter. - _xxsubchannels.send({ch}, int(cid), wait=False) + _xxsubchannels.send({ch}, int(cid), blocking=False) del _xxsubinterpreters """) self._cid = channels.recv(ch) @@ -1445,8 +1445,8 @@ def run_action(self, fix, action, *, hideclosed=True): {repr(fix.state)}, hideclosed={hideclosed}, ) - channels.send({_cid}, result.pending.to_bytes(1, 'little'), wait=False) - channels.send({_cid}, b'X' if result.closed else b'', wait=False) + channels.send({_cid}, result.pending.to_bytes(1, 'little'), blocking=False) + channels.send({_cid}, b'X' if result.closed else b'', blocking=False) """) result = ChannelState( pending=int.from_bytes(channels.recv(_cid), 'little'), @@ -1493,7 +1493,7 @@ def _assert_closed_in_interp(self, fix, interp=None): """) run_interp(interp.id, """ with helpers.expect_channel_closed(): - channels.send(cid, b'spam', wait=False) + channels.send(cid, b'spam', blocking=False) """) run_interp(interp.id, """ with helpers.expect_channel_closed(): diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index bb9244f5c60e02..bc8cd0e2cff4c1 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -2512,21 +2512,21 @@ static PyObject * channel_send(PyObject *self, PyObject *args, PyObject *kwds) { // XXX Add a timeout arg. - static char *kwlist[] = {"cid", "obj", "wait", NULL}; + static char *kwlist[] = {"cid", "obj", "blocking", NULL}; int64_t cid; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; - int wait = 1; + int blocking = 1; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist, channel_id_converter, &cid_data, &obj, - &wait)) { + &blocking)) { return NULL; } cid = cid_data.cid; - if (wait) { + if (blocking) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { PyErr_NoMemory(); @@ -2556,7 +2556,7 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(channel_send_doc, -"channel_send(cid, obj, wait=True)\n\ +"channel_send(cid, obj, blocking=True)\n\ \n\ Add the object's data to the channel's queue.\n\ By default this waits for the object to be received."); @@ -2564,17 +2564,17 @@ By default this waits for the object to be received."); static PyObject * channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "wait", NULL}; + static char *kwlist[] = {"cid", "obj", "blocking", NULL}; int64_t cid; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; - int wait = 1; + int blocking = 1; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send_buffer", kwlist, channel_id_converter, &cid_data, &obj, - &wait)) { + &blocking)) { return NULL; } cid = cid_data.cid; @@ -2584,7 +2584,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - if (wait) { + if (blocking) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { Py_DECREF(tempobj); @@ -2617,7 +2617,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(channel_send_buffer_doc, -"channel_send_buffer(cid, obj, wait=True)\n\ +"channel_send_buffer(cid, obj, blocking=True)\n\ \n\ Add the object's buffer to the channel's queue.\n\ By default this waits for the object to be received."); From b85a681aef97b88fb1fc73ee8180b96be667d5c7 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 10 Oct 2023 02:48:06 -0600 Subject: [PATCH 7/7] Fix the tests. --- Lib/test/test__xxinterpchannels.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index b356453d141e4e..ff01a339c0008e 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -24,8 +24,7 @@ def recv_wait(cid): while True: try: - return channels.recv({cid}) - break + return channels.recv(cid) except channels.ChannelEmptyError: time.sleep(0.1)