Skip to content

Commit 4287963

Browse files
authored
better thread-safety for ConnectionPool (#1270)
Better thread and fork safety for ConnectionPool and BlockingConnectionPool
1 parent 09a17ea commit 4287963

File tree

5 files changed

+155
-49
lines changed

5 files changed

+155
-49
lines changed

CHANGES

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
for backwards compatibility. #1196
2121
* Slightly optimized command packing. Thanks @Deneby67. #1255
2222
* Added support for the TYPE argument to SCAN. Thanks @netocp. #1220
23+
* Better thread and fork safety in ConnectionPool and
24+
BlockingConnectionPool. Added better locking to synchronize critical
25+
sections rather than relying on CPython-specific implementation details
26+
relating to atomic operations. Adjusted how the pools identify and
27+
deal with a fork. Added a ChildDeadlockedError exception that is
28+
raised by child processes in the very unlikely chance that a deadlock
29+
is encountered. Thanks @gmbnomis, @mdellweg, @yht804421715. #1270,
30+
#1138, #1178, #906, #1262
2331
* 3.3.11
2432
* Further fix for the SSLError -> TimeoutError mapping to work
2533
on obscure releases of Python 2.7.

redis/__init__.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from redis.exceptions import (
1111
AuthenticationError,
1212
BusyLoadingError,
13+
ChildDeadlockedError,
1314
ConnectionError,
1415
DataError,
1516
InvalidResponse,
@@ -33,9 +34,24 @@ def int_or_str(value):
3334
VERSION = tuple(map(int_or_str, __version__.split('.')))
3435

3536
__all__ = [
36-
'Redis', 'StrictRedis', 'ConnectionPool', 'BlockingConnectionPool',
37-
'Connection', 'SSLConnection', 'UnixDomainSocketConnection', 'from_url',
38-
'AuthenticationError', 'BusyLoadingError', 'ConnectionError', 'DataError',
39-
'InvalidResponse', 'PubSubError', 'ReadOnlyError', 'RedisError',
40-
'ResponseError', 'TimeoutError', 'WatchError'
37+
'AuthenticationError',
38+
'BlockingConnectionPool',
39+
'BusyLoadingError',
40+
'ChildDeadlockedError',
41+
'Connection',
42+
'ConnectionError',
43+
'ConnectionPool',
44+
'DataError',
45+
'from_url',
46+
'InvalidResponse',
47+
'PubSubError',
48+
'ReadOnlyError',
49+
'Redis',
50+
'RedisError',
51+
'ResponseError',
52+
'SSLConnection',
53+
'StrictRedis',
54+
'TimeoutError',
55+
'UnixDomainSocketConnection',
56+
'WatchError',
4157
]

redis/connection.py

Lines changed: 120 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from redis.exceptions import (
1919
AuthenticationError,
2020
BusyLoadingError,
21+
ChildDeadlockedError,
2122
ConnectionError,
2223
DataError,
2324
ExecAbortError,
@@ -1069,6 +1070,15 @@ def __init__(self, connection_class=Connection, max_connections=None,
10691070
self.connection_kwargs = connection_kwargs
10701071
self.max_connections = max_connections
10711072

1073+
# a lock to protect the critical section in _checkpid().
1074+
# this lock is acquired when the process id changes, such as
1075+
# after a fork. during this time, multiple threads in the child
1076+
# process could attempt to acquire this lock. the first thread
1077+
# to acquire the lock will reset the data structures and lock
1078+
# object of this pool. subsequent threads acquiring this lock
1079+
# will notice the first thread already did the work and simply
1080+
# release the lock.
1081+
self._fork_lock = threading.Lock()
10721082
self.reset()
10731083

10741084
def __repr__(self):
@@ -1084,50 +1094,107 @@ def __eq__(self, other):
10841094
)
10851095

10861096
def reset(self):
1087-
self.pid = os.getpid()
1097+
self._lock = threading.RLock()
10881098
self._created_connections = 0
10891099
self._available_connections = []
10901100
self._in_use_connections = set()
1091-
self._check_lock = threading.Lock()
1101+
1102+
# this must be the last operation in this method. while reset() is
1103+
# called when holding _fork_lock, other threads in this process
1104+
# can call _checkpid() which compares self.pid and os.getpid() without
1105+
# holding any lock (for performance reasons). keeping this assignment
1106+
# as the last operation ensures that those other threads will also
1107+
# notice a pid difference and block waiting for the first thread to
1108+
# release _fork_lock. when each of these threads eventually acquire
1109+
# _fork_lock, they will notice that another thread already called
1110+
# reset() and they will immediately release _fork_lock and continue on.
1111+
self.pid = os.getpid()
10921112

10931113
def _checkpid(self):
1114+
# _checkpid() attempts to keep ConnectionPool fork-safe on modern
1115+
# systems. this is called by all ConnectionPool methods that
1116+
# manipulate the pool's state such as get_connection() and release().
1117+
#
1118+
# _checkpid() determines whether the process has forked by comparing
1119+
# the current process id to the process id saved on the ConnectionPool
1120+
# instance. if these values are the same, _checkpid() simply returns.
1121+
#
1122+
# when the process ids differ, _checkpid() assumes that the process
1123+
# has forked and that we're now running in the child process. the child
1124+
# process cannot use the parent's file descriptors (e.g., sockets).
1125+
# therefore, when _checkpid() sees the process id change, it calls
1126+
# reset() in order to reinitialize the child's ConnectionPool. this
1127+
# will cause the child to make all new connection objects.
1128+
#
1129+
# _checkpid() is protected by self._fork_lock to ensure that multiple
1130+
# threads in the child process do not call reset() multiple times.
1131+
#
1132+
# there is an extremely small chance this could fail in the following
1133+
# scenario:
1134+
# 1. process A calls _checkpid() for the first time and acquires
1135+
# self._fork_lock.
1136+
# 2. while holding self._fork_lock, process A forks (the fork()
1137+
# could happen in a different thread owned by process A)
1138+
# 3. process B (the forked child process) inherits the
1139+
# ConnectionPool's state from the parent. that state includes
1140+
# a locked _fork_lock. process B will not be notified when
1141+
# process A releases the _fork_lock and will thus never be
1142+
# able to acquire the _fork_lock.
1143+
#
1144+
# to mitigate this possible deadlock, _checkpid() will only wait 5
1145+
# seconds to acquire _fork_lock. if _fork_lock cannot be acquired in
1146+
# that time it is assumed that the child is deadlocked and a
1147+
# redis.ChildDeadlockedError error is raised.
10941148
if self.pid != os.getpid():
1095-
with self._check_lock:
1096-
if self.pid == os.getpid():
1097-
# another thread already did the work while we waited
1098-
# on the lock.
1099-
return
1100-
self.reset()
1149+
# python 2.7 doesn't support a timeout option to lock.acquire()
1150+
# we have to mimic lock timeouts ourselves.
1151+
timeout_at = time() + 5
1152+
acquired = False
1153+
while time() < timeout_at:
1154+
acquired = self._fork_lock.acquire(False)
1155+
if acquired:
1156+
break
1157+
if not acquired:
1158+
raise ChildDeadlockedError
1159+
# reset() the instance for the new process if another thread
1160+
# hasn't already done so
1161+
try:
1162+
if self.pid != os.getpid():
1163+
self.reset()
1164+
finally:
1165+
self._fork_lock.release()
11011166

11021167
def get_connection(self, command_name, *keys, **options):
11031168
"Get a connection from the pool"
11041169
self._checkpid()
1105-
try:
1106-
connection = self._available_connections.pop()
1107-
except IndexError:
1108-
connection = self.make_connection()
1109-
self._in_use_connections.add(connection)
1110-
try:
1111-
# ensure this connection is connected to Redis
1112-
connection.connect()
1113-
# connections that the pool provides should be ready to send
1114-
# a command. if not, the connection was either returned to the
1115-
# pool before all data has been read or the socket has been
1116-
# closed. either way, reconnect and verify everything is good.
1170+
with self._lock:
11171171
try:
1118-
if connection.can_read():
1119-
raise ConnectionError('Connection has data')
1120-
except ConnectionError:
1121-
connection.disconnect()
1172+
connection = self._available_connections.pop()
1173+
except IndexError:
1174+
connection = self.make_connection()
1175+
self._in_use_connections.add(connection)
1176+
try:
1177+
# ensure this connection is connected to Redis
11221178
connection.connect()
1123-
if connection.can_read():
1124-
raise ConnectionError('Connection not ready')
1125-
except: # noqa: E722
1126-
# release the connection back to the pool so that we don't leak it
1127-
self.release(connection)
1128-
raise
1129-
1130-
return connection
1179+
# connections that the pool provides should be ready to send
1180+
# a command. if not, the connection was either returned to the
1181+
# pool before all data has been read or the socket has been
1182+
# closed. either way, reconnect and verify everything is good.
1183+
try:
1184+
if connection.can_read():
1185+
raise ConnectionError('Connection has data')
1186+
except ConnectionError:
1187+
connection.disconnect()
1188+
connection.connect()
1189+
if connection.can_read():
1190+
raise ConnectionError('Connection not ready')
1191+
except: # noqa: E722
1192+
# release the connection back to the pool so that we don't
1193+
# leak it
1194+
self.release(connection)
1195+
raise
1196+
1197+
return connection
11311198

11321199
def get_encoder(self):
11331200
"Return an encoder based on encoding settings"
@@ -1148,18 +1215,20 @@ def make_connection(self):
11481215
def release(self, connection):
11491216
"Releases the connection back to the pool"
11501217
self._checkpid()
1151-
if connection.pid != self.pid:
1152-
return
1153-
self._in_use_connections.remove(connection)
1154-
self._available_connections.append(connection)
1218+
with self._lock:
1219+
if connection.pid != self.pid:
1220+
return
1221+
self._in_use_connections.remove(connection)
1222+
self._available_connections.append(connection)
11551223

11561224
def disconnect(self):
11571225
"Disconnects all connections in the pool"
11581226
self._checkpid()
1159-
all_conns = chain(self._available_connections,
1160-
self._in_use_connections)
1161-
for connection in all_conns:
1162-
connection.disconnect()
1227+
with self._lock:
1228+
all_conns = chain(self._available_connections,
1229+
self._in_use_connections)
1230+
for connection in all_conns:
1231+
connection.disconnect()
11631232

11641233

11651234
class BlockingConnectionPool(ConnectionPool):
@@ -1207,9 +1276,6 @@ def __init__(self, max_connections=50, timeout=20,
12071276
**connection_kwargs)
12081277

12091278
def reset(self):
1210-
self.pid = os.getpid()
1211-
self._check_lock = threading.Lock()
1212-
12131279
# Create and fill up a thread safe queue with ``None`` values.
12141280
self.pool = self.queue_class(self.max_connections)
12151281
while True:
@@ -1222,6 +1288,17 @@ def reset(self):
12221288
# disconnect them later.
12231289
self._connections = []
12241290

1291+
# this must be the last operation in this method. while reset() is
1292+
# called when holding _fork_lock, other threads in this process
1293+
# can call _checkpid() which compares self.pid and os.getpid() without
1294+
# holding any lock (for performance reasons). keeping this assignment
1295+
# as the last operation ensures that those other threads will also
1296+
# notice a pid difference and block waiting for the first thread to
1297+
# release _fork_lock. when each of these threads eventually acquire
1298+
# _fork_lock, they will notice that another thread already called
1299+
# reset() and they will immediately release _fork_lock and continue on.
1300+
self.pid = os.getpid()
1301+
12251302
def make_connection(self):
12261303
"Make a fresh connection."
12271304
connection = self.connection_class(**self.connection_kwargs)

redis/exceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,8 @@ class LockError(RedisError, ValueError):
6767
class LockNotOwnedError(LockError):
6868
"Error trying to extend or release a lock that is (no longer) owned"
6969
pass
70+
71+
72+
class ChildDeadlockedError(Exception):
73+
"Error indicating that a child process is deadlocked after a fork()"
74+
pass

tests/test_multiprocessing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TestMultiprocessing(object):
2222
# See issue #1085 for details.
2323

2424
# use a multi-connection client as that's the only type that is
25-
# actuall fork/process-safe
25+
# actually fork/process-safe
2626
@pytest.fixture()
2727
def r(self, request):
2828
return _get_client(

0 commit comments

Comments
 (0)