Skip to content

gh-84570: Send-Wait Fixes for _xxinterpchannels #111006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 17, 2023
15 changes: 15 additions & 0 deletions Include/internal/pycore_pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
#endif /* HAVE_FORK */


// unset: -1 seconds, in nanoseconds
#define PyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000))

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
// Exported for the _xxinterpchannels module.
PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries(
PyThread_type_lock,
PY_TIMEOUT_T microseconds);


#ifdef __cplusplus
}
#endif
Expand Down
217 changes: 173 additions & 44 deletions Lib/test/test__xxinterpchannels.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,62 @@ def test_channel_list_interpreters_closed_send_end(self):
with self.assertRaises(channels.ChannelClosedError):
channels.list_interpreters(cid, send=False)

####################
def test_allowed_types(self):
cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)

self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?

def test_run_string_arg_unresolved(self):
cid = channels.create()
interp = interpreters.create()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

#-------------------
# send/recv

def test_send_recv_main(self):
cid = channels.create()
Expand Down Expand Up @@ -705,6 +760,9 @@ def test_recv_sending_interp_destroyed(self):
channels.recv(cid2)
del cid2

#-------------------
# send_buffer

def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
Expand All @@ -720,60 +778,131 @@ def test_send_buffer(self):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)

def test_allowed_types(self):
#-------------------
# send with waiting

def build_send_waiter(self, obj, *, buffer=False):
# We want a long enough sleep that send() actually has to wait.

if buffer:
send = channels.send_buffer
else:
send = channels.send

cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
channels.recv(cid)
finally:
channels.destroy(cid)
delay = stopped - started # seconds
delay *= 3

self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?
def wait():
time.sleep(delay)
return wait

def test_run_string_arg_unresolved(self):
def test_send_blocking_waiting(self):
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
interp = interpreters.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_waiting(self):
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
self.assertEqual(received, obj)

def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
cid = channels.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

self.assertEqual(received, obj)

def test_send_closed_while_waiting(self):
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send(cid, obj, blocking=True)
t.join()

def test_send_buffer_closed_while_waiting(self):
try:
self._has_run_once
except AttributeError:
# At the moment, this test leaks a few references.
# It looks like the leak originates with the addition
# of _channels.send_buffer() (gh-110246), whereas the
# tests were added afterward. We want this test even
# if the refleak isn't fixed yet, so we skip here.
raise unittest.SkipTest('temporarily skipped due to refleaks')
else:
self._has_run_once = True

obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send_buffer(cid, obj, blocking=True)
t.join()

#-------------------
# close

def test_close_single_user(self):
Expand Down
52 changes: 2 additions & 50 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/* Interface to Sjoerd's portable C thread library */

#include "Python.h"
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
#include "pycore_dict.h" // _PyDict_Pop()
#include "pycore_interp.h" // _PyInterpreterState.threads.count
#include "pycore_moduleobject.h" // _PyModule_GetState()
Expand Down Expand Up @@ -76,57 +75,10 @@ lock_dealloc(lockobject *self)
Py_DECREF(tp);
}

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
static PyLockStatus
static inline PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
}

PyLockStatus r;
do {
_PyTime_t microseconds;
microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);

/* first a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 1);
Py_END_ALLOW_THREADS
}

if (r == PY_LOCK_INTR) {
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}

/* If we're using a timeout, recompute the timeout after processing
* signals, since those can take time. */
if (timeout > 0) {
timeout = _PyDeadline_Get(endtime);

/* Check for negative values, since those mean block forever.
*/
if (timeout < 0) {
r = PY_LOCK_FAILURE;
}
}
}
} while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */

return r;
return PyThread_acquire_lock_timed_with_retries(lock, timeout);
}

static int
Expand Down
Loading