From c06d0f1b702f344d6562a717adef3bc5997e9c7f Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Thu, 6 Mar 2025 22:46:38 +0000 Subject: [PATCH 1/2] gh-110206: Fix multiprocessing test_notify_all The test could deadlock trying join on the worker processes due to a combination of behaviors: * The use of `assertReachesEventually` did not ensure that workers actually woken.release() because the SyncManager's Semaphore does not implement get_value. * This mean that the test could finish and the variable "sleeping" would got out of scope and be collected. This unregisters the proxy leading to failures in the worker or possibly the manager. * The subsequent call to `p.join()` during cleanUp therefore never finished. This takes two approaches to fix this: 1) Use woken.acquire() to ensure that the workers actually finish calling woken.release() 2) Wait until the workers finish during the test, while cond, sleeping, and woken are still valid. --- Lib/test/_test_multiprocessing.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 67ab8f098f700b..e4c4db8ee05e48 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1694,18 +1694,19 @@ def test_notify_all(self): woken = self.Semaphore(0) # start some threads/processes which will timeout + workers = [] for i in range(3): p = self.Process(target=self.f, args=(cond, sleeping, woken, TIMEOUT1)) p.daemon = True p.start() - self.addCleanup(p.join) + workers.append(p) t = threading.Thread(target=self.f, args=(cond, sleeping, woken, TIMEOUT1)) t.daemon = True t.start() - self.addCleanup(t.join) + workers.append(t) # wait for them all to sleep for i in range(6): @@ -1724,12 +1725,12 @@ def test_notify_all(self): p = self.Process(target=self.f, args=(cond, sleeping, woken)) p.daemon = True p.start() - self.addCleanup(p.join) + workers.append(t) t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t.daemon = True t.start() - self.addCleanup(t.join) + workers.append(t) # wait for them to all sleep for i in range(6): @@ -1745,11 +1746,17 @@ def test_notify_all(self): cond.release() # check they have all woken - self.assertReachesEventually(lambda: get_value(woken), 6) + for i in range(6): + woken.acquire() + self.assertReturnsIfImplemented(0, get_value, woken) # check state is not mucked up self.check_invariant(cond) + for w in workers: + # NOTE: join_process and join_thread are the same + threading_helper.join_thread(w) + def test_notify_n(self): cond = self.Condition() sleeping = self.Semaphore(0) From 19c049d4160bb08be4e6903102747146ba15b8e4 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Thu, 6 Mar 2025 19:40:35 -0500 Subject: [PATCH 2/2] Fix typo --- Lib/test/_test_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e4c4db8ee05e48..10c17b2dc7d32b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1725,7 +1725,7 @@ def test_notify_all(self): p = self.Process(target=self.f, args=(cond, sleeping, woken)) p.daemon = True p.start() - workers.append(t) + workers.append(p) t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t.daemon = True