-
Notifications
You must be signed in to change notification settings - Fork 429
feat(data-classes): ActiveMQ and RabbitMQ support #770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
3add524
feat(data-classes): Amazon MQ as an event source for AWS Lambda
michaelbrewer 7c8a205
chore: add more properties
michaelbrewer d7b84dd
chore: add some missing docs
michaelbrewer 40a13a5
chore: add rabbitmq data_class
michaelbrewer 30b5e31
tests: add missing tests
michaelbrewer 914c83e
docs: add docs for amazon mq
michaelbrewer a6df24d
Merge branch 'develop' into feat/amazon-mq
michaelbrewer 0dd2489
Merge branch 'develop' into feat/amazon-mq
michaelbrewer 4096b20
docs: fix typo; add highlight
heitorlessa File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
125 changes: 125 additions & 0 deletions
125
aws_lambda_powertools/utilities/data_classes/active_mq_event.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import base64 | ||
import json | ||
from typing import Any, Iterator, Optional | ||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
|
||
class ActiveMQMessage(DictWrapper): | ||
@property | ||
def message_id(self) -> str: | ||
"""Unique identifier for the message""" | ||
return self["messageID"] | ||
|
||
@property | ||
def message_type(self) -> str: | ||
return self["messageType"] | ||
|
||
@property | ||
def data(self) -> str: | ||
return self["data"] | ||
|
||
@property | ||
def decoded_data(self) -> str: | ||
"""Decodes the data as a str""" | ||
return base64.b64decode(self.data.encode()).decode() | ||
|
||
@property | ||
def json_data(self) -> Any: | ||
"""Parses the data as json""" | ||
return json.loads(self.decoded_data) | ||
|
||
@property | ||
def connection_id(self) -> str: | ||
return self["connectionId"] | ||
|
||
@property | ||
def redelivered(self) -> bool: | ||
"""true if the message is being resent to the consumer""" | ||
return self["redelivered"] | ||
|
||
@property | ||
def timestamp(self) -> int: | ||
"""Time in milliseconds.""" | ||
return self["timestamp"] | ||
|
||
@property | ||
def broker_in_time(self) -> int: | ||
"""Time stamp (in milliseconds) for when the message arrived at the broker.""" | ||
return self["brokerInTime"] | ||
|
||
@property | ||
def broker_out_time(self) -> int: | ||
"""Time stamp (in milliseconds) for when the message left the broker.""" | ||
return self["brokerOutTime"] | ||
|
||
@property | ||
def destination_physicalname(self) -> str: | ||
return self["destination"]["physicalname"] | ||
|
||
@property | ||
def delivery_mode(self) -> Optional[int]: | ||
"""persistent or non-persistent delivery""" | ||
return self.get("deliveryMode") | ||
|
||
@property | ||
def correlation_id(self) -> Optional[str]: | ||
"""User defined correlation id""" | ||
return self.get("correlationID") | ||
|
||
@property | ||
def reply_to(self) -> Optional[str]: | ||
"""User defined reply to""" | ||
return self.get("replyTo") | ||
|
||
@property | ||
def get_type(self) -> Optional[str]: | ||
"""User defined message type""" | ||
return self.get("type") | ||
|
||
@property | ||
def expiration(self) -> Optional[int]: | ||
"""Expiration attribute whose value is given in milliseconds""" | ||
return self.get("expiration") | ||
|
||
@property | ||
def priority(self) -> Optional[int]: | ||
""" | ||
JMS defines a ten-level priority value, with 0 as the lowest priority and 9 | ||
as the highest. In addition, clients should consider priorities 0-4 as | ||
gradations of normal priority and priorities 5-9 as gradations of expedited | ||
priority. | ||
|
||
JMS does not require that a provider strictly implement priority ordering | ||
of messages; however, it should do its best to deliver expedited messages | ||
ahead of normal messages. | ||
""" | ||
return self.get("priority") | ||
|
||
|
||
class ActiveMQEvent(DictWrapper): | ||
"""Represents an Active MQ event sent to Lambda | ||
|
||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html | ||
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/ | ||
""" | ||
|
||
@property | ||
def event_source(self) -> str: | ||
return self["eventSource"] | ||
|
||
@property | ||
def event_source_arn(self) -> str: | ||
"""The Amazon Resource Name (ARN) of the event source""" | ||
return self["eventSourceArn"] | ||
|
||
@property | ||
def messages(self) -> Iterator[ActiveMQMessage]: | ||
for record in self["messages"]: | ||
yield ActiveMQMessage(record) | ||
|
||
@property | ||
def message(self) -> ActiveMQMessage: | ||
return next(self.messages) |
121 changes: 121 additions & 0 deletions
121
aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import base64 | ||
import json | ||
from typing import Any, Dict, List | ||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
|
||
class BasicProperties(DictWrapper): | ||
@property | ||
def content_type(self) -> str: | ||
return self["contentType"] | ||
|
||
@property | ||
def content_encoding(self) -> str: | ||
return self["contentEncoding"] | ||
|
||
@property | ||
def headers(self) -> Dict[str, Any]: | ||
return self["headers"] | ||
|
||
@property | ||
def delivery_mode(self) -> int: | ||
return self["deliveryMode"] | ||
|
||
@property | ||
def priority(self) -> int: | ||
return self["priority"] | ||
|
||
@property | ||
def correlation_id(self) -> str: | ||
return self["correlationId"] | ||
|
||
@property | ||
def reply_to(self) -> str: | ||
return self["replyTo"] | ||
|
||
@property | ||
def expiration(self) -> str: | ||
return self["expiration"] | ||
|
||
@property | ||
def message_id(self) -> str: | ||
return self["messageId"] | ||
|
||
@property | ||
def timestamp(self) -> str: | ||
return self["timestamp"] | ||
|
||
@property | ||
def get_type(self) -> str: | ||
return self["type"] | ||
|
||
@property | ||
def user_id(self) -> str: | ||
return self["userId"] | ||
|
||
@property | ||
def app_id(self) -> str: | ||
return self["appId"] | ||
|
||
@property | ||
def cluster_id(self) -> str: | ||
return self["clusterId"] | ||
|
||
@property | ||
def body_size(self) -> int: | ||
return self["bodySize"] | ||
|
||
|
||
class RabbitMessage(DictWrapper): | ||
@property | ||
def basic_properties(self) -> BasicProperties: | ||
return BasicProperties(self["basicProperties"]) | ||
|
||
@property | ||
def redelivered(self) -> bool: | ||
return self["redelivered"] | ||
|
||
@property | ||
def data(self) -> str: | ||
return self["data"] | ||
|
||
@property | ||
def decoded_data(self) -> str: | ||
"""Decodes the data as a str""" | ||
return base64.b64decode(self.data.encode()).decode() | ||
|
||
@property | ||
def json_data(self) -> Any: | ||
"""Parses the data as json""" | ||
return json.loads(self.decoded_data) | ||
|
||
|
||
class RabbitMQEvent(DictWrapper): | ||
"""Represents a Rabbit MQ event sent to Lambda | ||
|
||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html | ||
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/ | ||
""" | ||
|
||
def __init__(self, data: Dict[str, Any]): | ||
super().__init__(data) | ||
self._rmq_messages_by_queue = { | ||
key: [RabbitMessage(message) for message in messages] | ||
for key, messages in self["rmqMessagesByQueue"].items() | ||
} | ||
|
||
@property | ||
def event_source(self) -> str: | ||
return self["eventSource"] | ||
|
||
@property | ||
def event_source_arn(self) -> str: | ||
"""The Amazon Resource Name (ARN) of the event source""" | ||
return self["eventSourceArn"] | ||
|
||
@property | ||
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]: | ||
return self._rmq_messages_by_queue |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
{ | ||
"eventSource": "aws:amq", | ||
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", | ||
"messages": [ | ||
{ | ||
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", | ||
"messageType": "jms/text-message", | ||
"data": "QUJDOkFBQUE=", | ||
"connectionId": "myJMSCoID", | ||
"redelivered": false, | ||
"destination": { | ||
"physicalname": "testQueue" | ||
}, | ||
"timestamp": 1598827811958, | ||
"brokerInTime": 1598827811958, | ||
"brokerOutTime": 1598827811959 | ||
}, | ||
{ | ||
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", | ||
"messageType": "jms/text-message", | ||
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==", | ||
"connectionId": "myJMSCoID2", | ||
"redelivered": false, | ||
"destination": { | ||
"physicalname": "testQueue" | ||
}, | ||
"timestamp": 1598827811958, | ||
"brokerInTime": 1598827811958, | ||
"brokerOutTime": 1598827811959 | ||
}, | ||
{ | ||
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", | ||
"messageType": "jms/bytes-message", | ||
"data": "3DTOOW7crj51prgVLQaGQ82S48k=", | ||
"connectionId": "myJMSCoID1", | ||
"persistent": false, | ||
"destination": { | ||
"physicalname": "testQueue" | ||
}, | ||
"timestamp": 1598827811958, | ||
"brokerInTime": 1598827811958, | ||
"brokerOutTime": 1598827811959 | ||
} | ||
] | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.