Skip to content

Commit c58c63f

Browse files
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
1 parent 7029c1a commit c58c63f

File tree

8 files changed

+202
-47
lines changed

8 files changed

+202
-47
lines changed

Include/internal/pycore_pythread.h

+6
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
8989
// unset: -1 seconds, in nanoseconds
9090
#define PyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000))
9191

92+
// Exported for the _xxinterpchannels module.
93+
PyAPI_FUNC(int) PyThread_ParseTimeoutArg(
94+
PyObject *arg,
95+
int blocking,
96+
PY_TIMEOUT_T *timeout);
97+
9298
/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
9399
* is interrupted, signal handlers are run, and if they raise an exception,
94100
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE

Lib/test/support/interpreters.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,25 @@ class RecvChannel(_ChannelEnd):
170170

171171
_end = 'recv'
172172

173-
def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds
173+
def recv(self, timeout=None, *,
174+
_sentinel=object(),
175+
_delay=10 / 1000, # 10 milliseconds
176+
):
174177
"""Return the next object from the channel.
175178
176179
This blocks until an object has been sent, if none have been
177180
sent already.
178181
"""
182+
if timeout is not None:
183+
timeout = int(timeout)
184+
if timeout < 0:
185+
raise ValueError(f'timeout value must be non-negative')
186+
end = time.time() + timeout
179187
obj = _channels.recv(self._id, _sentinel)
180188
while obj is _sentinel:
181189
time.sleep(_delay)
190+
if timeout is not None and time.time() >= end:
191+
raise TimeoutError
182192
obj = _channels.recv(self._id, _sentinel)
183193
return obj
184194

@@ -203,12 +213,12 @@ class SendChannel(_ChannelEnd):
203213

204214
_end = 'send'
205215

206-
def send(self, obj):
216+
def send(self, obj, timeout=None):
207217
"""Send the object (i.e. its data) to the channel's receiving end.
208218
209219
This blocks until the object is received.
210220
"""
211-
_channels.send(self._id, obj, blocking=True)
221+
_channels.send(self._id, obj, timeout=timeout, blocking=True)
212222

213223
def send_nowait(self, obj):
214224
"""Send the object to the channel's receiving end.
@@ -221,12 +231,12 @@ def send_nowait(self, obj):
221231
# See bpo-32604 and gh-19829.
222232
return _channels.send(self._id, obj, blocking=False)
223233

224-
def send_buffer(self, obj):
234+
def send_buffer(self, obj, timeout=None):
225235
"""Send the object's buffer to the channel's receiving end.
226236
227237
This blocks until the object is received.
228238
"""
229-
_channels.send_buffer(self._id, obj, blocking=True)
239+
_channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
230240

231241
def send_buffer_nowait(self, obj):
232242
"""Send the object's buffer to the channel's receiving end.

Lib/test/test__xxinterpchannels.py

+108-20
Original file line numberDiff line numberDiff line change
@@ -864,22 +864,97 @@ def f():
864864

865865
self.assertEqual(received, obj)
866866

867+
def test_send_timeout(self):
868+
obj = b'spam'
869+
870+
with self.subTest('non-blocking with timeout'):
871+
cid = channels.create()
872+
with self.assertRaises(ValueError):
873+
channels.send(cid, obj, blocking=False, timeout=0.1)
874+
875+
with self.subTest('timeout hit'):
876+
cid = channels.create()
877+
with self.assertRaises(TimeoutError):
878+
channels.send(cid, obj, blocking=True, timeout=0.1)
879+
with self.assertRaises(channels.ChannelEmptyError):
880+
received = channels.recv(cid)
881+
print(repr(received))
882+
883+
with self.subTest('timeout not hit'):
884+
cid = channels.create()
885+
def f():
886+
recv_wait(cid)
887+
t = threading.Thread(target=f)
888+
t.start()
889+
channels.send(cid, obj, blocking=True, timeout=10)
890+
t.join()
891+
892+
def test_send_buffer_timeout(self):
893+
try:
894+
self._has_run_once_timeout
895+
except AttributeError:
896+
# At the moment, this test leaks a few references.
897+
# It looks like the leak originates with the addition
898+
# of _channels.send_buffer() (gh-110246), whereas the
899+
# tests were added afterward. We want this test even
900+
# if the refleak isn't fixed yet, so we skip here.
901+
raise unittest.SkipTest('temporarily skipped due to refleaks')
902+
else:
903+
self._has_run_once_timeout = True
904+
905+
obj = bytearray(b'spam')
906+
907+
with self.subTest('non-blocking with timeout'):
908+
cid = channels.create()
909+
with self.assertRaises(ValueError):
910+
channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
911+
912+
with self.subTest('timeout hit'):
913+
cid = channels.create()
914+
with self.assertRaises(TimeoutError):
915+
channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
916+
with self.assertRaises(channels.ChannelEmptyError):
917+
received = channels.recv(cid)
918+
print(repr(received))
919+
920+
with self.subTest('timeout not hit'):
921+
cid = channels.create()
922+
def f():
923+
recv_wait(cid)
924+
t = threading.Thread(target=f)
925+
t.start()
926+
channels.send_buffer(cid, obj, blocking=True, timeout=10)
927+
t.join()
928+
867929
def test_send_closed_while_waiting(self):
868930
obj = b'spam'
869931
wait = self.build_send_waiter(obj)
870-
cid = channels.create()
871-
def f():
872-
wait()
873-
channels.close(cid, force=True)
874-
t = threading.Thread(target=f)
875-
t.start()
876-
with self.assertRaises(channels.ChannelClosedError):
877-
channels.send(cid, obj, blocking=True)
878-
t.join()
932+
933+
with self.subTest('without timeout'):
934+
cid = channels.create()
935+
def f():
936+
wait()
937+
channels.close(cid, force=True)
938+
t = threading.Thread(target=f)
939+
t.start()
940+
with self.assertRaises(channels.ChannelClosedError):
941+
channels.send(cid, obj, blocking=True)
942+
t.join()
943+
944+
with self.subTest('with timeout'):
945+
cid = channels.create()
946+
def f():
947+
wait()
948+
channels.close(cid, force=True)
949+
t = threading.Thread(target=f)
950+
t.start()
951+
with self.assertRaises(channels.ChannelClosedError):
952+
channels.send(cid, obj, blocking=True, timeout=30)
953+
t.join()
879954

