Skip to content

Commit ca0b9ed

Browse files
authored
Merge pull request #749 from robsdedude/bolt-utc-datetimes-4.4
UTC encoding patch for Bolt 4.4 and 4.3
2 parents 923bc6d + 52f54df commit ca0b9ed

19 files changed

+571
-75
lines changed

neo4j/data.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
hydrate_date, dehydrate_date,
3535
hydrate_time, dehydrate_time,
3636
hydrate_datetime, dehydrate_datetime,
37+
hydrate_datetime_v2, dehydrate_datetime_v2,
3738
hydrate_duration, dehydrate_duration, dehydrate_timedelta,
3839
)
3940

@@ -268,7 +269,7 @@ def transform(self, x):
268269
class DataHydrator:
269270
# TODO: extend DataTransformer
270271

271-
def __init__(self):
272+
def __init__(self, patch_utc=False):
272273
super(DataHydrator, self).__init__()
273274
self.graph = Graph()
274275
self.graph_hydrator = Graph.Hydrator(self.graph)
@@ -282,11 +283,19 @@ def __init__(self):
282283
b"D": hydrate_date,
283284
b"T": hydrate_time, # time zone offset
284285
b"t": hydrate_time, # no time zone
285-
b"F": hydrate_datetime, # time zone offset
286-
b"f": hydrate_datetime, # time zone name
287286
b"d": hydrate_datetime, # no time zone
288287
b"E": hydrate_duration,
289288
}
289+
if not patch_utc:
290+
self.hydration_functions.update({
291+
b"F": hydrate_datetime, # time zone offset
292+
b"f": hydrate_datetime, # time zone name
293+
})
294+
else:
295+
self.hydration_functions.update({
296+
b"I": hydrate_datetime_v2, # time zone offset
297+
b"i": hydrate_datetime_v2, # time zone name
298+
})
290299

