From b437e8ba4917c77b33e14ca732cd9ce3988d611f Mon Sep 17 00:00:00 2001 From: marun Date: Fri, 7 Jun 2019 21:19:59 +0900 Subject: [PATCH 01/17] bpo-37193: remove the thread which finished process request from threads list --- Lib/socketserver.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 905df9319e2fa8..3c53b8cb2b238c 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -652,6 +652,11 @@ def process_request_thread(self, request, client_address): self.handle_error(request, client_address) finally: self.shutdown_request(request) + t = threading.current_thread() + try: + self._threads.remove(t) + except AttributeError: + pass def process_request(self, request, client_address): """Start a new thread to process the request.""" From 94a7bdf20084e13ae84c664d196cfecb669c3f60 Mon Sep 17 00:00:00 2001 From: marun Date: Thu, 20 Jun 2019 18:52:58 +0900 Subject: [PATCH 02/17] rename variable t to thread. --- Lib/socketserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 3c53b8cb2b238c..97d6c9f50d0709 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -652,9 +652,9 @@ def process_request_thread(self, request, client_address): self.handle_error(request, client_address) finally: self.shutdown_request(request) - t = threading.current_thread() + thread = threading.current_thread() try: - self._threads.remove(t) + self._threads.remove(thread) except AttributeError: pass From 16142cf37845483ee2f1ae14e4735a95392c11b6 Mon Sep 17 00:00:00 2001 From: marun Date: Thu, 20 Jun 2019 18:55:17 +0900 Subject: [PATCH 03/17] don't remove thread from list if it is daemon. --- Lib/socketserver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 97d6c9f50d0709..d534dc9b3a6165 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -654,7 +654,8 @@ def process_request_thread(self, request, client_address): self.shutdown_request(request) thread = threading.current_thread() try: - self._threads.remove(thread) + if not thread.daemon: + self._threads.remove(thread) except AttributeError: pass From e6fa24e3db36ef69559e3583b0ea064137befd5d Mon Sep 17 00:00:00 2001 From: marun Date: Sat, 22 Jun 2019 00:28:41 +0900 Subject: [PATCH 04/17] use lock to protect self._threads. --- Lib/socketserver.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index d534dc9b3a6165..f7fc86e0432017 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -639,6 +639,7 @@ class ThreadingMixIn: # For non-daemonic threads, list of threading.Threading objects # used by server_close() to wait for all threads completion. _threads = None + _threads_lock = threading.Lock() def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. @@ -653,11 +654,9 @@ def process_request_thread(self, request, client_address): finally: self.shutdown_request(request) thread = threading.current_thread() - try: - if not thread.daemon: + with self._threads_lock: + if self._threads and not thread.daemon: self._threads.remove(thread) - except AttributeError: - pass def process_request(self, request, client_address): """Start a new thread to process the request.""" @@ -665,16 +664,18 @@ def process_request(self, request, client_address): args = (request, client_address)) t.daemon = self.daemon_threads if not t.daemon and self.block_on_close: - if self._threads is None: - self._threads = [] - self._threads.append(t) + with self._threads_lock: + if self._threads is None: + self._threads = [] + self._threads.append(t) t.start() def server_close(self): super().server_close() if self.block_on_close: threads = self._threads - self._threads = None + with self._threads_lock: + self._threads = None if threads: for thread in threads: thread.join() From a70251792fe457af981928076267705ab2484deb Mon Sep 17 00:00:00 2001 From: marun Date: Sat, 22 Jun 2019 01:05:51 +0900 Subject: [PATCH 05/17] use finally block in case of exception from shutdown_request(). --- Lib/socketserver.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index f7fc86e0432017..81a1ef4f0ca457 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -652,11 +652,13 @@ def process_request_thread(self, request, client_address): except Exception: self.handle_error(request, client_address) finally: - self.shutdown_request(request) - thread = threading.current_thread() - with self._threads_lock: - if self._threads and not thread.daemon: - self._threads.remove(thread) + try: + self.shutdown_request(request) + finally: + thread = threading.current_thread() + with self._threads_lock: + if self._threads and not thread.daemon: + self._threads.remove(thread) def process_request(self, request, client_address): """Start a new thread to process the request.""" From 9daa4178dbe0d7bc7628870897a9f684af62b2db Mon Sep 17 00:00:00 2001 From: marun Date: Tue, 25 Jun 2019 16:42:44 +0900 Subject: [PATCH 06/17] check "not thread.daemon" before lock to avoid holding the lock if it's unnecessary. --- Lib/socketserver.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 81a1ef4f0ca457..55929920c276c1 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -656,9 +656,10 @@ def process_request_thread(self, request, client_address): self.shutdown_request(request) finally: thread = threading.current_thread() - with self._threads_lock: - if self._threads and not thread.daemon: - self._threads.remove(thread) + if not thread.daemon: + with self._threads_lock: + if self._threads is not None: + self._threads.remove(thread) def process_request(self, request, client_address): """Start a new thread to process the request.""" From 287cc8cbd18a532dc22b1cc2a35da616ddd33610 Mon Sep 17 00:00:00 2001 From: marun Date: Tue, 25 Jun 2019 17:01:01 +0900 Subject: [PATCH 07/17] fix the place of _threads_lock. --- Lib/socketserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 55929920c276c1..87b76dcf85883f 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -676,8 +676,8 @@ def process_request(self, request, client_address): def server_close(self): super().server_close() if self.block_on_close: - threads = self._threads with self._threads_lock: + threads = self._threads self._threads = None if threads: for thread in threads: From 68abb0de2e8db7212f21c5a0da78db26dc2ab02c Mon Sep 17 00:00:00 2001 From: marun Date: Tue, 25 Jun 2019 18:36:11 +0900 Subject: [PATCH 08/17] separate code to remove a current thread into a function. --- Lib/socketserver.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 87b76dcf85883f..4829a1e4e44394 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -655,11 +655,15 @@ def process_request_thread(self, request, client_address): try: self.shutdown_request(request) finally: - thread = threading.current_thread() - if not thread.daemon: - with self._threads_lock: - if self._threads is not None: - self._threads.remove(thread) + self._remove_thread() + + def _remove_thread(self): + """Remove a current thread from threads list.""" + thread = threading.current_thread() + if not thread.daemon: + with self._threads_lock: + if self._threads is not None: + self._threads.remove(thread) def process_request(self, request, client_address): """Start a new thread to process the request.""" From c7286d1cccad60cafe651df48a95e2fb99680a9a Mon Sep 17 00:00:00 2001 From: marun Date: Tue, 25 Jun 2019 18:40:53 +0900 Subject: [PATCH 09/17] check ValueError when removing thread. --- Lib/socketserver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 4829a1e4e44394..221cc010201fec 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -663,7 +663,10 @@ def _remove_thread(self): if not thread.daemon: with self._threads_lock: if self._threads is not None: - self._threads.remove(thread) + try: + self._threads.remove(thread) + except ValueError: + pass def process_request(self, request, client_address): """Start a new thread to process the request.""" From 6ac217cb1e7fdb2d9f8749278da4d4bd748f4996 Mon Sep 17 00:00:00 2001 From: marun Date: Tue, 25 Jun 2019 22:08:52 +0900 Subject: [PATCH 10/17] fix wrong code which all instance shared same lock. --- Lib/socketserver.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 221cc010201fec..3daad694859efb 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -639,7 +639,7 @@ class ThreadingMixIn: # For non-daemonic threads, list of threading.Threading objects # used by server_close() to wait for all threads completion. _threads = None - _threads_lock = threading.Lock() + _threads_lock = None def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. @@ -661,6 +661,8 @@ def _remove_thread(self): """Remove a current thread from threads list.""" thread = threading.current_thread() if not thread.daemon: + if self._threads_lock is None: + self._threads_lock = threading.Lock() with self._threads_lock: if self._threads is not None: try: @@ -674,6 +676,8 @@ def process_request(self, request, client_address): args = (request, client_address)) t.daemon = self.daemon_threads if not t.daemon and self.block_on_close: + if self._threads_lock is None: + self._threads_lock = threading.Lock() with self._threads_lock: if self._threads is None: self._threads = [] @@ -683,6 +687,8 @@ def process_request(self, request, client_address): def server_close(self): super().server_close() if self.block_on_close: + if self._threads_lock is None: + self._threads_lock = threading.Lock() with self._threads_lock: threads = self._threads self._threads = None From 5ca61a26eba867f3a3b841749e3cb9e388edebd2 Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 20:01:49 -0400 Subject: [PATCH 11/17] Extract thread management into a _Threads class to encapsulate atomic operations and separate concerns. --- Lib/socketserver.py | 70 +++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 3daad694859efb..783dfa306e0eba 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -128,6 +128,7 @@ class will essentially render the service "deaf" while one request is import os import sys import threading +import contextlib from io import BufferedIOBase from time import monotonic as time @@ -628,6 +629,38 @@ def server_close(self): self.collect_children(blocking=self.block_on_close) +class _Threads(list): + def __init__(self): + self._lock = threading.Lock() + + def append(self, thread): + if thread.daemon: + return + with self._lock: + super().append(thread) + + def remove(self, thread): + with self._lock: + # suppress ValueError even though unexpected + with contextlib.suppress(ValueError): + super().remove(thread) + + def remove_current(self): + """Remove a current non-daemon thread.""" + thread = threading.current_thread() + if not thread.daemon: + self.remove(thread) + + def pop_all(self): + with self._lock: + self[:], result = [], self[:] + return result + + def join(self): + for thread in self.pop_all(): + thread.join() + + class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" @@ -636,10 +669,9 @@ class ThreadingMixIn: daemon_threads = False # If true, server_close() waits until all non-daemonic threads terminate. block_on_close = True - # For non-daemonic threads, list of threading.Threading objects + # Threads object # used by server_close() to wait for all threads completion. _threads = None - _threads_lock = None def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. @@ -655,46 +687,22 @@ def process_request_thread(self, request, client_address): try: self.shutdown_request(request) finally: - self._remove_thread() - - def _remove_thread(self): - """Remove a current thread from threads list.""" - thread = threading.current_thread() - if not thread.daemon: - if self._threads_lock is None: - self._threads_lock = threading.Lock() - with self._threads_lock: - if self._threads is not None: - try: - self._threads.remove(thread) - except ValueError: - pass + self._threads.remove_current() def process_request(self, request, client_address): """Start a new thread to process the request.""" + vars(self).setdefault('_threads', _Threads()) t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads - if not t.daemon and self.block_on_close: - if self._threads_lock is None: - self._threads_lock = threading.Lock() - with self._threads_lock: - if self._threads is None: - self._threads = [] - self._threads.append(t) + if self.block_on_close: + self._threads.append(t) t.start() def server_close(self): super().server_close() if self.block_on_close: - if self._threads_lock is None: - self._threads_lock = threading.Lock() - with self._threads_lock: - threads = self._threads - self._threads = None - if threads: - for thread in threads: - thread.join() + self._threads.join() if hasattr(os, "fork"): From fcd11664ee313ce6b82c7c2e428680e0392bc7b5 Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 20:54:03 -0400 Subject: [PATCH 12/17] Replace multiple references of 'block_on_close' with one, avoiding the possibility that 'block_on_close' could change during the course of processing requests. Now, there's exactly one _threads object with behavior fixed for the duration. --- Lib/socketserver.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 783dfa306e0eba..074c95569bb279 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -641,7 +641,7 @@ def append(self, thread): def remove(self, thread): with self._lock: - # suppress ValueError even though unexpected + # should not happen, but safe to ignore with contextlib.suppress(ValueError): super().remove(thread) @@ -661,6 +661,17 @@ def join(self): thread.join() +class _NoThreads: + def append(self, thread): + pass + + def join(self): + pass + + def remove_current(self): + pass + + class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" @@ -691,18 +702,19 @@ def process_request_thread(self, request, client_address): def process_request(self, request, client_address): """Start a new thread to process the request.""" - vars(self).setdefault('_threads', _Threads()) + vars(self).setdefault( + '_threads', + _Threads() if self.block_on_close else _NoThreads(), + ) t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads - if self.block_on_close: - self._threads.append(t) + self._threads.append(t) t.start() def server_close(self): super().server_close() - if self.block_on_close: - self._threads.join() + self._threads.join() if hasattr(os, "fork"): From cc6e532f8d317d53178e2b86cef74ce457c9c268 Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 21:02:57 -0400 Subject: [PATCH 13/17] Add docstrings to private classes. --- Lib/socketserver.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 074c95569bb279..ff48498a452ab2 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -630,6 +630,9 @@ def server_close(self): class _Threads(list): + """ + Joinable list of all non-daemon threads. + """ def __init__(self): self._lock = threading.Lock() @@ -662,6 +665,9 @@ def join(self): class _NoThreads: + """ + Degenerate version of _Threads. + """ def append(self, thread): pass From b107b913c17f8eab25be0c1c24edfd25bfbed6a9 Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 21:17:08 -0400 Subject: [PATCH 14/17] Add test to ensure that a ThreadingTCPServer can be closed without serving any requests. --- Lib/test/test_socketserver.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 8aed4b61a23744..61c0918d5ead22 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -276,6 +276,13 @@ class MyHandler(socketserver.StreamRequestHandler): t.join() s.server_close() + def test_close_immediately(self): + class MyServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + + server = MyServer((HOST, 0), lambda: None) + server.server_close() + def test_tcpserver_bind_leak(self): # Issue #22435: the server socket wouldn't be closed if bind()/listen() # failed. From a1aff7673d947fe63e2f303b1766686602c9e4dc Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 21:19:25 -0400 Subject: [PATCH 15/17] Use _NoThreads as the default value. Fixes AttributeError when server is closed without serving any requests. --- Lib/socketserver.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Lib/socketserver.py b/Lib/socketserver.py index ff48498a452ab2..afddbb1b76202c 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -688,7 +688,7 @@ class ThreadingMixIn: block_on_close = True # Threads object # used by server_close() to wait for all threads completion. - _threads = None + _threads = _NoThreads() def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. @@ -708,10 +708,8 @@ def process_request_thread(self, request, client_address): def process_request(self, request, client_address): """Start a new thread to process the request.""" - vars(self).setdefault( - '_threads', - _Threads() if self.block_on_close else _NoThreads(), - ) + if self.block_on_close: + vars(self).setdefault('_threads', _Threads()) t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads From d99817e9ec1428c1ce237d5bc01c6234486ade7c Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Fri, 12 Jun 2020 21:23:37 -0400 Subject: [PATCH 16/17] Add blurb --- .../next/Library/2020-06-12-21-23-20.bpo-37193.wJximU.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-06-12-21-23-20.bpo-37193.wJximU.rst diff --git a/Misc/NEWS.d/next/Library/2020-06-12-21-23-20.bpo-37193.wJximU.rst b/Misc/NEWS.d/next/Library/2020-06-12-21-23-20.bpo-37193.wJximU.rst new file mode 100644 index 00000000000000..fbf56d3194cd22 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-06-12-21-23-20.bpo-37193.wJximU.rst @@ -0,0 +1,2 @@ +Fixed memory leak in ``socketserver.ThreadingMixIn`` introduced in Python +3.7. From 7d1f367941c411ce01c4bf5ec5b020680e7c5313 Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Sat, 13 Jun 2020 10:07:13 -0400 Subject: [PATCH 17/17] Add test capturing failure. --- Lib/test/test_socketserver.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 8aed4b61a23744..94943d66455d15 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -490,6 +490,23 @@ def shutdown_request(self, request): self.assertEqual(server.shutdown_called, 1) server.server_close() + def test_threads_reaped(self): + """ + In #37193, users reported a memory leak + due to the saving of every request thread. Ensure that the + threads are cleaned up after the requests complete. + """ + class MyServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + + server = MyServer((HOST, 0), socketserver.StreamRequestHandler) + for n in range(10): + with socket.create_connection(server.server_address): + server.handle_request() + [thread.join() for thread in server._threads] + self.assertEqual(len(server._threads), 0) + server.server_close() + if __name__ == "__main__": unittest.main()