Skip to content

Commit 165946f

Browse files
committed
Configurable connection buffers capacity, descriptive error message
1 parent 42729cb commit 165946f

File tree

5 files changed

+43
-30
lines changed

5 files changed

+43
-30
lines changed

neo4j/bolt/connection.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from neo4j.packstream import Packer, Unpacker
4343
from neo4j.util import import_best as _import_best
4444
from time import clock
45-
from neo4j.config import default_config, INFINITE, TRUST_ON_FIRST_USE
45+
from neo4j.config import get_config, INFINITE, TRUST_ON_FIRST_USE
4646

4747
ChunkedInputBuffer = _import_best("neo4j.bolt._io", "neo4j.bolt.io").ChunkedInputBuffer
4848
ChunkedOutputBuffer = _import_best("neo4j.bolt._io", "neo4j.bolt.io").ChunkedOutputBuffer
@@ -170,16 +170,19 @@ def __init__(self, address, sock, error_handler, **config):
170170
self.socket = sock
171171
self.error_handler = error_handler
172172
self.server = ServerInfo(SocketAddress.from_socket(sock))
173-
self.input_buffer = ChunkedInputBuffer()
174-
self.output_buffer = ChunkedOutputBuffer()
173+
self.input_buffer = ChunkedInputBuffer(get_config(config, "input_buffer_capacity"))
174+
self.output_buffer = ChunkedOutputBuffer(
175+
get_config(config, "output_buffer_capacity"),
176+
get_config(config, "output_buffer_max_chunk_size")
177+
)
175178
self.packer = Packer(self.output_buffer)
176179
self.unpacker = Unpacker()
177180
self.responses = deque()
178-
self._max_connection_lifetime = config.get("max_connection_lifetime", default_config["max_connection_lifetime"])
181+
self._max_connection_lifetime = get_config(config, "max_connection_lifetime")
179182
self._creation_timestamp = clock()
180183

181184
# Determine the user agent and ensure it is a Unicode value
182-
user_agent = config.get("user_agent", default_config["user_agent"])
185+
user_agent = get_config(config, "user_agent")
183186
if isinstance(user_agent, bytes):
184187
user_agent = user_agent.decode("UTF-8")
185188
self.user_agent = user_agent
@@ -328,14 +331,20 @@ def _fetch(self):
328331
return len(details), 1
329332

330333
def _receive(self):
334+
err_msg = None
331335
try:
332336
received = self.input_buffer.receive_message(self.socket, 8192)
337+
except BufferError:
338+
err_msg = "Overflow. Increase \"input_buffer_capacity\" for connection {!r}"
339+
received = False
333340
except SocketError:
334341
received = False
335342
if not received:
336343
self._defunct = True
337344
self.close()
338-
raise self.Error("Failed to read from defunct connection {!r}".format(self.server.address))
345+
if not err_msg:
346+
err_msg = "Failed to read from defunct connection {!r}"
347+
raise self.Error(err_msg.format(self.server.address))
339348

340349
def _unpack(self):
341350
unpacker = self.unpacker
@@ -405,8 +414,8 @@ def __init__(self, connector, connection_error_handler, **config):
405414
self.connections = {}
406415
self.lock = RLock()
407416
self.cond = Condition(self.lock)
408-
self._max_connection_pool_size = config.get("max_connection_pool_size", default_config["max_connection_pool_size"])
409-
self._connection_acquisition_timeout = config.get("connection_acquisition_timeout", default_config["connection_acquisition_timeout"])
417+
self._max_connection_pool_size = get_config(config, "max_connection_pool_size")
418+
self._connection_acquisition_timeout = get_config(config, "connection_acquisition_timeout")
410419

411420
def __enter__(self):
412421
return self
@@ -544,10 +553,10 @@ def connect(address, ssl_context=None, error_handler=None, **config):
544553
else:
545554
raise ValueError("Unsupported address {!r}".format(address))
546555
t = s.gettimeout()
547-
s.settimeout(config.get("connection_timeout", default_config["connection_timeout"]))
556+
s.settimeout(get_config(config, "connection_timeout"))
548557
s.connect(address)
549558
s.settimeout(t)
550-
s.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1 if config.get("keep_alive", default_config["keep_alive"]) else 0)
559+
s.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1 if get_config(config, "keep_alive") else 0)
551560
except SocketTimeout:
552561
if s:
553562
try:
@@ -586,7 +595,7 @@ def connect(address, ssl_context=None, error_handler=None, **config):
586595
s.close()
587596
raise ProtocolError("When using a secure socket, the server should always "
588597
"provide a certificate")
589-
trust = config.get("trust", default_config["trust"])
598+
trust = get_config(config, "trust")
590599
if trust == TRUST_ON_FIRST_USE:
591600
from neo4j.bolt.cert import PersonalCertificateStore
592601
store = PersonalCertificateStore()

neo4j/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,16 @@
6464
"connection_timeout": DEFAULT_CONNECTION_TIMEOUT,
6565
"keep_alive": True,
6666

