Skip to content

Commit 524d275

Browse files
bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (GH-31913)
(cherry picked from commit dfb1b9d) Co-authored-by: Géry Ogam <[email protected]>
1 parent 187cb95 commit 524d275

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

Lib/multiprocessing/queues.py

+11-12
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,10 @@ def put_nowait(self, obj):
139139

140140
def close(self):
141141
self._closed = True
142-
try:
143-
self._reader.close()
144-
finally:
145-
close = self._close
146-
if close:
147-
self._close = None
148-
close()
142+
close = self._close
143+
if close:
144+
self._close = None
145+
close()
149146

150147
def join_thread(self):
151148
debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ def _start_thread(self):
169166
self._thread = threading.Thread(
170167
target=Queue._feed,
171168
args=(self._buffer, self._notempty, self._send_bytes,
172-
self._wlock, self._writer.close, self._ignore_epipe,
173-
self._on_queue_feeder_error, self._sem),
169+
self._wlock, self._reader.close, self._writer.close,
170+
self._ignore_epipe, self._on_queue_feeder_error,
171+
self._sem),
174172
name='QueueFeederThread'
175173
)
176174
self._thread.daemon = True
@@ -211,8 +209,8 @@ def _finalize_close(buffer, notempty):
211209
notempty.notify()
212210

213211
@staticmethod
214-
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
215-
onerror, queue_sem):
212+
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
213+
writer_close, ignore_epipe, onerror, queue_sem):
216214
debug('starting thread to feed data to pipe')
217215
nacquire = notempty.acquire
218216
nrelease = notempty.release
@@ -238,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
238236
obj = bpopleft()
239237
if obj is sentinel:
240238
debug('feeder thread got sentinel -- exiting')
241-
close()
239+
reader_close()
240+
writer_close()
242241
return
243242

244243
# serialize the data before acquiring the lock
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Always close the read end of the pipe used by :class:`multiprocessing.Queue`
2+
*after* the last write of buffered data to the write end of the pipe to avoid
3+
:exc:`BrokenPipeError` at garbage collection and at
4+
:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.

0 commit comments

Comments
 (0)