From 7f2c60a4e08c63cb1a7c3faa8c824616373921e5 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Tue, 11 Jan 2022 14:38:54 +0100 Subject: [PATCH] Add a method for waiting for unpublished events Also update the README to reflect this update. --- README.rst | 1 + docs/examples.rst | 6 ++---- eiffellib/publishers/eiffel_publisher.py | 11 ++++++++++- eiffellib/publishers/rabbitmq_publisher.py | 23 +++++++++++++++++++++- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index a4466a6..c25ea33 100644 --- a/README.rst +++ b/README.rst @@ -81,6 +81,7 @@ Publishing an event ACTIVITY_TRIGGERED = EiffelActivityTriggeredEvent() ACTIVITY_TRIGGERED.data.add("name", "Test activity") PUBLISHER.send_event(ACTIVITY_TRIGGERED) + PUBLISHER.wait_for_unpublished_events() Deprecation of routing key -------------------------- diff --git a/docs/examples.rst b/docs/examples.rst index b5ce6d0..76bdb95 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -24,7 +24,6 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac .. code-block:: python - import time from eiffellib.subscribers import RabbitMQSubscriber from eiffellib.publishers import RabbitMQPublisher from eiffellib.events import (EiffelActivityTriggeredEvent, @@ -68,7 +67,7 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac PUBLISHER.send_event(ACTIVITY_STARTED) # Wait for event to be received by 'callback'. - time.sleep(1) + PUBLISHER.wait_for_unpublished_events() Activity -------- @@ -80,7 +79,6 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi .. code-block:: python import os - import time from eiffellib.subscribers import RabbitMQSubscriber from eiffellib.publishers import RabbitMQPublisher from eiffellib.events import EiffelAnnouncementPublishedEvent @@ -117,4 +115,4 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi PUBLISHER.send_event(ANNOUNCEMENT) # Wait for event to be received by 'callback'. - time.sleep(1) + PUBLISHER.wait_for_unpublished_events() diff --git a/eiffellib/publishers/eiffel_publisher.py b/eiffellib/publishers/eiffel_publisher.py index a91d18c..491872b 100644 --- a/eiffellib/publishers/eiffel_publisher.py +++ b/eiffellib/publishers/eiffel_publisher.py @@ -1,4 +1,4 @@ -# Copyright 2020 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -46,6 +46,15 @@ def send(self, msg): """ raise NotImplementedError + def wait_for_unpublished_events(timeout=60): + """Wait for all unpublished events to become published. + + :raises TimeoutError: If the timeout is reached, this will be raised. + :param timeout: A timeout, in seconds, to wait before exiting. + :type timeout: int + """ + raise NotImplementedError + def close(self): """Close down publisher. Override if special actions are required.""" self.running = False diff --git a/eiffellib/publishers/rabbitmq_publisher.py b/eiffellib/publishers/rabbitmq_publisher.py index aa6e7fd..4789a1d 100644 --- a/eiffellib/publishers/rabbitmq_publisher.py +++ b/eiffellib/publishers/rabbitmq_publisher.py @@ -1,4 +1,4 @@ -# Copyright 2020-2021 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -147,6 +147,27 @@ def _confirm_delivery(self, method_frame): '%i were acked and %i were nacked', self._acks+self._nacks, len(self._deliveries), self._acks, self._nacks) + def wait_for_unpublished_events(self, timeout=60): + """Wait for all unpublished events to become published. + + For the RabbitMQ publisher an event becomes published if the + broker (not the consumer) responds with an ACK. + + :raises TimeoutError: If the timeout is reached, this will be raised. + :param timeout: A timeout, in seconds, to wait before exiting. + :type timeout: int + """ + end = time.time() + timeout + deliveries = 0 + while time.time() < end: + time.sleep(0.1) + deliveries = len(self._deliveries) + len(self._nacked_deliveries) + if deliveries == 0: + break + else: + raise TimeoutError("Timeout (%0.2fs) while waiting for events to publish" + " (%d still unpublished)" % (timeout, deliveries)) + def send_event(self, event, block=True): """Validate and send an eiffel event to the rabbitmq server.