Skip to content

Commit 4e1e748

Browse files
committed
Improve how connection pools operate in forked/child proceeses.
Sometimes a process with an active connection to Redis forks and creates child processes taht also want to talk to Redis. Prior to this change there were a number of potential conflicts that could cause this to fail. Retrieving a connection from the pool and releasing a connection back to the pool check the current proceeses PID. If it's different than the PID that created the pool, reset() is called to get a fresh set of connections for the current process. However in doing so, pool.disconnect() was caused which closes the file descriptors that the parent may still be using. Further when the available_connections and in_use_connections lists are reset, all of those connections inherited from the parent are GC'd and the connection's `__del__` was called, which also closed the socket and file descriptor. This change prevents pool.disconnect() from being called when a pid is changed. It also removes the `__del__` destructor from connections. Neither of these are necessary or practical. Child processes still reset() their copy of the pool when first accessed causing their own connections to be created. `ConnectionPool.disconnect()` now checks the current process ID so that a child or parent can't disconnect the other's connections. Additionally, `Connection.disconnect()` now checks the current process ID and only calls `socket.shutdown()` if `disconnect()` is called by the same process that created the connection. This allows for a child process that inherited a connection to call `Connection.disconnect()` and not shutdown the parent's copy of the socket. Fixes #863 Fixes #784 Fixes #732 Fixes #1085 Fixes #504
1 parent e24e977 commit 4e1e748

File tree

2 files changed

+49
-29
lines changed

2 files changed

+49
-29
lines changed

redis/connection.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,6 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
471471
def __repr__(self):
472472
return self.description_format % self._description_args
473473

474-
def __del__(self):
475-
try:
476-
self.disconnect()
477-
except Exception:
478-
pass
479-
480474
def register_connect_callback(self, callback):
481475
self._connect_callbacks.append(callback)
482476

@@ -580,7 +574,8 @@ def disconnect(self):
580574
if self._sock is None:
581575
return
582576
try:
583-
self._sock.shutdown(socket.SHUT_RDWR)
577+
if os.getpid() == self.pid:
578+
self._sock.shutdown(socket.SHUT_RDWR)
584579
self._sock.close()
585580
except socket.error:
586581
pass
@@ -973,7 +968,6 @@ def _checkpid(self):
973968
# another thread already did the work while we waited
974969
# on the lock.
975970
return
976-
self.disconnect()
977971
self.reset()
978972

979973
def get_connection(self, command_name, *keys, **options):
@@ -1012,6 +1006,7 @@ def release(self, connection):
10121006

10131007
def disconnect(self):
10141008
"Disconnects all connections in the pool"
1009+
self._checkpid()
10151010
all_conns = chain(self._available_connections,
10161011
self._in_use_connections)
10171012
for connection in all_conns:
@@ -1133,5 +1128,6 @@ def release(self, connection):
11331128

11341129
def disconnect(self):
11351130
"Disconnects all connections in the pool."
1131+
self._checkpid()
11361132
for connection in self._connections:
11371133
connection.disconnect()

tests/test_multiprocessing.py

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@ class TestMultiprocessing(object):
1818
# Test connection sharing between forks.
1919
# See issue #1085 for details.
2020

21-
def test_connection(self):
21+
def test_close_connection_in_child(self):
22+
"""
23+
A connection owned by a parent and closed by a child doesn't
24+
destroy the file descriptors so a parent can still use it.
25+
"""
2226
conn = Connection()
23-
assert conn.send_command('ping') is None
27+
conn.send_command('ping')
2428
assert conn.read_response() == b'PONG'
2529

2630
def target(conn):
27-
assert conn.send_command('ping') is None
31+
conn.send_command('ping')
2832
assert conn.read_response() == b'PONG'
2933
conn.disconnect()
3034

@@ -33,20 +37,29 @@ def target(conn):
3337
proc.join(3)
3438
assert proc.exitcode is 0
3539

