diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 862895d7f..658733db2 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -509,13 +509,14 @@ def _append(self, signature, fields=(), response=None): self.responses.append(response) def _send_all(self): - data = self.outbox.view() - if data: - try: + try: + with self.outbox.view() as data: + if not data: + return self.socket.sendall(data) - except OSError as error: - self._set_defunct_write(error) - self.outbox.clear() + except OSError as error: + self._set_defunct_write(error) + self.outbox.clear() def send_all(self): """ Send all queued messages to the server. diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index a3f079108..f11ddd8e0 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -114,22 +114,21 @@ def _chunk_data(self): ) num_chunks = num_full_chunks + bool(chunk_rest) - data_view = memoryview(self._raw_data) - header_start = len(self._chunked_data) - data_start = header_start + 2 - raw_data_start = 0 - for i in range(num_chunks): - chunk_size = min(data_len - raw_data_start, - self._max_chunk_size) - self._chunked_data[header_start:data_start] = struct_pack( - ">H", chunk_size - ) - self._chunked_data[data_start:(data_start + chunk_size)] = \ - data_view[raw_data_start:(raw_data_start + chunk_size)] - header_start += chunk_size + 2 + with memoryview(self._raw_data) as data_view: + header_start = len(self._chunked_data) data_start = header_start + 2 - raw_data_start += chunk_size - del data_view + raw_data_start = 0 + for i in range(num_chunks): + chunk_size = min(data_len - raw_data_start, + self._max_chunk_size) + self._chunked_data[header_start:data_start] = struct_pack( + ">H", chunk_size + ) + self._chunked_data[data_start:(data_start + chunk_size)] = \ + data_view[raw_data_start:(raw_data_start + chunk_size)] + header_start += chunk_size + 2 + data_start = header_start + 2 + raw_data_start += chunk_size self._raw_data.clear() def wrap_message(self): diff --git a/neo4j/packstream.py b/neo4j/packstream.py index 406d761e4..f45ac0ab1 100644 --- a/neo4j/packstream.py +++ b/neo4j/packstream.py @@ -292,13 +292,13 @@ def _unpack(self): # Bytes elif marker == 0xCC: size, = struct_unpack(">B", self.read(1)) - return self.read(size).tobytes() + return self.read(size) elif marker == 0xCD: size, = struct_unpack(">H", self.read(2)) - return self.read(size).tobytes() + return self.read(size) elif marker == 0xCE: size, = struct_unpack(">I", self.read(4)) - return self.read(size).tobytes() + return self.read(size) else: marker_high = marker & 0xF0 @@ -424,7 +424,7 @@ def unpack_structure_header(self): def _unpack_structure_header(self, marker): marker_high = marker & 0xF0 if marker_high == 0xB0: # TINY_STRUCT - signature = self.read(1).tobytes() + signature = bytes(self.read(1)) return marker & 0x0F, signature else: raise ValueError("Expected structure, found marker %02X" % marker) @@ -448,11 +448,10 @@ def reset(self): self.p = 0 def read(self, n=1): - view = memoryview(self.data) q = self.p + n - subview = view[self.p:q] + sub_data = self.data[self.p:q] self.p = q - return subview + return sub_data def read_u8(self): if self.used - self.p >= 1: @@ -477,9 +476,9 @@ def receive(self, sock, n_bytes): end = self.used + n_bytes if end > len(self.data): self.data += bytearray(end - len(self.data)) - view = memoryview(self.data) - while self.used < end: - n = sock.recv_into(view[self.used:end], end - self.used) - if n == 0: - raise OSError("No data") - self.used += n + with memoryview(self.data) as view: + while self.used < end: + n = sock.recv_into(view[self.used:end], end - self.used) + if n == 0: + raise OSError("No data") + self.used += n diff --git a/neo4j/work/pipelining.py b/neo4j/work/pipelining.py index ccca84202..8bd069767 100644 --- a/neo4j/work/pipelining.py +++ b/neo4j/work/pipelining.py @@ -48,7 +48,8 @@ def __init__(self, pool, config): def push(self, statement, parameters=None): self._connection.run(statement, parameters) self._connection.pull(on_records=self._data.extend) - output_buffer_size = len(self._connection.outbox.view()) + with self._connection.outbox.view() as view: + output_buffer_size = len(view) if output_buffer_size >= self._flush_every: self._connection.send_all() diff --git a/tests/unit/io/test__common.py b/tests/unit/io/test__common.py index 3b61c7103..b3af3c991 100644 --- a/tests/unit/io/test__common.py +++ b/tests/unit/io/test__common.py @@ -22,11 +22,15 @@ )) def test_outbox_chunking(chunk_size, data, result): outbox = Outbox(max_chunk_size=chunk_size) - assert bytes(outbox.view()) == b"" + with outbox.view() as view: + assert bytes(view) == b"" for d in data: outbox.write(d) - assert bytes(outbox.view()) == result + with outbox.view() as view: + assert bytes(view) == result # make sure this works multiple times - assert bytes(outbox.view()) == result + with outbox.view() as view: + assert bytes(view) == result outbox.clear() - assert bytes(outbox.view()) == b"" + with outbox.view() as view: + assert bytes(view) == b""