Skip to content

Commit 64f3cf9

Browse files
Reset client queue upon disconnection (Fixes #414)
1 parent fa99e01 commit 64f3cf9

File tree

3 files changed

+38
-0
lines changed

3 files changed

+38
-0
lines changed

src/engineio/async_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ def create_event(self):
214214

215215
async def _reset(self):
216216
super()._reset()
217+
while True: # pragma: no cover
218+
try:
219+
self.queue.get_nowait()
220+
self.queue.task_done()
221+
except self.queue_empty:
222+
break
217223
if not self.external_http: # pragma: no cover
218224
if self.http and not self.http.closed:
219225
await self.http.close()

src/engineio/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,15 @@ def create_event(self, *args, **kwargs):
173173
"""Create an event object."""
174174
return threading.Event(*args, **kwargs)
175175

176+
def _reset(self):
177+
super()._reset()
178+
while True: # pragma: no cover
179+
try:
180+
self.queue.get_nowait()
181+
self.queue.task_done()
182+
except self.queue_empty:
183+
break
184+
176185
def _connect_polling(self, url, headers, engineio_path):
177186
"""Establish a long-polling connection to the Engine.IO server."""
178187
if requests is None: # pragma: no cover

tests/common/test_client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ def test_disconnect_polling(self):
210210
c.state = 'connected'
211211
c.current_transport = 'polling'
212212
c.queue = mock.MagicMock()
213+
c.queue.get_nowait.side_effect = c.queue_empty
213214
c.read_loop_task = mock.MagicMock()
214215
c.ws = mock.MagicMock()
215216
c._trigger_event = mock.MagicMock()
@@ -226,6 +227,7 @@ def test_disconnect_websocket(self):
226227
c.state = 'connected'
227228
c.current_transport = 'websocket'
228229
c.queue = mock.MagicMock()
230+
c.queue.get_nowait.side_effect = c.queue_empty
229231
c.read_loop_task = mock.MagicMock()
230232
c.ws = mock.MagicMock()
231233
c._trigger_event = mock.MagicMock()
@@ -242,6 +244,7 @@ def test_disconnect_polling_abort(self):
242244
c.state = 'connected'
243245
c.current_transport = 'polling'
244246
c.queue = mock.MagicMock()
247+
c.queue.get_nowait.side_effect = c.queue_empty
245248
c.read_loop_task = mock.MagicMock()
246249
c.ws = mock.MagicMock()
247250
c.disconnect(abort=True)
@@ -256,6 +259,7 @@ def test_disconnect_websocket_abort(self):
256259
c.state = 'connected'
257260
c.current_transport = 'websocket'
258261
c.queue = mock.MagicMock()
262+
c.queue.get_nowait.side_effect = c.queue_empty
259263
c.read_loop_task = mock.MagicMock()
260264
c.ws = mock.MagicMock()
261265
c.disconnect(abort=True)
@@ -1333,6 +1337,7 @@ def test_read_loop_polling_no_response(self, _time):
13331337
c.state = 'connected'
13341338
c.base_url = 'http://foo'
13351339
c.queue = mock.MagicMock()
1340+
c.queue.get_nowait.side_effect = c.queue_empty
13361341
c._send_request = mock.MagicMock(return_value=None)
13371342
c._trigger_event = mock.MagicMock()
13381343
c.write_loop_task = mock.MagicMock()
@@ -1354,6 +1359,7 @@ def test_read_loop_polling_bad_status(self, _time):
13541359
c.state = 'connected'
13551360
c.base_url = 'http://foo'
13561361
c.queue = mock.MagicMock()
1362+
c.queue.get_nowait.side_effect = c.queue_empty
13571363
c._send_request = mock.MagicMock()
13581364
c._send_request.return_value.status_code = 400
13591365
c.write_loop_task = mock.MagicMock()
@@ -1373,6 +1379,7 @@ def test_read_loop_polling_bad_packet(self, _time):
13731379
c.state = 'connected'
13741380
c.base_url = 'http://foo'
13751381
c.queue = mock.MagicMock()
1382+
c.queue.get_nowait.side_effect = c.queue_empty
13761383
c._send_request = mock.MagicMock()
13771384
c._send_request.return_value.status_code = 200
13781385
c._send_request.return_value.content = b'foo'
@@ -1392,6 +1399,7 @@ def test_read_loop_polling(self):
13921399
c.state = 'connected'
13931400
c.base_url = 'http://foo'
13941401
c.queue = mock.MagicMock()
1402+
c.queue.get_nowait.side_effect = c.queue_empty
13951403
c._send_request = mock.MagicMock()
13961404
c._send_request.side_effect = [
13971405
mock.MagicMock(
@@ -1426,6 +1434,7 @@ def test_read_loop_websocket_timeout(self):
14261434
c = client.Client()
14271435
c.state = 'connected'
14281436
c.queue = mock.MagicMock()
1437+
c.queue.get_nowait.side_effect = c.queue_empty
14291438
c.ws = mock.MagicMock()
14301439
c.ws.recv.side_effect = websocket.WebSocketTimeoutException
14311440
c.write_loop_task = mock.MagicMock()
@@ -1438,6 +1447,7 @@ def test_read_loop_websocket_no_response(self):
14381447
c = client.Client()
14391448
c.state = 'connected'
14401449
c.queue = mock.MagicMock()
1450+
c.queue.get_nowait.side_effect = c.queue_empty
14411451
c.ws = mock.MagicMock()
14421452
c.ws.recv.side_effect = websocket.WebSocketConnectionClosedException
14431453
c.write_loop_task = mock.MagicMock()
@@ -1450,6 +1460,7 @@ def test_read_loop_websocket_unexpected_error(self):
14501460
c = client.Client()
14511461
c.state = 'connected'
14521462
c.queue = mock.MagicMock()
1463+
c.queue.get_nowait.side_effect = c.queue_empty
14531464
c.ws = mock.MagicMock()
14541465
c.ws.recv.side_effect = ValueError
14551466
c.write_loop_task = mock.MagicMock()
@@ -1464,6 +1475,7 @@ def test_read_loop_websocket(self):
14641475
c.ping_timeout = 2
14651476
c.state = 'connected'
14661477
c.queue = mock.MagicMock()
1478+
c.queue.get_nowait.side_effect = c.queue_empty
14671479
c.ws = mock.MagicMock()
14681480
c.ws.recv.side_effect = [
14691481
packet.Packet(packet.PING).encode(),
@@ -1489,6 +1501,7 @@ def test_write_loop_no_packets(self):
14891501
c.ping_interval = 1
14901502
c.ping_timeout = 2
14911503
c.queue = mock.MagicMock()
1504+
c.queue.get_nowait.side_effect = c.queue_empty
14921505
c.queue.get.return_value = None
14931506
c._write_loop()
14941507
c.queue.task_done.assert_called_once_with()
@@ -1501,6 +1514,7 @@ def test_write_loop_empty_queue(self):
15011514
c.ping_timeout = 2
15021515
c.queue = mock.MagicMock()
15031516
c.queue_empty = RuntimeError
1517+
c.queue.get_nowait.side_effect = c.queue_empty
15041518
c.queue.get.side_effect = RuntimeError
15051519
c._write_loop()
15061520
c.queue.get.assert_called_once_with(timeout=7)
@@ -1514,6 +1528,7 @@ def test_write_loop_polling_one_packet(self):
15141528
c.current_transport = 'polling'
15151529
c.queue = mock.MagicMock()
15161530
c.queue_empty = RuntimeError
1531+
c.queue.get_nowait.side_effect = c.queue_empty
15171532
c.queue.get.side_effect = [
15181533
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15191534
RuntimeError,
@@ -1543,6 +1558,7 @@ def test_write_loop_polling_three_packets(self):
15431558
c.current_transport = 'polling'
15441559
c.queue = mock.MagicMock()
15451560
c.queue_empty = RuntimeError
1561+
c.queue.get_nowait.side_effect = c.queue_empty
15461562
c.queue.get.side_effect = [
15471563
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15481564
packet.Packet(packet.PING),
@@ -1578,6 +1594,7 @@ def test_write_loop_polling_two_packets_done(self):
15781594
c.current_transport = 'polling'
15791595
c.queue = mock.MagicMock()
15801596
c.queue_empty = RuntimeError
1597+
c.queue.get_nowait.side_effect = c.queue_empty
15811598
c.queue.get.side_effect = [
15821599
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
15831600
packet.Packet(packet.PING),
@@ -1612,6 +1629,7 @@ def test_write_loop_polling_bad_connection(self):
16121629
c.current_transport = 'polling'
16131630
c.queue = mock.MagicMock()
16141631
c.queue_empty = RuntimeError
1632+
c.queue.get_nowait.side_effect = c.queue_empty
16151633
c.queue.get.side_effect = [
16161634
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16171635
RuntimeError,
@@ -1641,6 +1659,7 @@ def test_write_loop_polling_bad_status(self):
16411659
c.current_transport = 'polling'
16421660
c.queue = mock.MagicMock()
16431661
c.queue_empty = RuntimeError
1662+
c.queue.get_nowait.side_effect = c.queue_empty
16441663
c.queue.get.side_effect = [
16451664
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16461665
RuntimeError,
@@ -1670,6 +1689,7 @@ def test_write_loop_websocket_one_packet(self):
16701689
c.current_transport = 'websocket'
16711690
c.queue = mock.MagicMock()
16721691
c.queue_empty = RuntimeError
1692+
c.queue.get_nowait.side_effect = c.queue_empty
16731693
c.queue.get.side_effect = [
16741694
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16751695
RuntimeError,
@@ -1690,6 +1710,7 @@ def test_write_loop_websocket_three_packets(self):
16901710
c.current_transport = 'websocket'
16911711
c.queue = mock.MagicMock()
16921712
c.queue_empty = RuntimeError
1713+
c.queue.get_nowait.side_effect = c.queue_empty
16931714
c.queue.get.side_effect = [
16941715
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
16951716
packet.Packet(packet.PING),
@@ -1714,6 +1735,7 @@ def test_write_loop_websocket_one_packet_binary(self):
17141735
c.current_transport = 'websocket'
17151736
c.queue = mock.MagicMock()
17161737
c.queue_empty = RuntimeError
1738+
c.queue.get_nowait.side_effect = c.queue_empty
17171739
c.queue.get.side_effect = [
17181740
packet.Packet(packet.MESSAGE, b'foo'),
17191741
RuntimeError,
@@ -1734,6 +1756,7 @@ def test_write_loop_websocket_bad_connection(self):
17341756
c.current_transport = 'websocket'
17351757
c.queue = mock.MagicMock()
17361758
c.queue_empty = RuntimeError
1759+
c.queue.get_nowait.side_effect = c.queue_empty
17371760
c.queue.get.side_effect = [
17381761
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
17391762
RuntimeError,

0 commit comments

Comments
 (0)