Skip to content

Added amqp, fixed examples, lint, and tests #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: candidate-0.9.0
Choose a base branch
from

Conversation

andrewfayres
Copy link
Collaborator

I've got some lint stuff to clean up still that is being a hassle. Other than that I think everything is ready. Took me a bit longer to fix all the tests, examples, and stuff than I expected.

@@ -116,7 +286,43 @@ def publish(self, topic: str, payload: bytes, persist: bool) -> None:
if it should be removed immediately (False)
"""
# NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1
self._connection.publish(topic, payload, qos=2 if persist else 0)
if self._connection and self.is_connected():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an improvement over the current code (since the current code can potentially try to call this function while the connection is down, leading to an exception being thrown), but I was thinking that it may make more sense to have an event flag in the MQTT client which can wait for the broker to reconnect and then send the message, instead of simply dropping the message if not connected at the instant this conditional is checked.

Comment on lines +653 to +656
# Resubscribe to all topics to ensure queues and bindings are re-established
for topic, _ in self._topics_to_handlers().items():
logger.info(f'Resubscribing to topic: {topic}')
self.subscribe(topic, True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the resubscription check in this callback fix anything? I would think you would only want to do this in _on_input_channel_open, because technically the output channel can start publishing events immediately once it verifies the exchange.

(when we start using the registry service approach, we can do exchange_declare(..., passive=True))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants