diff --git a/eiffellib/publishers/rabbitmq_publisher.py b/eiffellib/publishers/rabbitmq_publisher.py index aa6e7fd..4c72022 100644 --- a/eiffellib/publishers/rabbitmq_publisher.py +++ b/eiffellib/publishers/rabbitmq_publisher.py @@ -1,4 +1,4 @@ -# Copyright 2020-2021 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -17,6 +17,7 @@ import time import logging import warnings +from threading import RLock, current_thread from copy import deepcopy import pika @@ -35,6 +36,8 @@ class RabbitMQPublisher(EiffelPublisher, BaseRabbitMQ): _nacks = 0 _delivered = 0 _last_delivered_tag = 0 + # RLock is used so that a thread can acquire a lock multiple times without blocking. + _lock = RLock() # pylint:disable=too-many-arguments def __init__(self, host, exchange, routing_key="eiffel", @@ -101,13 +104,38 @@ def _publisher_canceled(self, method_frame): def _resend_nacked_deliveries(self): """Resend all NACKed deliveries. This method loops forever.""" - deliveries = self._nacked_deliveries.copy() - if deliveries: - _LOG.info("Resending %i NACKed deliveries", len(deliveries)) - self._nacked_deliveries.clear() - for event in deliveries: - self.send_event(event) - self._connection.ioloop.call_later(1, self._resend_nacked_deliveries) + if not self.is_alive() or (self._channel is None or not self._channel.is_open): + _LOG.warning("Publisher is not ready. Retry resending NACKed deliveries in 1s") + self._connection.ioloop.call_later(1, self._resend_nacked_deliveries) + return + + # No need to acquire the lock if there are no nacked deliveries. + if not len(self._nacked_deliveries): + self._connection.ioloop.call_later(1, self._resend_nacked_deliveries) + return + + # If we cannot acquire the lock here, retry later otherwise call the send_event method. + _LOG.debug(f"[{current_thread().getName()}] Attempting to acquire '_resend_nacked_deliveries' lock") + acquired = self._lock.acquire(blocking=False) + if not acquired: + _LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Locked") + self._connection.ioloop.call_later(1, self._resend_nacked_deliveries) + return + try: + _LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Lock acquired") + deliveries = self._nacked_deliveries.copy() + if deliveries: + _LOG.info("Resending %i NACKed deliveries", len(deliveries)) + for event in deliveries: + # It is safe to remove the event here since if it fails delivery + # in send_event it will be re-added to _nacked_deliveries. + self._nacked_deliveries.remove(event) + # Never block in an ioloop method + self.send_event(event, block=False) + finally: + self._lock.release() + _LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Lock released") + self._connection.ioloop.call_later(1, self._resend_nacked_deliveries) def _confirm_delivery(self, method_frame): """Confirm the delivery of events and make sure we resend NACKed events. @@ -127,25 +155,34 @@ def _confirm_delivery(self, method_frame): else: number_of_acks = delivery_tag - self._last_delivered_tag - if confirmation_type == 'ack': - self._acks += number_of_acks - elif confirmation_type == 'nack': - self._nacks += number_of_acks - - if delivery_tag == 0: - if confirmation_type == "nack": - self._nacked_deliveries.extend(self._deliveries.values()) - self._deliveries.clear() - else: - for tag in range(self._last_delivered_tag + 1, delivery_tag + 1): + # Since _resend_nacked_deliveries runs in a thread we must protect this + # part that modifies class attributes. + _LOG.debug(f"[{current_thread().getName()}] Attempting to acquire '_confirm_delivery' lock") + with self._lock: + _LOG.debug(f"[{current_thread().getName()}] '_confirm_delivery' Lock acquired") + if confirmation_type == 'ack': + self._acks += number_of_acks + elif confirmation_type == 'nack': + self._nacks += number_of_acks + + if delivery_tag == 0: if confirmation_type == "nack": - self._nacked_deliveries.append(self._deliveries[tag]) - self._deliveries.pop(tag) - self._last_delivered_tag = delivery_tag - - _LOG.debug('Published %i messages, %i have yet to be confirmed, ' - '%i were acked and %i were nacked', self._acks+self._nacks, - len(self._deliveries), self._acks, self._nacks) + self._nacked_deliveries.extend(self._deliveries.values()) + self._deliveries.clear() + else: + for tag in range(self._last_delivered_tag + 1, delivery_tag + 1): + if confirmation_type == "nack": + self._nacked_deliveries.append(self._deliveries[tag]) + try: + self._deliveries.pop(tag) + except KeyError: + _LOG.warning("KeyError when attempting to pop tag %i") + self._last_delivered_tag = delivery_tag + + _LOG.debug('Published %i messages, %i have yet to be confirmed, ' + '%i were acked and %i were nacked', self._acks+self._nacks, + len(self._deliveries), self._acks, self._nacks) + _LOG.debug(f"[{current_thread().getName()}] '_confirm_delivery' Lock released") def send_event(self, event, block=True): """Validate and send an eiffel event to the rabbitmq server. @@ -186,17 +223,22 @@ def send_event(self, event, block=True): event.meta.add("source", source) event.validate() routing_key = self.routing_key or event.routing_key - try: - self._channel.basic_publish( - self.exchange, - routing_key, - event.serialized, - properties, - ) - except: - self._nacked_deliveries.append(event) - return - self._delivered += 1 - self._deliveries[self._delivered] = event + + _LOG.debug(f"[{current_thread().getName()}] Attempting to acquire 'send_event' lock") + with self._lock: + _LOG.debug(f"[{current_thread().getName()}] 'send_event' Lock acquired") + try: + self._channel.basic_publish( + self.exchange, + routing_key, + event.serialized, + properties, + ) + except: + self._nacked_deliveries.append(event) + return + self._delivered += 1 + self._deliveries[self._delivered] = event + _LOG.debug(f"[{current_thread().getName()}] 'send_event' Lock released") send = send_event