Skip to content

Commit ff11b12

Browse files
authored
Reconnect the connection thread if broker shuts down (#18)
* Reconnect the connection thread if broker shuts down Make the subscriber and publisher interfaces more clear and stable. Reconnect if the rabbitmq broker shuts down. Configurable parameters for how many threads the subscriber is allowed to create. Keep track of published messages in order to requeue them if broker goes down. Create a base RabbitMQ class for shared methods. * Updates according to review comments. Added default value of '#' to routing key. Changed '%s' to %r
1 parent 46c3c75 commit ff11b12

File tree

6 files changed

+595
-336
lines changed

6 files changed

+595
-336
lines changed

eiffellib/lib/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright 2020 Axis Communications AB.
2+
#
3+
# For a full list of individual contributors, please see the commit history.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.

eiffellib/lib/base_rabbitmq.py

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
# Copyright 2020 Axis Communications AB.
2+
#
3+
# For a full list of individual contributors, please see the commit history.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
"""RabbitMQ base connection."""
17+
import time
18+
import logging
19+
import threading
20+
import ssl as _ssl
21+
import pika
22+
23+
24+
_LOG = logging.getLogger(__name__)
25+
26+
27+
# pylint:disable=too-many-instance-attributes
28+
class BaseRabbitMQ:
29+
"""Base RabbitMQ connection object."""
30+
31+
_connection = None
32+
_channel = None
33+
_was_active = False
34+
_closing = False
35+
36+
should_reconnect = False
37+
active = False
38+
connection_thread = None
39+
running = False
40+
41+
# pylint:disable=too-many-arguments
42+
def __init__(self, host, port, username, password, vhost, ssl):
43+
"""Set connection parameters."""
44+
parameters = {"port": port}
45+
if ssl is True:
46+
context = _ssl.create_default_context()
47+
ssl_options = pika.SSLOptions(context, host)
48+
parameters["ssl_options"] = ssl_options
49+
if username and password:
50+
parameters["credentials"] = pika.PlainCredentials(username, password)
51+
if vhost:
52+
parameters["virtual_host"] = vhost
53+
54+
self.parameters = pika.ConnectionParameters(host, **parameters)
55+
56+
def reset_parameters(self):
57+
"""Reset parameters to default."""
58+
self.should_reconnect = False
59+
self.active = False
60+
self._closing = False
61+
self._was_active = False
62+
63+
def _setup(self, channel):
64+
"""Setup channel. Called after channel is opened.
65+
66+
:param channel: RabbitMQ channel.
67+
:type channel: :obj:`pika.channel.Channel`
68+
"""
69+
raise NotImplementedError
70+
71+
def _start(self, *args, **kwargs):
72+
"""Start up the RabbitMQ connection. Call when connection is ready."""
73+
raise NotImplementedError
74+
75+
def _cancel(self):
76+
"""Cancel the RabbitMQ connection."""
77+
raise NotImplementedError
78+
79+
def _on_start(self):
80+
"""Called just before starting the connection."""
81+
82+
def _on_connect(self):
83+
"""Called just before connecting."""
84+
85+
def reconnect(self):
86+
"""Reconnect RabbitMQ connection."""
87+
self.should_reconnect = True
88+
self.stop()
89+
90+
def connect(self):
91+
"""Connect to RabbitMQ."""
92+
self._connection = pika.SelectConnection(self.parameters,
93+
on_open_callback=self._connection_open,
94+
on_close_callback=self._connection_close,
95+
on_open_error_callback=self._connection_open_error)
96+
97+
def close_connection(self):
98+
"""Close the RabbitMQ connection."""
99+
self.active = False
100+
if self._connection.is_closing or self._connection.is_closed:
101+
_LOG.info("Connection is closing or already closed.")
102+
else:
103+
_LOG.info("Closing connection")
104+
self._connection.close()
105+
106+
def _connection_open(self, connection):
107+
"""Connection opened callback. Create a RabbitMQ channel.
108+
109+
:param connection: Connection that was just opened.
110+
:type connection: :obj:`pika.connection.Connection`
111+
"""
112+
_LOG.info("Connection opened")
113+
_LOG.info("Creating a new channel.")
114+
connection.channel(on_open_callback=self._channel_open)
115+
116+
def _connection_close(self, _, reason):
117+
"""Connection closed callback.
118+
119+
Close the connection if told to, otherwise attempt reconnect.
120+
121+
:param connection: Connection that was closed.
122+
:type connection: :obj:`pika.connection.Connection`
123+
:param reason: Reason the connection was closed.
124+
:type reason: str
125+
"""
126+
self._channel = None
127+
if self._closing:
128+
self._connection.ioloop.stop()
129+
else:
130+
_LOG.warning("Connection closed, reconnecting: %r", reason)
131+
self.reconnect()
132+
133+
def _connection_open_error(self, _, error):
134+
"""Connection error when starting callback. Reconnect.
135+
136+
:param connection: Connection that was closed.
137+
:type connection: :obj:`pika.Connection`
138+
:param error: Reason the connection was closed.
139+
:type error: str
140+
"""
141+
_LOG.error("Failed to open connection: %r", error)
142+
self.reconnect()
143+
144+
def _channel_open(self, channel):
145+
"""Channel opened callback. Set close callback and call setup channel.
146+
147+
:param channel: Channel that was just opened.
148+
:type channel: :obj:`pika.channel.Channel`
149+
"""
150+
_LOG.info("Channel opened")
151+
self._channel = channel
152+
self._channel.add_on_close_callback(self._channel_closed)
153+
self._setup(channel)
154+
155+
def _channel_closed(self, channel, reason):
156+
"""Channel closed callback. Close connection."""
157+
_LOG.warning("Channel %i was closed: %r", channel, reason)
158+
self.close_connection()
159+
160+
def close_channel(self):
161+
"""Close the RabbitMQ channel."""
162+
_LOG.info("Closing the channel.")
163+
self._channel.close()
164+
165+
def run(self):
166+
"""Reset parameters, connect to RabbitMQ and start the ioloop.
167+
168+
This is a blocking method.
169+
"""
170+
self._on_connect()
171+
self.reset_parameters()
172+
self.connect()
173+
self._connection.ioloop.start()
174+
175+
def keep_alive(self):
176+
"""Reconnect forever if the should_reconnect flag is set.
177+
178+
This is a blocking method.
179+
"""
180+
reconnect_delay = 0
181+
while True:
182+
try:
183+
self.run()
184+
except KeyboardInterrupt:
185+
self.stop()
186+
break
187+
if self.should_reconnect:
188+
self.stop()
189+
if self._was_active:
190+
reconnect_delay = 0
191+
else:
192+
reconnect_delay += 1
193+
if reconnect_delay > 30:
194+
reconnect_delay = 30
195+
_LOG.info("Reconnecting after %d seconds", reconnect_delay)
196+
time.sleep(reconnect_delay)
197+
else:
198+
break
199+
self.running = False
200+
201+
def is_alive(self):
202+
"""Check if connection is alive or not.
203+
204+
Note that this only checks the 'running' flag and does not
205+
deep-dive into the RabbitMQ connection and channel.
206+
207+
:return: Whether this connection is running or not.
208+
:rtype: bool
209+
"""
210+
return self.running
211+
212+
def wait_start(self):
213+
"""Block until connection starts."""
214+
while not self.is_alive():
215+
time.sleep(0.1)
216+
217+
def wait_close(self):
218+
"""Block until publisher closes."""
219+
while self.is_alive():
220+
time.sleep(0.1)
221+
222+
def start(self, wait=True):
223+
"""Start the RabbitMQ connection in a thread.
224+
225+
:param wait: Block until the connection starts. Defaults to True.
226+
:type wait: bool
227+
"""
228+
self._on_start()
229+
self.connection_thread = threading.Thread(target=self.keep_alive)
230+
self.connection_thread.daemon = True
231+
self.connection_thread.start()
232+
if wait:
233+
self.wait_start()
234+
235+
def stop(self):
236+
"""Stop the RabbitMQ connection."""
237+
if not self._closing:
238+
self._closing = True
239+
_LOG.info("Stopping")
240+
if self.active:
241+
self._cancel()
242+
try:
243+
# Start the ioloop again in order for the callbacks
244+
# to fire, so that we can do a clean shutdown.
245+
self._connection.ioloop.start()
246+
except RuntimeError:
247+
pass
248+
else:
249+
self._connection.ioloop.stop()
250+
_LOG.info("Stopped")
251+
252+
close = stop # Backwards compatibility

eiffellib/publishers/eiffel_publisher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019 Axis Communications AB.
1+
# Copyright 2020 Axis Communications AB.
22
#
33
# For a full list of individual contributors, please see the commit history.
44
#
@@ -27,9 +27,12 @@ def start(self):
2727
"""Start the pika rabbitmq connection."""
2828
raise NotImplementedError
2929

30-
def send_event(self, event):
30+
def send_event(self, event, block=True):
3131
"""Validate and send an eiffel event to server.
3232
33+
:param block: Set to True in order to block until ready.
34+
Default: True
35+
:type block: bool
3336
:param event: Event to send.
3437
:type event: :obj:`eiffellib.events.eiffel_base_event.EiffelBaseEvent`
3538
"""

0 commit comments

Comments
 (0)