Skip to content

Commit 33da0e8

Browse files
mpage, blurb-it[bot], pitrou
authored
gh-114271: Fix race in Thread.join() (#114839)
There is a race between when `Thread._tstate_lock` is released[^1] in `Thread._wait_for_tstate_lock()` and when `Thread._stop()` asserts[^2] that it is unlocked. Consider the following execution involving threads A, B, and C:

1. A starts.
2. B joins A, blocking on its `_tstate_lock`.
3. C joins A, blocking on its `_tstate_lock`.
4. A finishes and releases its `_tstate_lock`.
5. B acquires A's `_tstate_lock` in `_wait_for_tstate_lock()`, releases it, but is swapped out before calling `_stop()`.
6. C is scheduled, acquires A's `_tstate_lock` in `_wait_for_tstate_lock()` but is swapped out before releasing it.
7. B is scheduled, calls `_stop()`, which asserts that A's `_tstate_lock` is not held. However, C holds it, so the assertion fails.

The race can be reproduced[^3] by inserting sleeps at the appropriate points in the threading code. To do so, run the `repro_join_race.py` from the linked repo.

There are two main parts to this PR:

1. `_tstate_lock` is replaced with an event that is attached to `PyThreadState`. The event is set by the runtime prior to the thread being cleared (in the same place that `_tstate_lock` was released). `Thread.join()` blocks waiting for the event to be set.
2. `_PyInterpreterState_WaitForThreads()` provides the ability to wait for all non-daemon threads to exit. To do so, an `is_daemon` predicate was added to `PyThreadState`. This field is set each time a thread is created. `threading._shutdown()` now calls into `_PyInterpreterState_WaitForThreads()` instead of waiting on `_tstate_lock`s.

[^1]: https://github.com/python/cpython/blob/441affc9e7f419ef0b68f734505fa2f79fe653c7/Lib/threading.py#L1201
[^2]: https://github.com/python/cpython/blob/441affc9e7f419ef0b68f734505fa2f79fe653c7/Lib/threading.py#L1115
[^3]: mpage@8194653

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <[email protected]>
1 parent 86bc40d commit 33da0e8

File tree

12 files changed

+767
-639
lines changed

12 files changed

+767
-639
lines changed

Include/cpython/pystate.h

-26
Original file line numberDiff line numberDiff line change
@@ -161,32 +161,6 @@ struct _ts {
161161
*/
162162
uintptr_t critical_section;
163163

164-
/* Called when a thread state is deleted normally, but not when it
165-
* is destroyed after fork().
166-
* Pain: to prevent rare but fatal shutdown errors (issue 18808),
167-
* Thread.join() must wait for the join'ed thread's tstate to be unlinked
168-
* from the tstate chain. That happens at the end of a thread's life,
169-
* in pystate.c.
170-
* The obvious way doesn't quite work: create a lock which the tstate
171-
* unlinking code releases, and have Thread.join() wait to acquire that
172-
* lock. The problem is that we _are_ at the end of the thread's life:
173-
* if the thread holds the last reference to the lock, decref'ing the
174-
* lock will delete the lock, and that may trigger arbitrary Python code
175-
* if there's a weakref, with a callback, to the lock. But by this time
176-
* _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest
177-
* of C code can be allowed to run (in particular it must not be possible to
178-
* release the GIL).
179-
* So instead of holding the lock directly, the tstate holds a weakref to
180-
* the lock: that's the value of on_delete_data below. Decref'ing a
181-
* weakref is harmless.
182-
* on_delete points to _threadmodule.c's static release_sentinel() function.
183-
* After the tstate is unlinked, release_sentinel is called with the
184-
* weakref-to-lock (on_delete_data) argument, and release_sentinel releases
185-
* the indirectly held lock.
186-
*/
187-
void (*on_delete)(void *);
188-
void *on_delete_data;
189-
190164
int coroutine_origin_tracking_depth;
191165

192166
PyObject *async_gen_firstiter;

Include/internal/pycore_lock.h

-10
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,6 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
153153
// and 0 if the timeout expired or thread was interrupted.
154154
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns);
155155

156-
// A one-time event notification with reference counting.
157-
typedef struct _PyEventRc {
158-
PyEvent event;
159-
Py_ssize_t refcount;
160-
} _PyEventRc;
161-
162-
_PyEventRc *_PyEventRc_New(void);
163-
void _PyEventRc_Incref(_PyEventRc *erc);
164-
void _PyEventRc_Decref(_PyEventRc *erc);
165-
166156
// _PyRawMutex implements a word-sized mutex that does not depend on the
167157
// parking lot API, and therefore can be used in the parking lot
168158
// implementation.

Include/internal/pycore_pythread.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ struct _pythread_runtime_state {
7878
} stubs;
7979
#endif
8080

81-
// Linked list of ThreadHandleObjects
81+
// Linked list of ThreadHandles
8282
struct llist_node handles;
8383
};
8484

Lib/test/test_audit.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ def test_threading(self):
209209
expected = [
210210
("_thread.start_new_thread", "(<test_func>, (), None)"),
211211
("test.test_func", "()"),
212-
("_thread.start_joinable_thread", "(<test_func>,)"),
212+
("_thread.start_joinable_thread", "(<test_func>, 1, None)"),
213213
("test.test_func", "()"),
214214
]
215215

Lib/test/test_concurrent_futures/test_process_pool.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,13 @@ def test_python_finalization_error(self):
201201
# QueueFeederThread.
202202
orig_start_new_thread = threading._start_joinable_thread
203203
nthread = 0
204-
def mock_start_new_thread(func, *args):
204+
def mock_start_new_thread(func, *args, **kwargs):
205205
nonlocal nthread
206206
if nthread >= 1:
207207
raise RuntimeError("can't create new thread at "
208208
"interpreter shutdown")
209209
nthread += 1
210-
return orig_start_new_thread(func, *args)
210+
return orig_start_new_thread(func, *args, **kwargs)
211211