36-
# Check that connection is still alive after fork process has exited.
37-
with pytest.raises(ConnectionError):
38-
assert conn.send_command('ping') is None
39-
assert conn.read_response() == b'PONG'
40+
# The connection was created in the parent but disconnected in the
41+
# child. The child called socket.close() but did not call
42+
# socket.shutdown() because it wasn't the "owning" process.
43+
# Therefore the connection still works in the parent.
44+
conn.send_command('ping')
45+
assert conn.read_response() == b'PONG'
4046

41-
def test_close_connection_in_main(self):
47+
def test_close_connection_in_parent(self):
48+
"""
49+
A connection owned by a parent is unusable by a child if the parent
50+
(the owning process) closes the connection.
51+
"""
4252
conn = Connection()
43-
assert conn.send_command('ping') is None
53+
conn.send_command('ping')
4454
assert conn.read_response() == b'PONG'
4555

4656
def target(conn, ev):
4757
ev.wait()
48-
assert conn.send_command('ping') is None
49-
assert conn.read_response() == b'PONG'
58+
# the parent closed the connection. because it also created the
59+
# connection, the connection is shutdown and the child
60+
# cannot use it.
61+
with pytest.raises(ConnectionError):
62+
conn.send_command('ping')
5063

5164
ev = multiprocessing.Event()
5265
proc = multiprocessing.Process(target=target, args=(conn, ev))
@@ -56,21 +69,27 @@ def target(conn, ev):
5669
ev.set()
5770

5871
proc.join(3)
59-
assert proc.exitcode is 1
72+
assert proc.exitcode is 0
6073

6174
@pytest.mark.parametrize('max_connections', [1, 2, None])
6275
def test_pool(self, max_connections):
76+
"""
77+
A child will create its own connections when using a pool created
78+
by a parent.
79+
"""
6380
pool = ConnectionPool.from_url('redis://localhost',
6481
max_connections=max_connections)
6582

6683
conn = pool.get_connection('ping')
84+
main_conn_pid = conn.pid
6785
with exit_callback(pool.release, conn):
68-
assert conn.send_command('ping') is None
86+
conn.send_command('ping')
6987
assert conn.read_response() == b'PONG'
7088

7189
def target(pool):
7290
with exit_callback(pool.disconnect):
7391
conn = pool.get_connection('ping')
92+
assert conn.pid != main_conn_pid
7493
with exit_callback(pool.release, conn):
7594
assert conn.send_command('ping') is None
7695
assert conn.read_response() == b'PONG'
@@ -80,15 +99,19 @@ def target(pool):
8099
proc.join(3)
81100
assert proc.exitcode is 0
82101

83-
# Check that connection is still alive after fork process has exited.
102+
# Check that connection is still alive after fork process has exited
103+
# and disconnected the connections in its pool
84104
conn = pool.get_connection('ping')
85105
with exit_callback(pool.release, conn):
86-
with pytest.raises(ConnectionError):
87-
assert conn.send_command('ping') is None
88-
assert conn.read_response() == b'PONG'
106+
assert conn.send_command('ping') is None
107+
assert conn.read_response() == b'PONG'
89108

90109
@pytest.mark.parametrize('max_connections', [1, 2, None])
91110
def test_close_pool_in_main(self, max_connections):
111+
"""
112+
A child process that uses the same pool as its parent isn't affected
113+
when the parent disconnects all connections within the pool.
114+
"""
92115
pool = ConnectionPool.from_url('redis://localhost',
93116
max_connections=max_connections)
94117

@@ -115,12 +138,13 @@ def target(pool, disconnect_event):
115138
proc.join(3)
116139
assert proc.exitcode is 0
117140

118-
def test_redis(self, r):
141+
def test_redis_client(self, r):
142+
"A redis client created in a parent can also be used in a child"
119143
assert r.ping() is True
120144

121-
def target(redis):
122-
assert redis.ping() is True
123-
del redis
145+
def target(client):
146+
assert client.ping() is True
147+
del client
124148

125149
proc = multiprocessing.Process(target=target, args=(r,))
126150
proc.start()

0 commit comments

Comments
 (0)