880955
def test_send_buffer_closed_while_waiting(self):
881956
try:
882-
self._has_run_once
957+
self._has_run_once_closed
883958
except AttributeError:
884959
# At the moment, this test leaks a few references.
885960
# It looks like the leak originates with the addition
@@ -888,19 +963,32 @@ def test_send_buffer_closed_while_waiting(self):
888963
# if the refleak isn't fixed yet, so we skip here.
889964
raise unittest.SkipTest('temporarily skipped due to refleaks')
890965
else:
891-
self._has_run_once = True
966+
self._has_run_once_closed = True
892967

893968
obj = bytearray(b'spam')
894969
wait = self.build_send_waiter(obj, buffer=True)
895-
cid = channels.create()
896-
def f():
897-
wait()
898-
channels.close(cid, force=True)
899-
t = threading.Thread(target=f)
900-
t.start()
901-
with self.assertRaises(channels.ChannelClosedError):
902-
channels.send_buffer(cid, obj, blocking=True)
903-
t.join()
970+
971+
with self.subTest('without timeout'):
972+
cid = channels.create()
973+
def f():
974+
wait()
975+
channels.close(cid, force=True)
976+
t = threading.Thread(target=f)
977+
t.start()
978+
with self.assertRaises(channels.ChannelClosedError):
979+
channels.send_buffer(cid, obj, blocking=True)
980+
t.join()
981+
982+
with self.subTest('with timeout'):
983+
cid = channels.create()
984+
def f():
985+
wait()
986+
channels.close(cid, force=True)
987+
t = threading.Thread(target=f)
988+
t.start()
989+
with self.assertRaises(channels.ChannelClosedError):
990+
channels.send_buffer(cid, obj, blocking=True, timeout=30)
991+
t.join()
904992

905993
#-------------------
906994
# close

Lib/test/test_interpreters.py

+5
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,11 @@ def test_send_recv_nowait_different_interpreters(self):
10221022
self.assertEqual(obj2, b'eggs')
10231023
self.assertNotEqual(id(obj2), int(out))
10241024

1025+
def test_recv_timeout(self):
1026+
r, _ = interpreters.create_channel()
1027+
with self.assertRaises(TimeoutError):
1028+
r.recv(timeout=1)
1029+
10251030
def test_recv_channel_does_not_exist(self):
10261031
ch = interpreters.RecvChannel(1_000_000)
10271032
with self.assertRaises(interpreters.ChannelNotFoundError):

Modules/_queuemodule.c

+2
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
214214
PY_TIMEOUT_T microseconds;
215215
PyThreadState *tstate = PyThreadState_Get();
216216

217+
// XXX Use PyThread_ParseTimeoutArg().
218+
217219
if (block == 0) {
218220
/* Non-blocking */
219221
microseconds = 0;

Modules/_threadmodule.c

+6-5
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,15 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
8888
char *kwlist[] = {"blocking", "timeout", NULL};
8989
int blocking = 1;
9090
PyObject *timeout_obj = NULL;
91-
const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1);
92-
93-
*timeout = unset_timeout ;
94-
9591
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|pO:acquire", kwlist,
9692
&blocking, &timeout_obj))
9793
return -1;
9894

95+
// XXX Use PyThread_ParseTimeoutArg().
96+
97+
const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1);
98+
*timeout = unset_timeout;
99+
99100
if (timeout_obj
100101
&& _PyTime_FromSecondsObject(timeout,
101102
timeout_obj, _PyTime_ROUND_TIMEOUT) < 0)
@@ -108,7 +109,7 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
108109
}
109110
if (*timeout < 0 && *timeout != unset_timeout) {
110111
PyErr_SetString(PyExc_ValueError,
111-
"timeout value must be positive");
112+
"timeout value must be a non-negative number");
112113
return -1;
113114
}
114115
if (!blocking)

Modules/_xxinterpchannelsmodule.c

+26-17
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,8 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared,
242242
}
243243

244244
static int
245-
wait_for_lock(PyThread_type_lock mutex)
245+
wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout)
246246
{
247-
PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT;
248247
PyLockStatus res = PyThread_acquire_lock_timed_with_retries(mutex, timeout);
249248
if (res == PY_LOCK_INTR) {
250249
/* KeyboardInterrupt, etc. */
@@ -1883,7 +1882,8 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
18831882
}
18841883

18851884
static int
1886-
_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
1885+
_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
1886+
PY_TIMEOUT_T timeout)
18871887
{
18881888
// We use a stack variable here, so we must ensure that &waiting
18891889
// is not held by any channel item at the point this function exits.
@@ -1901,7 +1901,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
19011901
}
19021902

19031903
/* Wait until the object is received. */
1904-
if (wait_for_lock(waiting.mutex) < 0) {
1904+
if (wait_for_lock(waiting.mutex, timeout) < 0) {
19051905
assert(PyErr_Occurred());
19061906
_waiting_finish_releasing(&waiting);
19071907
/* The send() call is failing now, so make sure the item
@@ -2816,25 +2816,29 @@ receive end.");
28162816
static PyObject *
28172817
channel_send(PyObject *self, PyObject *args, PyObject *kwds)
28182818
{
2819-
// XXX Add a timeout arg.
2820-
static char *kwlist[] = {"cid", "obj", "blocking", NULL};
2821-
int64_t cid;
2819+
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
28222820
struct channel_id_converter_data cid_data = {
28232821
.module = self,
28242822
};
28252823
PyObject *obj;
28262824
int blocking = 1;
2827-
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist,
2825+
PyObject *timeout_obj = NULL;
2826+
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist,
28282827
channel_id_converter, &cid_data, &obj,
2829-
&blocking)) {
2828+
&blocking, &timeout_obj)) {
2829+
return NULL;
2830+
}
2831+
2832+
int64_t cid = cid_data.cid;
2833+
PY_TIMEOUT_T timeout;
2834+
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
28302835
return NULL;
28312836
}
2832-
cid = cid_data.cid;
28332837

28342838
/* Queue up the object. */
28352839
int err = 0;
28362840
if (blocking) {
2837-
err = _channel_send_wait(&_globals.channels, cid, obj);
2841+
err = _channel_send_wait(&_globals.channels, cid, obj, timeout);
28382842
}
28392843
else {
28402844
err = _channel_send(&_globals.channels, cid, obj, NULL);
@@ -2855,20 +2859,25 @@ By default this waits for the object to be received.");
28552859
static PyObject *
28562860
channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
28572861
{
2858-
static char *kwlist[] = {"cid", "obj", "blocking", NULL};
2859-
int64_t cid;
2862+
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
28602863
struct channel_id_converter_data cid_data = {
28612864
.module = self,
28622865
};
28632866
PyObject *obj;
28642867
int blocking = 1;
2868+
PyObject *timeout_obj = NULL;
28652869
if (!PyArg_ParseTupleAndKeywords(args, kwds,
2866-
"O&O|$p:channel_send_buffer", kwlist,
2870+
"O&O|$pO:channel_send_buffer", kwlist,
28672871
channel_id_converter, &cid_data, &obj,
2868-
&blocking)) {
2872+
&blocking, &timeout_obj)) {
2873+
return NULL;
2874+
}
2875+
2876+
int64_t cid = cid_data.cid;
2877+
PY_TIMEOUT_T timeout;
2878+
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
28692879
return NULL;
28702880
}
2871-
cid = cid_data.cid;
28722881

28732882
PyObject *tempobj = PyMemoryView_FromObject(obj);
28742883
if (tempobj == NULL) {
@@ -2878,7 +2887,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
28782887
/* Queue up the object. */
28792888
int err = 0;
28802889
if (blocking) {
2881-
err = _channel_send_wait(&_globals.channels, cid, tempobj);
2890+
err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout);
28822891
}
28832892
else {
28842893
err = _channel_send(&_globals.channels, cid, tempobj, NULL);

0 commit comments

Comments
 (0)