Release 1.5.0 #165

Merged — 26 commits, Oct 12, 2023
Commits (26)
8c5980d  Update version (rhpvorderman, Oct 9, 2023)
07584be  Create a separate function that calculates the decompressed block and… (rhpvorderman, Oct 9, 2023)
fed1d1d  Use a separate object with reusable buffers (rhpvorderman, Oct 10, 2023)
048cc1d  Fix memory leak (rhpvorderman, Oct 10, 2023)
795e79c  Calculate crc on the fly (rhpvorderman, Oct 10, 2023)
0214e12  Make sure writebuffersize is set (rhpvorderman, Oct 10, 2023)
1278c7b  Read smaller blocks and store less blocks in queue to reduce memory s… (rhpvorderman, Oct 10, 2023)
49ab675  Make block size configurable (rhpvorderman, Oct 10, 2023)
dff95a3  Use a smaller queue for better characteristics (rhpvorderman, Oct 10, 2023)
424a5b9  Combine queues and threads for threads equals 1, for greater efficiency (rhpvorderman, Oct 11, 2023)
1509a7a  Parametrize tests for threaded writer so both cases are tested (rhpvorderman, Oct 11, 2023)
1107f47  Build tuple directly rather than using Py_BuildValue (rhpvorderman, Oct 11, 2023)
550e010  Use meth_fastcall for compress_and_crc (rhpvorderman, Oct 11, 2023)
59620fa  Make sure zdict cannot be excessively large (rhpvorderman, Oct 11, 2023)
0d1cd10  Unblock threads at the earliest possible convenience (rhpvorderman, Oct 11, 2023)
8327101  Update changelog with changes for threaded writing (rhpvorderman, Oct 11, 2023)
0226aa0  Set buffer_size at latest position for backwards compatibility (rhpvorderman, Oct 11, 2023)
e8fd360  use os.urandom rather than random.randbytes (rhpvorderman, Oct 11, 2023)
71b7d61  Merge pull request #162 from pycompression/specialcaseonethread (rhpvorderman, Oct 11, 2023)
39d31e0  Add tests for oversized blocks (rhpvorderman, Oct 11, 2023)
c80645d  Test if too big blocks get written correctly (rhpvorderman, Oct 11, 2023)
d42f0db  Alter block size in test so multiple queues get tested (rhpvorderman, Oct 12, 2023)
dc7f6dd  Make sure negative threads are tested (rhpvorderman, Oct 12, 2023)
36551e5  Merge pull request #164 from pycompression/fixbuffercrash (rhpvorderman, Oct 12, 2023)
4abcbdd  Reword changelog to be more user oriented (rhpvorderman, Oct 11, 2023)
74883cf  Set version 1.5.0 (rhpvorderman, Oct 12, 2023)
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,14 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.

version 1.5.0
-----------------
+ Make a special case for threads==1 in ``igzip_threaded.open`` for writing
  files. This now combines the writing and compression threads for less
  overhead.
+ Maximize time spent outside the GIL for ``igzip_threaded.open`` writing.
This has decreased wallclock time significantly.

version 1.4.1
-----------------
+ Fix several errors related to unclosed files and buffers.
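A usage sketch of the threads==1 fast path described in the changelog entry above (the file name is illustrative):

from isal import igzip_threaded

# threads=1 now runs compression and writing in a single combined thread.
with igzip_threaded.open("example.txt.gz", "wb", threads=1) as f:
    f.write(b"hello world\n")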
2 changes: 1 addition & 1 deletion setup.py
@@ -135,7 +135,7 @@ def build_isa_l():

setup(
name="isal",
version="1.4.1",
version="1.5.0",
description="Faster zlib and gzip compatible compression and "
"decompression by providing python bindings for the ISA-L "
"library.",
2 changes: 1 addition & 1 deletion src/isal/__init__.py
@@ -27,4 +27,4 @@
"__version__"
]

__version__ = "1.4.1"
__version__ = "1.5.0"
144 changes: 106 additions & 38 deletions src/isal/igzip_threaded.py
@@ -20,7 +20,8 @@


def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
encoding=None, errors=None, newline=None, *, threads=1):
encoding=None, errors=None, newline=None, *, threads=1,
block_size=1024 * 1024):
"""
Utilize threads to read and write gzip objects and escape the GIL.
Comparable to gzip.open. This method is only usable for streamed reading
@@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
:param threads: If 0 will defer to igzip.open, if < 0 will use all threads
available to the system. Reading gzip can only
use one thread.
:param block_size: Determines how large the blocks in the read/write
queues are for threaded reading and writing.
:return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
depending on the mode.
"""
@@ -61,21 +64,27 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
else:
raise TypeError("filename must be a str or bytes object, or a file")
if "r" in mode:
gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file))
gzip_file = io.BufferedReader(
_ThreadedGzipReader(binary_file, block_size=block_size))
else:
gzip_file = io.BufferedWriter(
_ThreadedGzipWriter(binary_file, compresslevel, threads),
buffer_size=1024 * 1024
_ThreadedGzipWriter(
fp=binary_file,
block_size=block_size,
level=compresslevel,
threads=threads
),
buffer_size=block_size
)
if "t" in mode:
return io.TextIOWrapper(gzip_file, encoding, errors, newline)
return gzip_file

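A usage sketch of the new block_size parameter added in this hunk (file name and sizes are illustrative; the default is 1024 * 1024):

from isal import igzip_threaded

# Compress with four threads and 512 KiB blocks; writes larger than
# block_size are split internally, see _ThreadedGzipWriter.write below.
with igzip_threaded.open("large.txt.gz", "wb", threads=4,
                         block_size=512 * 1024) as f:
    f.write(b"x" * (4 * 1024 * 1024))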

class _ThreadedGzipReader(io.RawIOBase):
def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024):
def __init__(self, fp, queue_size=2, block_size=1024 * 1024):
self.raw = fp
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024)
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size)
self.pos = 0
self.read_file = False
self.queue = queue.Queue(queue_size)
@@ -179,35 +188,53 @@ class _ThreadedGzipWriter(io.RawIOBase):

The writer thread reads from output queues and uses the crc32_combine
function to calculate the total crc. It also writes the compressed block.

When only one thread is requested, only the input queue is used and
compressing and output is handled in one thread.
"""
def __init__(self,
fp: BinaryIO,
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
threads: int = 1,
queue_size: int = 2):
if level < 0 or level > 3:
raise ValueError(
f"Invalid compression level, "
f"level should be between 0 and 3: {level}")
queue_size: int = 1,
block_size: int = 1024 * 1024,
):
self.lock = threading.Lock()
self.exception: Optional[Exception] = None
self.raw = fp
self.level = level
self.previous_block = b""
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.index = 0
# Deflating random data results in an output a little larger than the
# input. Making the output buffer 10% larger is sufficient overkill.
compress_buffer_size = block_size + max(block_size // 10, 500)
self.block_size = block_size
self.compressors: List[isal_zlib._ParallelCompress] = [
isal_zlib._ParallelCompress(buffersize=compress_buffer_size,
level=level) for _ in range(threads)
]
if threads > 1:
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
elif threads == 1:
self.input_queues = [queue.Queue(queue_size)]
self.output_queues = []
self.compression_workers = []
self.output_worker = threading.Thread(
target=self._compress_and_write)
else:
raise ValueError(f"threads should be at least 1, got {threads}")
self.threads = threads
self.index = 0
self._crc = 0
self.running = False
self._size = 0
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
self._closed = False
self._write_gzip_header()
self.start()
@@ -246,8 +273,19 @@ def write(self, b) -> int:
with self.lock:
if self.exception:
raise self.exception
index = self.index
length = b.nbytes if isinstance(b, memoryview) else len(b)
if length > self.block_size:
# write smaller chunks and return the result
memview = memoryview(b)
start = 0
total_written = 0
while start < length:
total_written += self.write(
memview[start:start+self.block_size])
start += self.block_size
return total_written
data = bytes(b)
index = self.index
zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
self.previous_block = data
self.index += 1
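The splitting above can be summarized in isolation. A minimal sketch of the same chunking arithmetic, using a hypothetical helper name (chunk is not part of the codebase):

def chunk(data: bytes, block_size: int):
    # Writes larger than block_size are re-submitted as block_size-sized
    # memoryview slices, so no oversized block enters an input queue.
    view = memoryview(data)
    for start in range(0, len(data), block_size):
        yield view[start:start + block_size]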
@@ -289,6 +327,7 @@ def closed(self) -> bool:
def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: isal_zlib._ParallelCompress = self.compressors[index]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
@@ -297,23 +336,11 @@ def _compress(self, index: int):
return
continue
try:
compressor = isal_zlib.compressobj(
self.level, wbits=-15, zdict=zdict)
compressed = compressor.compress(data) + compressor.flush(
isal_zlib.Z_SYNC_FLUSH)
crc = isal_zlib.crc32(data)
compressed, crc = compressor.compress_and_crc(data, zdict)
except Exception as e:
with self.lock:
self.exception = e
# Abort everything and empty the queue
in_queue.task_done()
self.running = False
while True:
try:
_ = in_queue.get(timeout=0.05)
in_queue.task_done()
except queue.Empty:
return
in_queue.task_done()
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
out_queue.put((compressed, crc, data_length))
in_queue.task_done()
@@ -341,5 +368,46 @@ def _write(self):
output_queue.task_done()
index += 1

def _compress_and_write(self):
if not self.threads == 1:
raise SystemError("Compress_and_write is for one thread only")
fp = self.raw
total_crc = 0
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
if not self.running:
self._crc = total_crc
self._size = size
return
continue
try:
compressed, crc = compressor.compress_and_crc(data, zdict)
except Exception as e:
in_queue.task_done()
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
size += data_length
fp.write(compressed)
in_queue.task_done()

def _set_error_and_empty_queue(self, error, q):
with self.lock:
self.exception = error
# Abort everything and empty the queue
self.running = False
while True:
try:
_ = q.get(timeout=0.05)
q.task_done()
except queue.Empty:
return

def writable(self) -> bool:
return True
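The crc32_combine bookkeeping described in the _ThreadedGzipWriter docstring can be illustrated with the public isal_zlib API (data values are illustrative):

from isal import isal_zlib

block_a = b"first block"
block_b = b"second block"
crc_a = isal_zlib.crc32(block_a)
crc_b = isal_zlib.crc32(block_b)
# Combining per-block CRCs yields the CRC of the concatenated stream,
# so the writer can compute the gzip trailer checksum without
# re-scanning the data.
combined = isal_zlib.crc32_combine(crc_a, crc_b, len(block_b))
assert combined == isal_zlib.crc32(block_a + block_b)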
4 changes: 4 additions & 0 deletions src/isal/isal_zlib.pyi
@@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ...
def crc32(__data, __value: int = 0) -> int: ...
def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...

class _ParallelCompress:
def __init__(self, buffersize: int, level: int): ...
def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...

def compress(__data,
level: int = ISAL_DEFAULT_COMPRESSION,
wbits: int = MAX_WBITS) -> bytes: ...
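A minimal sketch of how the private helper typed above is used, mirroring the calls in igzip_threaded (this is an internal API; passing an empty zdict matches what the writer does for the first block):

from isal import isal_zlib

block_size = 1024 * 1024
# Same ~10% headroom the writer allocates for incompressible input.
compressor = isal_zlib._ParallelCompress(
    buffersize=block_size + max(block_size // 10, 500),
    level=isal_zlib.ISAL_DEFAULT_COMPRESSION)
compressed, crc = compressor.compress_and_crc(b"some data", b"")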