212212
with support.swap_attr(threading, '_start_joinable_thread',
213213
mock_start_new_thread):

Lib/test/test_thread.py

+48
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,54 @@ def joiner():
289289
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
290290
raise error
291291

292+
def test_join_with_timeout(self):
293+
lock = thread.allocate_lock()
294+
lock.acquire()
295+
296+
def thr():
297+
lock.acquire()
298+
299+
with threading_helper.wait_threads_exit():
300+
handle = thread.start_joinable_thread(thr)
301+
handle.join(0.1)
302+
self.assertFalse(handle.is_done())
303+
lock.release()
304+
handle.join()
305+
self.assertTrue(handle.is_done())
306+
307+
def test_join_unstarted(self):
308+
handle = thread._ThreadHandle()
309+
with self.assertRaisesRegex(RuntimeError, "thread not started"):
310+
handle.join()
311+
312+
def test_set_done_unstarted(self):
313+
handle = thread._ThreadHandle()
314+
with self.assertRaisesRegex(RuntimeError, "thread not started"):
315+
handle._set_done()
316+
317+
def test_start_duplicate_handle(self):
318+
lock = thread.allocate_lock()
319+
lock.acquire()
320+
321+
def func():
322+
lock.acquire()
323+
324+
handle = thread._ThreadHandle()
325+
with threading_helper.wait_threads_exit():
326+
thread.start_joinable_thread(func, handle=handle)
327+
with self.assertRaisesRegex(RuntimeError, "thread already started"):
328+
thread.start_joinable_thread(func, handle=handle)
329+
lock.release()
330+
handle.join()
331+
332+
def test_start_with_none_handle(self):
333+
def func():
334+
pass
335+
336+
with threading_helper.wait_threads_exit():
337+
handle = thread.start_joinable_thread(func, handle=None)
338+
handle.join()
339+
292340

293341
class Barrier:
294342
def __init__(self, num_threads):

Lib/test/test_threading.py

+1-60
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ def run(self):
408408

409409
def test_limbo_cleanup(self):
410410
# Issue 7481: Failure to start thread should cleanup the limbo map.
411-
def fail_new_thread(*args):
411+
def fail_new_thread(*args, **kwargs):
412412
raise threading.ThreadError()
413413
_start_joinable_thread = threading._start_joinable_thread
414414
threading._start_joinable_thread = fail_new_thread
@@ -912,41 +912,6 @@ def f():
912912
rc, out, err = assert_python_ok("-c", code)
913913
self.assertEqual(err, b"")
914914

915-
def test_tstate_lock(self):
916-
# Test an implementation detail of Thread objects.
917-
started = _thread.allocate_lock()
918-
finish = _thread.allocate_lock()
919-
started.acquire()
920-
finish.acquire()
921-
def f():
922-
started.release()
923-
finish.acquire()
924-
time.sleep(0.01)
925-
# The tstate lock is None until the thread is started
926-
t = threading.Thread(target=f)
927-
self.assertIs(t._tstate_lock, None)
928-
t.start()
929-
started.acquire()
930-
self.assertTrue(t.is_alive())
931-
# The tstate lock can't be acquired when the thread is running
932-
# (or suspended).
933-
tstate_lock = t._tstate_lock
934-
self.assertFalse(tstate_lock.acquire(timeout=0), False)
935-
finish.release()
936-
# When the thread ends, the state_lock can be successfully
937-
# acquired.
938-
self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
939-
# But is_alive() is still True: we hold _tstate_lock now, which
940-
# prevents is_alive() from knowing the thread's end-of-life C code
941-
# is done.
942-
self.assertTrue(t.is_alive())
943-
# Let is_alive() find out the C code is done.
944-
tstate_lock.release()
945-
self.assertFalse(t.is_alive())
946-
# And verify the thread disposed of _tstate_lock.
947-
self.assertIsNone(t._tstate_lock)
948-
t.join()
949-
950915
def test_repr_stopped(self):
951916
# Verify that "stopped" shows up in repr(Thread) appropriately.
952917
started = _thread.allocate_lock()
@@ -1112,30 +1077,6 @@ def checker():
11121077
self.assertEqual(threading.getprofile(), old_profile)
11131078
self.assertEqual(sys.getprofile(), old_profile)
11141079

1115-
@cpython_only
1116-
def test_shutdown_locks(self):
1117-
for daemon in (False, True):
1118-
with self.subTest(daemon=daemon):
1119-
event = threading.Event()
1120-
thread = threading.Thread(target=event.wait, daemon=daemon)
1121-
1122-
# Thread.start() must add lock to _shutdown_locks,
1123-
# but only for non-daemon thread
1124-
thread.start()
1125-
tstate_lock = thread._tstate_lock
1126-
if not daemon:
1127-
self.assertIn(tstate_lock, threading._shutdown_locks)
1128-
else:
1129-
self.assertNotIn(tstate_lock, threading._shutdown_locks)
1130-
1131-
# unblock the thread and join it
1132-
event.set()
1133-
thread.join()
1134-
1135-
# Thread._stop() must remove tstate_lock from _shutdown_locks.
1136-
# Daemon threads must never add it to _shutdown_locks.
1137-
self.assertNotIn(tstate_lock, threading._shutdown_locks)
1138-
11391080
def test_locals_at_exit(self):
11401081
# bpo-19466: thread locals must not be deleted before destructors
11411082
# are called

0 commit comments

Comments
 (0)