Connection thread dies when broker shuts down #16

Closed
@magnusbaeck

Description

When the RabbitMQ broker is shut down, the connection thread crashes in the _run() method:

Traceback (most recent call last):
  File "/home/magnus/src/eiffel-pythonlib/eiffellib/subscribers/rabbitmq_subscriber.py", line 240, in _run
    self.connection.sleep(0.1)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 852, in sleep
    self.process_data_events(time_limit)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 829, in process_data_events
    self._flush_output(timer.is_ready, common_terminator)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.ConnectionClosedByBroker: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Attempting reconnection in 5s
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/magnus/src/eiffel-pythonlib/eiffellib/subscribers/rabbitmq_subscriber.py", line 240, in _run
    self.connection.sleep(0.1)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 852, in sleep
    self.process_data_events(time_limit)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 829, in process_data_events
    self._flush_output(timer.is_ready, common_terminator)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.ConnectionClosedByBroker: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/magnus/src/eiffel-pythonlib/eiffellib/subscribers/rabbitmq_subscriber.py", line 249, in _run
    self.connection.sleep(5)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 852, in sleep
    self.process_data_events(time_limit)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 828, in process_data_events
    with _IoloopTimerContext(time_limit, self._impl) as timer:
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 191, in __enter__
    self._duration, self._callback_result.signal_once)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 205, in _adapter_call_later
    return self._nbio.call_later(delay, callback)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 222, in call_later
    return _TimerHandle(self._loop.call_later(delay, callback), self._loop)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 444, in call_later
    return self._timer.call_later(delay, callback)
  File "/home/magnus/src/eiffel-pythonlib/.env/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 258, in call_later
    heapq.heappush(self._timeout_heap, timeout)
TypeError: heap argument must be a list

https://stackoverflow.com/q/55570743/414355 suggests that this could be caused by attempting to access the same Pika connection from multiple threads, but I'm not sure that's the case here. Not starting the delegator thread doesn't make a difference, and with an empty queue it seems all connection/channel operations are done from the connection thread.
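One thing the second traceback does show: the failing call is self.connection.sleep(5) at line 249, made while the ConnectionClosedByBroker exception is being handled, so the retry delay runs on the connection object that the broker has just closed. A possible explanation (an assumption, not verified against the Pika source here) is that the closed connection has already torn down its timer state, which would produce the heapq TypeError without any second thread being involved. Below is a minimal sketch of a reconnect loop that waits with time.sleep() instead and builds a fresh connection for each attempt. The names run_with_reconnect, connect, should_stop and wait are hypothetical and not part of eiffellib or Pika.

```python
import time


def run_with_reconnect(connect, should_stop, retry_delay=5.0, wait=time.sleep):
    """Pump a connection until asked to stop, reconnecting after failures.

    connect     -- hypothetical factory returning an object with sleep(seconds),
                   for example lambda: pika.BlockingConnection(params)
    should_stop -- callable returning True when the loop should end
    wait        -- delay function used between attempts; deliberately not
                   connection.sleep(), since the old connection may be closed
    Returns the number of reconnection attempts that were scheduled.
    """
    reconnects = 0
    while not should_stop():
        try:
            # Always start from a fresh connection object.
            connection = connect()
            while not should_stop():
                # Let the live connection process I/O events.
                connection.sleep(0.1)
        except Exception as error:
            # Assumption: real code would narrow this to
            # pika.exceptions.AMQPConnectionError.
            print(f"Connection lost ({error!r}); reconnecting in {retry_delay}s")
            reconnects += 1
            # Do not touch the failed connection again; wait outside of it.
            wait(retry_delay)
    return reconnects
```

Passing the delay function in as a parameter keeps the loop testable without a broker. In the subscriber itself, the equivalent change would be to replace the delay at line 249 with a plain time.sleep(5) before reconnecting.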

This happens reliably with the example program from README.rst, RabbitMQ 3.8.2, Python 3.6.9, and the following Python state:

$ pip freeze
attrs==19.3.0
-e git+git@github.com:magnusbaeck/eiffel-pythonlib@7b4eb839ae39ba9d165bd19df86f3eb6cbe3233f#egg=eiffellib
jsonschema==3.0.0a2
pika==1.0.1
pkg-resources==0.0.0
pyrsistent==0.15.6
six==1.13.0

Motivation

Broker restarts are routine and should obviously be handled gracefully by all clients.