Skip to content

Update rabbitmq_publisher.py #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions eiffellib/publishers/rabbitmq_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ def _resend_nacked_deliveries(self):
return

# If we cannot acquire the lock here, retry later otherwise call the send_event method.
_LOG.debug(f"[{current_thread().getName()}] Attempting to acquire '_resend_nacked_deliveries' lock")
_LOG.debug(f"[{current_thread().name}] Attempting to acquire '_resend_nacked_deliveries' lock")
acquired = self._lock.acquire(blocking=False)
if not acquired:
_LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Locked")
_LOG.debug(f"[{current_thread().name}] '_resend_nacked_deliveries' Locked")
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)
return
try:
_LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Lock acquired")
_LOG.debug(f"[{current_thread().name}] '_resend_nacked_deliveries' Lock acquired")
deliveries = self._nacked_deliveries.copy()
if deliveries:
_LOG.info("Resending %i NACKed deliveries", len(deliveries))
Expand All @@ -135,7 +135,7 @@ def _resend_nacked_deliveries(self):
time.sleep(0.1) # Make sure we don't hog too much CPU.
finally:
self._lock.release()
_LOG.debug(f"[{current_thread().getName()}] '_resend_nacked_deliveries' Lock released")
_LOG.debug(f"[{current_thread().name}] '_resend_nacked_deliveries' Lock released")
self._connection.ioloop.call_later(1, self._resend_nacked_deliveries)

def _confirm_delivery(self, method_frame):
Expand All @@ -158,9 +158,9 @@ def _confirm_delivery(self, method_frame):

# Since _resend_nacked_deliveries runs in a thread we must protect this
# part that modifies class attributes.
_LOG.debug(f"[{current_thread().getName()}] Attempting to acquire '_confirm_delivery' lock")
_LOG.debug(f"[{current_thread().name}] Attempting to acquire '_confirm_delivery' lock")
with self._lock:
_LOG.debug(f"[{current_thread().getName()}] '_confirm_delivery' Lock acquired")
_LOG.debug(f"[{current_thread().name}] '_confirm_delivery' Lock acquired")
if confirmation_type == 'ack':
self._acks += number_of_acks
elif confirmation_type == 'nack':
Expand All @@ -183,7 +183,7 @@ def _confirm_delivery(self, method_frame):
_LOG.debug('Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked', self._acks+self._nacks,
len(self._deliveries), self._acks, self._nacks)
_LOG.debug(f"[{current_thread().getName()}] '_confirm_delivery' Lock released")
_LOG.debug(f"[{current_thread().name}] '_confirm_delivery' Lock released")

def wait_for_unpublished_events(self, timeout=60):
"""Wait for all unpublished events to become published.
Expand Down Expand Up @@ -246,9 +246,9 @@ def send_event(self, event, block=True):
event.validate()
routing_key = self.routing_key or event.routing_key

_LOG.debug(f"[{current_thread().getName()}] Attempting to acquire 'send_event' lock")
_LOG.debug(f"[{current_thread().name}] Attempting to acquire 'send_event' lock")
with self._lock:
_LOG.debug(f"[{current_thread().getName()}] 'send_event' Lock acquired")
_LOG.debug(f"[{current_thread().name}] 'send_event' Lock acquired")
try:
self._channel.basic_publish(
self.exchange,
Expand All @@ -261,6 +261,6 @@ def send_event(self, event, block=True):
return
self._delivered += 1
self._deliveries[self._delivered] = event
_LOG.debug(f"[{current_thread().getName()}] 'send_event' Lock released")
_LOG.debug(f"[{current_thread().name}] 'send_event' Lock released")

send = send_event