diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bdd0f54..f59dea5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,11 +7,10 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 0.5.1 +version 0.5.1-dev ----------------- -+ Fix a bug where ``gzip_ng_threaded.open`` could - cause a hang when the program exited and the program was not used with a - context manager. ++ Threaded reading and writing do no longer block exiting when an exception + occurs in the main thread. version 0.5.0 ----------------- diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 8a7e54a..a1cd918 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -100,7 +100,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.block_size = block_size self.worker = threading.Thread(target=self._decompress) self._closed = False - self.running = False + self.running = True + self._calling_thread = threading.current_thread() + self.worker.start() def _check_closed(self, msg=None): if self._closed: @@ -109,7 +111,7 @@ def _check_closed(self, msg=None): def _decompress(self): block_size = self.block_size block_queue = self.queue - while self.running: + while self.running and self._calling_thread.is_alive(): try: data = self.fileobj.read(block_size) except Exception as e: @@ -117,26 +119,15 @@ def _decompress(self): return if not data: return - while self.running: + while self.running and self._calling_thread.is_alive(): try: block_queue.put(data, timeout=0.05) break except queue.Full: pass - def _start(self): - if not self.running: - self.running = True - self.worker.start() - - def _stop(self): - if self.running: - self.running = False - self.worker.join() - def readinto(self, b): self._check_closed() - self._start() result = self.buffer.readinto(b) if result == 0: while True: @@ -164,7 +155,8 @@ def tell(self) -> int: def close(self) -> None: if self._closed: return - self._stop() + self.running = False + self.worker.join() self.fileobj.close() if self.closefd: self.raw.close() @@ -224,6 +216,7 @@ def __init__(self, if "b" not in mode: mode += "b" self.lock = threading.Lock() + self._calling_thread = threading.current_thread() self.exception: Optional[Exception] = None self.level = level self.previous_block = b"" @@ -261,6 +254,7 @@ def __init__(self, self.raw, self.closefd = open_as_binary_stream(filename, mode) self._closed = False self._write_gzip_header() + self.start() def _check_closed(self, msg=None): if self._closed: @@ -283,24 +277,21 @@ def _write_gzip_header(self): self.raw.write(struct.pack( "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl)) - def _start(self): - if not self.running: - self.running = True - self.output_worker.start() - for worker in self.compression_workers: - worker.start() + def start(self): + self.running = True + self.output_worker.start() + for worker in self.compression_workers: + worker.start() def stop(self): """Stop, but do not care for remaining work""" - if self.running: - self.running = False - for worker in self.compression_workers: - worker.join() - self.output_worker.join() + self.running = False + for worker in self.compression_workers: + worker.join() + self.output_worker.join() def write(self, b) -> int: self._check_closed() - self._start() with self.lock: if self.exception: raise self.exception @@ -360,7 +351,7 @@ def _compress(self, index: int): in_queue = self.input_queues[index] out_queue = self.output_queues[index] compressor: zlib_ng._ParallelCompress = self.compressors[index] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: @@ -383,7 +374,7 @@ def _write(self): fp = self.raw total_crc = 0 size = 0 - while True: + while self._calling_thread.is_alive(): out_index = index % self.threads output_queue = output_queues[out_index] try: @@ -408,7 +399,7 @@ def _compress_and_write(self): size = 0 in_queue = self.input_queues[0] compressor = self.compressors[0] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 1a0a5a8..7ed06de 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -105,7 +105,6 @@ def test_threaded_write_error(threads): threads=threads, block_size=8 * 1024) # Bypass the write method which should not allow blocks larger than # block_size. - f._start() f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close()