67+
# Buffer capacity:
68+
"input_buffer_capacity": 524288,
69+
"output_buffer_capacity": 1048576,
70+
"output_buffer_max_chunk_size": 16384,
71+
6772
# Routing settings:
6873
"max_retry_time": DEFAULT_MAX_RETRY_TIME,
6974
"load_balancing_strategy": DEFAULT_LOAD_BALANCING_STRATEGY
7075
}
76+
77+
def get_config(config, key, if_none=None):
78+
return config.get(key, default_config.get(key)) or if_none
79+

neo4j/v1/api.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from neo4j.exceptions import ProtocolError, ServiceUnavailable
2929
from neo4j.compat import urlparse
3030
from neo4j.exceptions import CypherError, TransientError
31-
from neo4j.config import default_config
31+
from neo4j.config import get_config
3232

3333
from .exceptions import DriverError, SessionError, SessionExpired, TransactionError
3434

@@ -144,7 +144,7 @@ class Driver(object):
144144

145145
def __init__(self, pool, **config):
146146
self._pool = pool
147-
self._max_retry_time = config.get("max_retry_time", default_config["max_retry_time"])
147+
self._max_retry_time = get_config(config, "max_retry_time")
148148

149149
def __del__(self):
150150
self.close()
@@ -227,23 +227,18 @@ class Session(object):
227227
# :class:`.Transaction` should be carried out.
228228
_bookmarks = ()
229229

230-
# Default maximum time to keep retrying failed transactions.
231-
_max_retry_time = default_config["max_retry_time"]
232-
233230
_closed = False
234231

235232
def __init__(self, acquirer, access_mode, **parameters):
236233
self._acquirer = acquirer
237234
self._default_access_mode = access_mode
238-
for key, value in parameters.items():
239-
if key == "bookmark":
240-
self._bookmarks = [value] if value else []
241-
elif key == "bookmarks":
242-
self._bookmarks = value or []
243-
elif key == "max_retry_time":
244-
self._max_retry_time = value
245-
else:
246-
pass # for compatibility
235+
# Default maximum time to keep retrying failed transactions.
236+
self._max_retry_time = get_config(parameters, "max_retry_time")
237+
if "bookmark" in parameters:
238+
value = parameters["bookmark"]
239+
self._bookmarks = [value] if value else []
240+
elif "bookmarks" in parameters:
241+
self._bookmarks = parameters["bookmarks"] or []
247242

248243
def __del__(self):
249244
try:

neo4j/v1/routing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from neo4j.v1.exceptions import SessionExpired
3232
from neo4j.v1.security import SecurityPlan
3333
from neo4j.v1.session import BoltSession
34-
from neo4j.config import default_config, LOAD_BALANCING_STRATEGY_LEAST_CONNECTED, LOAD_BALANCING_STRATEGY_ROUND_ROBIN
34+
from neo4j.config import get_config, LOAD_BALANCING_STRATEGY_LEAST_CONNECTED, LOAD_BALANCING_STRATEGY_ROUND_ROBIN
3535

3636

3737
class OrderedSet(MutableSet):
@@ -163,7 +163,7 @@ class LoadBalancingStrategy(object):
163163

164164
@classmethod
165165
def build(cls, connection_pool, **config):
166-
load_balancing_strategy = config.get("load_balancing_strategy", default_config["load_balancing_strategy"])
166+
load_balancing_strategy = get_config(config, "load_balancing_strategy")
167167
if load_balancing_strategy == LOAD_BALANCING_STRATEGY_LEAST_CONNECTED:
168168
return LeastConnectedLoadBalancingStrategy(connection_pool)
169169
elif load_balancing_strategy == LOAD_BALANCING_STRATEGY_ROUND_ROBIN:

neo4j/v1/security.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from warnings import warn
2222

2323
from neo4j.compat.ssl import SSL_AVAILABLE, SSLContext, PROTOCOL_SSLv23, OP_NO_SSLv2, CERT_REQUIRED
24-
from neo4j.config import default_config, TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_ON_FIRST_USE, TRUST_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES
24+
from neo4j.config import get_config, TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_ON_FIRST_USE, TRUST_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES
2525

2626
ENCRYPTION_OFF = 0
2727
ENCRYPTION_ON = 1
@@ -49,10 +49,10 @@ class SecurityPlan(object):
4949

5050
@classmethod
5151
def build(cls, **config):
52-
encrypted = config.get("encrypted", default_config["encrypted"])
52+
encrypted = get_config(config, "encrypted")
5353
if encrypted is None:
5454
encrypted = _encryption_default()
55-
trust = config.get("trust", default_config["trust"])
55+
trust = get_config(config, "trust")
5656
if encrypted:
5757
if not SSL_AVAILABLE:
5858
raise RuntimeError("Bolt over TLS is only available in Python 2.7.9+ and "

0 commit comments

Comments
 (0)