Skip to content

Add a method for waiting for unpublished events #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Publishing an event
ACTIVITY_TRIGGERED = EiffelActivityTriggeredEvent()
ACTIVITY_TRIGGERED.data.add("name", "Test activity")
PUBLISHER.send_event(ACTIVITY_TRIGGERED)
PUBLISHER.wait_for_unpublished_events()

Deprecation of routing key
--------------------------
Expand Down
6 changes: 2 additions & 4 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac

.. code-block:: python

import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import (EiffelActivityTriggeredEvent,
Expand Down Expand Up @@ -68,7 +67,7 @@ This code snippet will subscribe to an ActivityStarted in order to publish an Ac
PUBLISHER.send_event(ACTIVITY_STARTED)

# Wait for event to be received by 'callback'.
time.sleep(1)
PUBLISHER.wait_for_unpublished_events()

Activity
--------
Expand All @@ -80,7 +79,6 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi
.. code-block:: python

import os
import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import EiffelAnnouncementPublishedEvent
Expand Down Expand Up @@ -117,4 +115,4 @@ An activity is just a callable which will send ActivityTriggered, Started and Fi
PUBLISHER.send_event(ANNOUNCEMENT)

# Wait for event to be received by 'callback'.
time.sleep(1)
PUBLISHER.wait_for_unpublished_events()
11 changes: 10 additions & 1 deletion eiffellib/publishers/eiffel_publisher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Axis Communications AB.
# Copyright 2020-2022 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
Expand Down Expand Up @@ -46,6 +46,15 @@ def send(self, msg):
"""
raise NotImplementedError

def wait_for_unpublished_events(timeout=60):
"""Wait for all unpublished events to become published.

:raises TimeoutError: If the timeout is reached, this will be raised.
:param timeout: A timeout, in seconds, to wait before exiting.
:type timeout: int
"""
raise NotImplementedError

def close(self):
"""Close down publisher. Override if special actions are required."""
self.running = False
23 changes: 22 additions & 1 deletion eiffellib/publishers/rabbitmq_publisher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020-2021 Axis Communications AB.
# Copyright 2020-2022 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
Expand Down Expand Up @@ -147,6 +147,27 @@ def _confirm_delivery(self, method_frame):
'%i were acked and %i were nacked', self._acks+self._nacks,
len(self._deliveries), self._acks, self._nacks)

def wait_for_unpublished_events(self, timeout=60):
"""Wait for all unpublished events to become published.

For the RabbitMQ publisher an event becomes published if the
broker (not the consumer) responds with an ACK.

:raises TimeoutError: If the timeout is reached, this will be raised.
:param timeout: A timeout, in seconds, to wait before exiting.
:type timeout: int
"""
end = time.time() + timeout
deliveries = 0
while time.time() < end:
time.sleep(0.1)
deliveries = len(self._deliveries) + len(self._nacked_deliveries)
if deliveries == 0:
break
else:
raise TimeoutError("Timeout (%0.2fs) while waiting for events to publish"
" (%d still unpublished)" % (timeout, deliveries))

def send_event(self, event, block=True):
"""Validate and send an eiffel event to the rabbitmq server.

Expand Down