291300
def hydrate(self, values):
292301
""" Convert PackStream values into native values.
@@ -320,10 +329,10 @@ class DataDehydrator:
320329
# TODO: extend DataTransformer
321330

322331
@classmethod
323-
def fix_parameters(cls, parameters):
332+
def fix_parameters(cls, parameters, patch_utc=False):
324333
if not parameters:
325334
return {}
326-
dehydrator = cls()
335+
dehydrator = cls(patch_utc=patch_utc)
327336
try:
328337
dehydrated, = dehydrator.dehydrate([parameters])
329338
except TypeError as error:
@@ -332,19 +341,28 @@ def fix_parameters(cls, parameters):
332341
else:
333342
return dehydrated
334343

335-
def __init__(self):
344+
def __init__(self, patch_utc=False):
336345
self.dehydration_functions = {}
337346
self.dehydration_functions.update({
338347
Point: dehydrate_point,
339348
Date: dehydrate_date,
340349
date: dehydrate_date,
341350
Time: dehydrate_time,
342351
time: dehydrate_time,
343-
DateTime: dehydrate_datetime,
344-
datetime: dehydrate_datetime,
345352
Duration: dehydrate_duration,
346353
timedelta: dehydrate_timedelta,
347354
})
355+
if not patch_utc:
356+
self.dehydration_functions.update({
357+
DateTime: dehydrate_datetime,
358+
datetime: dehydrate_datetime,
359+
})
360+
else:
361+
self.dehydration_functions.update({
362+
DateTime: dehydrate_datetime_v2,
363+
datetime: dehydrate_datetime_v2,
364+
})
365+
348366
# Allow dehydration from any direct Point subclass
349367
self.dehydration_functions.update({cls: dehydrate_point for cls in Point.__subclasses__()})
350368

neo4j/io/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
147147
# configuration hint that exists. Therefore, all hints can be stored at
148148
# connection level. This might change in the future.
149149
self.configuration_hints = {}
150+
# back ported protocol patches negotiated with the server
151+
self.bolt_patches = set()
150152
self.outbox = Outbox()
151153
self.inbox = Inbox(self.socket, on_error=self._set_defunct_read)
152154
self.packer = Packer(self.outbox)

neo4j/io/_bolt4.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,20 @@ class Bolt4x3(Bolt4x2):
368368

369369
PROTOCOL_VERSION = Version(4, 3)
370370

371+
def get_base_headers(self):
372+
""" Bolt 4.1 passes the routing context, originally taken from
373+
the URI, into the connection initialisation message. This
374+
enables server-side routing to propagate the same behaviour
375+
through its driver.
376+
"""
377+
headers = {
378+
"user_agent": self.user_agent,
379+
"patch_bolt": ["utc"]
380+
}
381+
if self.routing_context is not None:
382+
headers["routing"] = self.routing_context
383+
return headers
384+
371385
def route(self, database=None, imp_user=None, bookmarks=None):
372386
if imp_user is not None:
373387
raise ConfigurationError(
@@ -394,6 +408,7 @@ def route(self, database=None, imp_user=None, bookmarks=None):
394408

395409
def hello(self):
396410
def on_success(metadata):
411+
# configuration hints
397412
self.configuration_hints.update(metadata.pop("hints", {}))
398413
self.server_info.update(metadata)
399414
if "connection.recv_timeout_seconds" in self.configuration_hints:
@@ -407,6 +422,8 @@ def on_success(metadata):
407422
"connection.recv_timeout_seconds (%r). Make sure "
408423
"the server and network is set up correctly.",
409424
self.local_port, recv_timeout)
425+
# bolt patch handshake
426+
self.bolt_patches.update(set(metadata.pop("patch_bolt", ())))
410427

411428
headers = self.get_base_headers()
412429
headers.update(self.auth_dict)

neo4j/time/hydration.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,34 @@ def hydrate_datetime(seconds, nanoseconds, tz=None):
132132
return zone.localize(t)
133133

134134

135+
def hydrate_datetime_v2(seconds, nanoseconds, tz=None):
136+
""" Hydrator for `DateTime` and `LocalDateTime` values.
137+
138+
:param seconds:
139+
:param nanoseconds:
140+
:param tz:
141+
:return: datetime
142+
"""
143+
import pytz
144+
145+
minutes, seconds = map(int, divmod(seconds, 60))
146+
hours, minutes = map(int, divmod(minutes, 60))
147+
days, hours = map(int, divmod(hours, 24))
148+
t = DateTime.combine(
149+
Date.from_ordinal(get_date_unix_epoch_ordinal() + days),
150+
Time(hours, minutes, seconds, nanoseconds)
151+
)
152+
if tz is None:
153+
return t
154+
if isinstance(tz, int):
155+
tz_offset_minutes, tz_offset_seconds = divmod(tz, 60)
156+
zone = pytz.FixedOffset(tz_offset_minutes)
157+
else:
158+
zone = pytz.timezone(tz)
159+
t = t.replace(tzinfo=pytz.UTC)
160+
return t.as_timezone(zone)
161+
162+
135163
def dehydrate_datetime(value):
136164
""" Dehydrator for `datetime` values.
137165
@@ -167,8 +195,57 @@ def seconds_and_nanoseconds(dt):
167195
else:
168196
# with time offset
169197
seconds, nanoseconds = seconds_and_nanoseconds(value)
170-
return Structure(b"F", seconds, nanoseconds,
171-
int(tz.utcoffset(value).total_seconds()))
198+
offset = tz.utcoffset(value)
199+
if offset.microseconds:
200+
raise ValueError("Bolt protocol does not support sub-second "
201+
"UTC offsets.")
202+
offset_seconds = offset.days * 86400 + offset.seconds
203+
return Structure(b"F", seconds, nanoseconds, offset_seconds)
204+
205+
206+
def dehydrate_datetime_v2(value):
207+
""" Dehydrator for `datetime` values.
208+
209+
:param value:
210+
:type value: datetime
211+
:return:
212+
"""
213+
214+
import pytz
215+
216+
def seconds_and_nanoseconds(dt):
217+
if isinstance(dt, datetime):
218+
dt = DateTime.from_native(dt)
219+
dt = dt.astimezone(pytz.UTC)
220+
utc_epoch = DateTime(1970, 1, 1, tzinfo=pytz.UTC)
221+
dt_clock_time = dt.to_clock_time()
222+
utc_epoch_clock_time = utc_epoch.to_clock_time()
223+
t = dt_clock_time - utc_epoch_clock_time
224+
return t.seconds, t.nanoseconds
225+
226+
tz = value.tzinfo
227+
if tz is None:
228+
# without time zone
229+
value = pytz.UTC.localize(value)
230+
seconds, nanoseconds = seconds_and_nanoseconds(value)
231+
return Structure(b"d", seconds, nanoseconds)
232+
elif hasattr(tz, "zone") and tz.zone and isinstance(tz.zone, str):
233+
# with named pytz time zone
234+
seconds, nanoseconds = seconds_and_nanoseconds(value)
235+
return Structure(b"i", seconds, nanoseconds, tz.zone)
236+
elif hasattr(tz, "key") and tz.key and isinstance(tz.key, str):
237+
# with named zoneinfo (Python 3.9+) time zone
238+
seconds, nanoseconds = seconds_and_nanoseconds(value)
239+
return Structure(b"i", seconds, nanoseconds, tz.key)
240+
else:
241+
# with time offset
242+
seconds, nanoseconds = seconds_and_nanoseconds(value)
243+
offset = tz.utcoffset(value)
244+
if offset.microseconds:
245+
raise ValueError("Bolt protocol does not support sub-second "
246+
"UTC offsets.")
247+
offset_seconds = offset.days * 86400 + offset.seconds
248+
return Structure(b"I", seconds, nanoseconds, offset_seconds)
172249

