Skip to content

Commit 5e3b2b6

Browse files
authored
Merge pull request #102 from neo4j/1.1-routing-optimisations
Quicker unpacking
2 parents 76cf1a9 + 8c73f51 commit 5e3b2b6

File tree

2 files changed

+113
-81
lines changed

2 files changed

+113
-81
lines changed

neo4j/v1/bolt.py

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ def fill(self):
9292
ready_to_read, _, _ = select((self.socket,), (), (), 0)
9393
received = self.socket.recv(65539)
9494
if received:
95-
if __debug__:
96-
log_debug("S: b%r", received)
95+
log_debug("S: b%r", received)
9796
self.buffer[len(self.buffer):] = received
9897
else:
9998
if ready_to_read is not None:
@@ -174,8 +173,7 @@ def send(self):
174173
""" Send all queued messages to the server.
175174
"""
176175
data = self.raw.getvalue()
177-
if __debug__:
178-
log_debug("C: b%r", data)
176+
log_debug("C: b%r", data)
179177
self.socket.sendall(data)
180178

181179
self.raw.seek(self.raw.truncate(0))
@@ -264,8 +262,7 @@ def append(self, signature, fields=(), response=None):
264262
:arg fields: the fields of the message as a tuple
265263
:arg response: a response object to handle callbacks
266264
"""
267-
if __debug__:
268-
log_info("C: %s %s", message_names[signature], " ".join(map(repr, fields)))
265+
log_info("C: %s %r", message_names[signature], fields)
269266

270267
self.packer.pack_struct_header(len(fields), signature)
271268
for field in fields:
@@ -329,29 +326,36 @@ def fetch(self):
329326
self.defunct = True
330327
self.close()
331328
raise
332-
# Unpack from the raw byte stream and call the relevant message handler(s)
333-
self.unpacker.load(message_data)
334-
size, signature = self.unpacker.unpack_structure_header()
335-
fields = [self.unpacker.unpack() for _ in range(size)]
336329

337-
if __debug__:
338-
log_info("S: %s %r", message_names[signature], fields)
330+
unpacker = self.unpacker
331+
unpacker.load(message_data)
332+
size, signature = unpacker.unpack_structure_header()
333+
if size > 1:
334+
raise ProtocolError("Expected one field")
339335

340336
if signature == SUCCESS:
337+
metadata = unpacker.unpack_map()
338+
log_info("S: SUCCESS (%r)", metadata)
341339
response = self.responses.popleft()
342340
response.complete = True
343-
response.on_success(*fields)
341+
response.on_success(metadata or {})
344342
elif signature == RECORD:
343+
data = unpacker.unpack_list()
344+
log_info("S: RECORD (%r)", data)
345345
response = self.responses[0]
346-
response.on_record(*fields)
346+
response.on_record(data or [])
347347
elif signature == IGNORED:
348+
metadata = unpacker.unpack_map()
349+
log_info("S: IGNORED (%r)", metadata)
348350
response = self.responses.popleft()
349351
response.complete = True
350-
response.on_ignored(*fields)
352+
response.on_ignored(metadata or {})
351353
elif signature == FAILURE:
354+
metadata = unpacker.unpack_map()
355+
log_info("S: FAILURE (%r)", metadata)
352356
response = self.responses.popleft()
353357
response.complete = True
354-
response.on_failure(*fields)
358+
response.on_failure(metadata or {})
355359
else:
356360
raise ProtocolError("Unexpected response message with signature %02X" % signature)
357361

@@ -365,8 +369,7 @@ def close(self):
365369
""" Close the connection.
366370
"""
367371
if not self.closed:
368-
if __debug__:
369-
log_info("~~ [CLOSE]")
372+
log_info("~~ [CLOSE]")
370373
self.channel.socket.close()
371374
self.closed = True
372375

@@ -476,7 +479,7 @@ def connect(address, ssl_context=None, **config):
476479
# Establish a connection to the host and port specified
477480
# Catches refused connections see:
478481
# https://docs.python.org/2/library/errno.html
479-
if __debug__: log_info("~~ [CONNECT] %s", address)
482+
log_info("~~ [CONNECT] %s", address)
480483
try:
481484
s = create_connection(address)
482485
except SocketError as error:
@@ -488,7 +491,7 @@ def connect(address, ssl_context=None, **config):
488491
# Secure the connection if an SSL context has been provided
489492
if ssl_context and SSL_AVAILABLE:
490493
host, port = address
491-
if __debug__: log_info("~~ [SECURE] %s", host)
494+
log_info("~~ [SECURE] %s", host)
492495
try:
493496
s = ssl_context.wrap_socket(s, server_hostname=host if HAS_SNI else None)
494497
except SSLError as cause:
@@ -514,9 +517,9 @@ def connect(address, ssl_context=None, **config):
514517
# Send details of the protocol versions supported
515518
supported_versions = [1, 0, 0, 0]
516519
handshake = [MAGIC_PREAMBLE] + supported_versions
517-
if __debug__: log_info("C: [HANDSHAKE] 0x%X %r", MAGIC_PREAMBLE, supported_versions)
520+
log_info("C: [HANDSHAKE] 0x%X %r", MAGIC_PREAMBLE, supported_versions)
518521
data = b"".join(struct_pack(">I", num) for num in handshake)
519-
if __debug__: log_debug("C: b%r", data)
522+
log_debug("C: b%r", data)
520523
s.sendall(data)
521524

522525
# Handle the handshake response
@@ -531,15 +534,15 @@ def connect(address, ssl_context=None, **config):
531534
log_error("S: [CLOSE]")
532535
raise ProtocolError("Connection to %r closed without handshake response" % (address,))
533536
if data_size == 4:
534-
if __debug__: log_debug("S: b%r", data)
537+
log_debug("S: b%r", data)
535538
else:
536539
# Some other garbled data has been received
537540
log_error("S: @*#!")
538541
raise ProtocolError("Expected four byte handshake response, received %r instead" % data)
539542
agreed_version, = struct_unpack(">I", data)
540-
if __debug__: log_info("S: [HANDSHAKE] %d", agreed_version)
543+
log_info("S: [HANDSHAKE] %d", agreed_version)
541544
if agreed_version == 0:
542-
if __debug__: log_info("~~ [CLOSE]")
545+
log_info("~~ [CLOSE]")
543546
s.shutdown(SHUT_RDWR)
544547
s.close()
545548
elif agreed_version == 1:

neo4j/v1/packstream.py

Lines changed: 85 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -708,64 +708,12 @@ def unpack(self):
708708
return self.read(byte_size).decode(ENCODING)
709709

710710
# List
711-
elif marker_high == 0x90:
712-
size = marker & 0x0F
713-
return [unpack() for _ in range(size)]
714-
elif marker == 0xD4: # LIST_8:
715-
size = UNPACKED_UINT_8[self.read_bytes(1)]
716-
return [unpack() for _ in range(size)]
717-
elif marker == 0xD5: # LIST_16:
718-
size = UNPACKED_UINT_16[self.read_bytes(2)]
719-
return [unpack() for _ in range(size)]
720-
elif marker == 0xD6: # LIST_32:
721-
size = struct_unpack(UINT_32_STRUCT, self.read(4))[0]
722-
return [unpack() for _ in range(size)]
723-
elif marker == 0xD7: # LIST_STREAM:
724-
value = []
725-
item = None
726-
while item is not EndOfStream:
727-
item = unpack()
728-
if item is not EndOfStream:
729-
value.append(item)
730-
return value
711+
elif 0x90 <= marker <= 0x9F or 0xD4 <= marker <= 0xD7:
712+
return self._unpack_list(marker)
731713

732714
# Map
733-
elif marker_high == 0xA0:
734-
size = marker & 0x0F
735-
value = {}
736-
for _ in range(size):
737-
key = unpack()
738-
value[key] = unpack()
739-
return value
740-
elif marker == 0xD8: # MAP_8:
741-
size = UNPACKED_UINT_8[self.read_bytes(1)]
742-
value = {}
743-
for _ in range(size):
744-
key = unpack()
745-
value[key] = unpack()
746-
return value
747-
elif marker == 0xD9: # MAP_16:
748-
size = UNPACKED_UINT_16[self.read_bytes(2)]
749-
value = {}
750-
for _ in range(size):
751-
key = unpack()
752-
value[key] = unpack()
753-
return value
754-
elif marker == 0xDA: # MAP_32:
755-
size = struct_unpack(UINT_32_STRUCT, self.read(4))[0]
756-
value = {}
757-
for _ in range(size):
758-
key = unpack()
759-
value[key] = unpack()
760-
return value
761-
elif marker == 0xDB: # MAP_STREAM:
762-
value = {}
763-
key = None
764-
while key is not EndOfStream:
765-
key = unpack()
766-
if key is not EndOfStream:
767-
value[key] = unpack()
768-
return value
715+
elif 0xA0 <= marker <= 0xAF or 0xD8 <= marker <= 0xDB:
716+
return self._unpack_map(marker)
769717

770718
# Structure
771719
elif marker_high == 0xB0:
@@ -793,6 +741,87 @@ def unpack(self):
793741
else:
794742
raise RuntimeError("Unknown PackStream marker %02X" % marker)
795743

744+
def unpack_list(self):
745+
marker = self.read_marker()
746+
return self._unpack_list(marker)
747+
748+
def _unpack_list(self, marker):
749+
marker_high = marker & 0xF0
750+
unpack = self.unpack
751+
if marker_high == 0x90:
752+
size = marker & 0x0F
753+
if size == 0:
754+
return []
755+
elif size == 1:
756+
return [unpack()]
757+
else:
758+
return [unpack() for _ in range(size)]
759+
elif marker == 0xD4: # LIST_8:
760+
size = UNPACKED_UINT_8[self.read_bytes(1)]
761+
return [unpack() for _ in range(size)]
762+
elif marker == 0xD5: # LIST_16:
763+
size = UNPACKED_UINT_16[self.read_bytes(2)]
764+
return [unpack() for _ in range(size)]
765+
elif marker == 0xD6: # LIST_32:
766+
size = struct_unpack(UINT_32_STRUCT, self.read_bytes(4))[0]
767+
return [unpack() for _ in range(size)]
768+
elif marker == 0xD7: # LIST_STREAM:
769+
value = []
770+
item = None
771+
while item is not EndOfStream:
772+
item = unpack()
773+
if item is not EndOfStream:
774+
value.append(item)
775+
return value
776+
else:
777+
return None
778+
779+
def unpack_map(self):
780+
marker = self.read_marker()
781+
return self._unpack_map(marker)
782+
783+
def _unpack_map(self, marker):
784+
marker_high = marker & 0xF0
785+
unpack = self.unpack
786+
if marker_high == 0xA0:
787+
size = marker & 0x0F
788+
value = {}
789+
for _ in range(size):
790+
key = unpack()
791+
value[key] = unpack()
792+
return value
793+
elif marker == 0xD8: # MAP_8:
794+
size = UNPACKED_UINT_8[self.read_bytes(1)]
795+
value = {}
796+
for _ in range(size):
797+
key = unpack()
798+
value[key] = unpack()
799+
return value
800+
elif marker == 0xD9: # MAP_16:
801+
size = UNPACKED_UINT_16[self.read_bytes(2)]
802+
value = {}
803+
for _ in range(size):
804+
key = unpack()
805+
value[key] = unpack()
806+
return value
807+
elif marker == 0xDA: # MAP_32:
808+
size = struct_unpack(UINT_32_STRUCT, self.read_bytes(4))[0]
809+
value = {}
810+
for _ in range(size):
811+
key = unpack()
812+
value[key] = unpack()
813+
return value
814+
elif marker == 0xDB: # MAP_STREAM:
815+
value = {}
816+
key = None
817+
while key is not EndOfStream:
818+
key = unpack()
819+
if key is not EndOfStream:
820+
value[key] = unpack()
821+
return value
822+
else:
823+
return None
824+
796825
def unpack_structure_header(self):
797826
marker = self.read_marker()
798827
if marker == -1:

0 commit comments

Comments
 (0)