Skip to content

Commit af57bf2

Browse files
Reset client queue upon disconnection (Fixes #414) (#415)
* Reset client queue upon disconnection (Fixes #414) * fix queue mocks in client unit tests
1 parent fa99e01 commit af57bf2

File tree

4 files changed

+106
-114
lines changed

4 files changed

+106
-114
lines changed

src/engineio/async_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ def create_event(self):
214214

215215
async def _reset(self):
216216
super()._reset()
217+
while True: # pragma: no cover
218+
try:
219+
self.queue.get_nowait()
220+
self.queue.task_done()
221+
except self.queue_empty:
222+
break
217223
if not self.external_http: # pragma: no cover
218224
if self.http and not self.http.closed:
219225
await self.http.close()

src/engineio/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,15 @@ def create_event(self, *args, **kwargs):
173173
"""Create an event object."""
174174
return threading.Event(*args, **kwargs)
175175

176+
def _reset(self):
177+
super()._reset()
178+
while True: # pragma: no cover
179+
try:
180+
self.queue.get_nowait()
181+
self.queue.task_done()
182+
except self.queue_empty:
183+
break
184+
176185
def _connect_polling(self, url, headers, engineio_path):
177186
"""Establish a long-polling connection to the Engine.IO server."""
178187
if requests is None: # pragma: no cover

tests/async/test_client.py

Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616

1717

1818
class TestAsyncClient:
19+
def mock_queue(self, client):
20+
client.queue = mock.MagicMock()
21+
client.queue_empty = RuntimeError
22+
client.queue.get_nowait.side_effect = client.queue_empty
23+
client.queue.get = mock.AsyncMock()
24+
client.queue.put = mock.AsyncMock()
25+
client.queue.join = mock.AsyncMock()
26+
1927
async def test_is_asyncio_based(self):
2028
c = async_client.AsyncClient()
2129
assert c.is_asyncio_based()
@@ -143,9 +151,7 @@ async def test_disconnect_polling(self):
143151
base_client.connected_clients.append(c)
144152
c.state = 'connected'
145153
c.current_transport = 'polling'
146-
c.queue = mock.MagicMock()
147-
c.queue.put = mock.AsyncMock()
148-
c.queue.join = mock.AsyncMock()
154+
self.mock_queue(c)
149155
c.read_loop_task = mock.AsyncMock()()
150156
c.ws = mock.MagicMock()
151157
c.ws.close = mock.AsyncMock()
@@ -162,9 +168,7 @@ async def test_disconnect_websocket(self):
162168
base_client.connected_clients.append(c)
163169
c.state = 'connected'
164170
c.current_transport = 'websocket'
165-
c.queue = mock.MagicMock()
166-
c.queue.put = mock.AsyncMock()
167-
c.queue.join = mock.AsyncMock()
171+
self.mock_queue(c)
168172
c.read_loop_task = mock.AsyncMock()()
169173
c.ws = mock.MagicMock()
170174
c.ws.close = mock.AsyncMock()
@@ -181,9 +185,7 @@ async def test_disconnect_polling_abort(self):
181185
base_client.connected_clients.append(c)
182186
c.state = 'connected'
183187
c.current_transport = 'polling'
184-
c.queue = mock.MagicMock()
185-
c.queue.put = mock.AsyncMock()
186-
c.queue.join = mock.AsyncMock()
188+
self.mock_queue(c)
187189
c.read_loop_task = mock.AsyncMock()()
188190
c.ws = mock.MagicMock()
189191
c.ws.close = mock.AsyncMock()
@@ -197,9 +199,7 @@ async def test_disconnect_websocket_abort(self):
197199
base_client.connected_clients.append(c)
198200
c.state = 'connected'
199201
c.current_transport = 'websocket'
200-
c.queue = mock.MagicMock()
201-
c.queue.put = mock.AsyncMock()
202-
c.queue.join = mock.AsyncMock()
202+
self.mock_queue(c)
203203
c.read_loop_task = mock.AsyncMock()()
204204
c.ws = mock.MagicMock()
205205
c.ws.close = mock.AsyncMock()
@@ -1014,8 +1014,7 @@ async def test_read_loop_polling_no_response(self, _time):
10141014
c.ping_timeout = 5
10151015
c.state = 'connected'
10161016
c.base_url = 'http://foo'
1017-
c.queue = mock.MagicMock()
1018-
c.queue.put = mock.AsyncMock()
1017+
self.mock_queue(c)
10191018
c._send_request = mock.AsyncMock(return_value=None)
10201019
c._trigger_event = mock.AsyncMock()
10211020
c.write_loop_task = mock.AsyncMock()()
@@ -1036,8 +1035,7 @@ async def test_read_loop_polling_bad_status(self, _time):
10361035
c.ping_timeout = 5
10371036
c.state = 'connected'
10381037
c.base_url = 'http://foo'
1039-
c.queue = mock.MagicMock()
1040-
c.queue.put = mock.AsyncMock()
1038+
self.mock_queue(c)
10411039
c._send_request = mock.AsyncMock()
10421040
c._send_request.return_value.status = 400
10431041
c.write_loop_task = mock.AsyncMock()()
@@ -1055,8 +1053,7 @@ async def test_read_loop_polling_bad_packet(self, _time):
10551053
c.ping_timeout = 60
10561054
c.state = 'connected'
10571055
c.base_url = 'http://foo'
1058-
c.queue = mock.MagicMock()
1059-
c.queue.put = mock.AsyncMock()
1056+
self.mock_queue(c)
10601057
c._send_request = mock.AsyncMock()
10611058
c._send_request.return_value.status = 200
10621059
c._send_request.return_value.read = mock.AsyncMock(return_value=b'foo')
@@ -1074,8 +1071,7 @@ async def test_read_loop_polling(self):
10741071
c.ping_timeout = 5
10751072
c.state = 'connected'
10761073
c.base_url = 'http://foo'
1077-
c.queue = mock.MagicMock()
1078-
c.queue.put = mock.AsyncMock()
1074+
self.mock_queue(c)
10791075
c._send_request = mock.AsyncMock()
10801076
c._send_request.side_effect = [
10811077
mock.MagicMock(
@@ -1114,8 +1110,7 @@ async def test_read_loop_websocket_timeout(self):
11141110
c.ping_timeout = 2
11151111
c.base_url = 'ws://foo'
11161112
c.state = 'connected'
1117-
c.queue = mock.MagicMock()
1118-
c.queue.put = mock.AsyncMock()
1113+
self.mock_queue(c)
11191114
c.ws = mock.MagicMock()
11201115
c.ws.receive = mock.AsyncMock(side_effect=asyncio.TimeoutError())
11211116
c.write_loop_task = mock.AsyncMock()()
@@ -1129,8 +1124,7 @@ async def test_read_loop_websocket_no_response(self):
11291124
c.ping_timeout = 2
11301125
c.base_url = 'ws://foo'
11311126
c.state = 'connected'
1132-
c.queue = mock.MagicMock()
1133-
c.queue.put = mock.AsyncMock()
1127+
self.mock_queue(c)
11341128
c.ws = mock.MagicMock()
11351129
c.ws.receive = mock.AsyncMock(
11361130
side_effect=aiohttp.client_exceptions.ServerDisconnectedError()
@@ -1146,8 +1140,7 @@ async def test_read_loop_websocket_unexpected_error(self):
11461140
c.ping_timeout = 2
11471141
c.base_url = 'ws://foo'
11481142
c.state = 'connected'
1149-
c.queue = mock.MagicMock()
1150-
c.queue.put = mock.AsyncMock()
1143+
self.mock_queue(c)
11511144
c.ws = mock.MagicMock()
11521145
c.ws.receive = mock.AsyncMock(side_effect=ValueError)
11531146
c.write_loop_task = mock.AsyncMock()()
@@ -1161,8 +1154,7 @@ async def test_read_loop_websocket(self):
11611154
c.ping_timeout = 2
11621155
c.base_url = 'ws://foo'
11631156
c.state = 'connected'
1164-
c.queue = mock.MagicMock()
1165-
c.queue.put = mock.AsyncMock()
1157+
self.mock_queue(c)
11661158
c.ws = mock.MagicMock()
11671159
c.ws.receive = mock.AsyncMock(
11681160
side_effect=[
@@ -1188,7 +1180,7 @@ async def test_write_loop_no_packets(self):
11881180
c.state = 'connected'
11891181
c.ping_interval = 1
11901182
c.ping_timeout = 2
1191-
c.queue = mock.MagicMock()
1183+
self.mock_queue(c)
11921184
c.queue.get = mock.AsyncMock(return_value=None)
11931185
await c._write_loop()
11941186
c.queue.task_done.assert_called_once_with()
@@ -1199,9 +1191,8 @@ async def test_write_loop_empty_queue(self):
11991191
c.state = 'connected'
12001192
c.ping_interval = 1
12011193
c.ping_timeout = 2
1202-
c.queue = mock.MagicMock()
1203-
c.queue_empty = RuntimeError
1204-
c.queue.get = mock.AsyncMock(side_effect=RuntimeError)
1194+
self.mock_queue(c)
1195+
c.queue.get = mock.AsyncMock(side_effect=c.queue_empty)
12051196
await c._write_loop()
12061197
c.queue.get.assert_awaited_once_with()
12071198

@@ -1212,15 +1203,14 @@ async def test_write_loop_polling_one_packet(self):
12121203
c.ping_interval = 1
12131204
c.ping_timeout = 2
12141205
c.current_transport = 'polling'
1215-
c.queue = mock.MagicMock()
1216-
c.queue_empty = RuntimeError
1206+
self.mock_queue(c)
12171207
c.queue.get = mock.AsyncMock(
12181208
side_effect=[
12191209
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1220-
RuntimeError,
1210+
c.queue_empty,
12211211
]
12221212
)
1223-
c.queue.get_nowait = mock.MagicMock(side_effect=RuntimeError)
1213+
c.queue.get_nowait = mock.MagicMock(side_effect=c.queue_empty)
12241214
c._send_request = mock.AsyncMock()
12251215
c._send_request.return_value.status = 200
12261216
await c._write_loop()
@@ -1243,19 +1233,18 @@ async def test_write_loop_polling_three_packets(self):
12431233
c.ping_interval = 1
12441234
c.ping_timeout = 2
12451235
c.current_transport = 'polling'
1246-
c.queue = mock.MagicMock()
1247-
c.queue_empty = RuntimeError
1236+
self.mock_queue(c)
12481237
c.queue.get = mock.AsyncMock(
12491238
side_effect=[
12501239
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1251-
RuntimeError,
1240+
c.queue_empty,
12521241
]
12531242
)
12541243
c.queue.get_nowait = mock.MagicMock(
12551244
side_effect=[
12561245
packet.Packet(packet.PING),
12571246
packet.Packet(packet.NOOP),
1258-
RuntimeError,
1247+
c.queue_empty,
12591248
]
12601249
)
12611250
c._send_request = mock.AsyncMock()
@@ -1284,12 +1273,11 @@ async def test_write_loop_polling_two_packets_done(self):
12841273
c.ping_interval = 1
12851274
c.ping_timeout = 2
12861275
c.current_transport = 'polling'
1287-
c.queue = mock.MagicMock()
1288-
c.queue_empty = RuntimeError
1276+
self.mock_queue(c)
12891277
c.queue.get = mock.AsyncMock(
12901278
side_effect=[
12911279
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1292-
RuntimeError,
1280+
c.queue_empty,
12931281
]
12941282
)
12951283
c.queue.get_nowait = mock.MagicMock(
@@ -1321,12 +1309,11 @@ async def test_write_loop_polling_bad_connection(self):
13211309
c.ping_interval = 1
13221310
c.ping_timeout = 2
13231311
c.current_transport = 'polling'
1324-
c.queue = mock.MagicMock()
1325-
c.queue_empty = RuntimeError
1312+
self.mock_queue(c)
13261313
c.queue.get = mock.AsyncMock(
13271314
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
13281315
)
1329-
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
1316+
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
13301317
c._send_request = mock.AsyncMock(return_value=None)
13311318
await c._write_loop()
13321319
assert c.queue.task_done.call_count == 1
@@ -1349,12 +1336,11 @@ async def test_write_loop_polling_bad_status(self):
13491336
c.ping_interval = 1
13501337
c.ping_timeout = 2
13511338
c.current_transport = 'polling'
1352-
c.queue = mock.MagicMock()
1353-
c.queue_empty = RuntimeError
1339+
self.mock_queue(c)
13541340
c.queue.get = mock.AsyncMock(
13551341
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
13561342
)
1357-
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
1343+
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
13581344
c._send_request = mock.AsyncMock()
13591345
c._send_request.return_value.status = 500
13601346
await c._write_loop()
@@ -1378,15 +1364,14 @@ async def test_write_loop_websocket_one_packet(self):
13781364
c.ping_interval = 1
13791365
c.ping_timeout = 2
13801366
c.current_transport = 'websocket'
1381-
c.queue = mock.MagicMock()
1382-
c.queue_empty = RuntimeError
1367+
self.mock_queue(c)
13831368
c.queue.get = mock.AsyncMock(
13841369
side_effect=[
13851370
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1386-
RuntimeError,
1371+
c.queue_empty,
13871372
]
13881373
)
1389-
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
1374+
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
13901375
c.ws = mock.MagicMock()
13911376
c.ws.send_str = mock.AsyncMock()
13921377
await c._write_loop()
@@ -1400,19 +1385,18 @@ async def test_write_loop_websocket_three_packets(self):
14001385
c.ping_interval = 1
14011386
c.ping_timeout = 2
14021387
c.current_transport = 'websocket'
1403-
c.queue = mock.MagicMock()
1404-
c.queue_empty = RuntimeError
1388+
self.mock_queue(c)
14051389
c.queue.get = mock.AsyncMock(
14061390
side_effect=[
14071391
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1408-
RuntimeError,
1392+
c.queue_empty,
14091393
]
14101394
)
14111395
c.queue.get_nowait = mock.MagicMock(
14121396
side_effect=[
14131397
packet.Packet(packet.PING),
14141398
packet.Packet(packet.NOOP),
1415-
RuntimeError,
1399+
c.queue_empty,
14161400
]
14171401
)
14181402
c.ws = mock.MagicMock()
@@ -1430,12 +1414,11 @@ async def test_write_loop_websocket_one_packet_binary(self):
14301414
c.ping_interval = 1
14311415
c.ping_timeout = 2
14321416
c.current_transport = 'websocket'
1433-
c.queue = mock.MagicMock()
1434-
c.queue_empty = RuntimeError
1417+
self.mock_queue(c)
14351418
c.queue.get = mock.AsyncMock(
1436-
side_effect=[packet.Packet(packet.MESSAGE, b'foo'), RuntimeError]
1419+
side_effect=[packet.Packet(packet.MESSAGE, b'foo'), c.queue_empty]
14371420
)
1438-
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
1421+
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
14391422
c.ws = mock.MagicMock()
14401423
c.ws.send_bytes = mock.AsyncMock()
14411424
await c._write_loop()
@@ -1449,15 +1432,14 @@ async def test_write_loop_websocket_bad_connection(self):
14491432
c.ping_interval = 1
14501433
c.ping_timeout = 2
14511434
c.current_transport = 'websocket'
1452-
c.queue = mock.MagicMock()
1453-
c.queue_empty = RuntimeError
1435+
self.mock_queue(c)
14541436
c.queue.get = mock.AsyncMock(
14551437
side_effect=[
14561438
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
1457-
RuntimeError,
1439+
c.queue_empty,
14581440
]
14591441
)
1460-
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
1442+
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
14611443
c.ws = mock.MagicMock()
14621444
c.ws.send_str = mock.AsyncMock(
14631445
side_effect=aiohttp.client_exceptions.ServerDisconnectedError()

0 commit comments

Comments
 (0)