173250

174251
def hydrate_duration(months, days, seconds, nanoseconds):

neo4j/work/result.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ def _run(self, query, parameters, db, imp_user, access_mode, bookmarks,
7575
query_metadata = getattr(query, "metadata", None)
7676
query_timeout = getattr(query, "timeout", None)
7777

78-
parameters = DataDehydrator.fix_parameters(dict(parameters or {}, **kwparameters))
78+
parameters = DataDehydrator.fix_parameters(
79+
dict(parameters or {}, **kwparameters),
80+
patch_utc="utc" in self._connection.bolt_patches
81+
)
7982

8083
self._metadata = {
8184
"query": query_text,

neo4j/work/simple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def run(self, query, parameters=None, **kwparameters):
206206
protocol_version = cx.PROTOCOL_VERSION
207207
server_info = cx.server_info
208208

209-
hydrant = DataHydrator()
209+
hydrant = DataHydrator(patch_utc="utc" in cx.bolt_patches)
210210

211211
self._autoResult = Result(
212212
cx, hydrant, self._config.fetch_size, self._result_closed,

neo4j/work/transaction.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,12 @@ def run(self, query, parameters=None, **kwparameters):
127127
# have any qid to fetch in batches.
128128
self._results[-1]._buffer_all()
129129

130+
hydrant = DataHydrator(
131+
patch_utc="utc" in self._connection.bolt_patches
132+
)
133+
130134
result = Result(
131-
self._connection, DataHydrator(), self._fetch_size,
135+
self._connection, hydrant, self._fetch_size,
132136
self._result_on_closed_handler,
133137
self._error_handler
134138
)

testkit/build.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
"""
2-
Executed in Go driver container.
2+
Executed in driver container.
33
Responsible for building driver and test backend.
44
"""
5+
6+
57
import subprocess
8+
import sys
69

710

811
def run(args, env=None):
9-
subprocess.run(args, universal_newlines=True, stderr=subprocess.STDOUT,
10-
check=True, env=env)
12+
subprocess.run(args, universal_newlines=True, stdout=sys.stdout,
13+
stderr=sys.stderr, check=True, env=env)
1114

1215

1316
if __name__ == "__main__":
1417
run(["python", "setup.py", "build"])
18+
run(["python", "-m", "pip", "install", "-U", "pip"])
19+
run(["python", "-m", "pip", "install", "-Ur",
20+
"testkitbackend/requirements.txt"])

testkitbackend/_driver_logger.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [http://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
import io
20+
import logging
21+
import sys
22+
23+
24+
buffer_handler = logging.StreamHandler(io.StringIO())
25+
buffer_handler.setLevel(logging.DEBUG)
26+
27+
handler = logging.StreamHandler(sys.stdout)
28+
handler.setLevel(logging.DEBUG)
29+
logging.getLogger("neo4j").addHandler(handler)
30+
logging.getLogger("neo4j").addHandler(buffer_handler)
31+
logging.getLogger("neo4j").setLevel(logging.DEBUG)
32+
33+
log = logging.getLogger("testkitbackend")
34+
log.addHandler(handler)
35+
log.setLevel(logging.DEBUG)
36+
37+
38+
__all__ = [
39+
"buffer_handler",
40+
"log",
41+
]

0 commit comments

Comments
 (0)