From ec35c7c6f64878347b784c7c4f7d04d75852908f Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 22 Sep 2022 23:20:07 -0700 Subject: [PATCH 01/16] add KinesisFirehoseEvent data class --- .../utilities/data_classes/__init__.py | 2 + .../data_classes/kinesis_firehose_event.py | 108 ++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 8ed77f9f3a3..2aa2021ed1e 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -13,6 +13,7 @@ from .event_bridge_event import EventBridgeEvent from .event_source import event_source from .kafka_event import KafkaEvent +from .kinesis_firehose_event import KinesisFirehoseEvent from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent from .s3_event import S3Event @@ -32,6 +33,7 @@ "DynamoDBStreamEvent", "EventBridgeEvent", "KafkaEvent", + "KinesisFirehoseEvent", "KinesisStreamEvent", "LambdaFunctionUrlEvent", "S3Event", diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py new file mode 100644 index 00000000000..8ac4416df2c --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -0,0 +1,108 @@ +import base64 +import json +from typing import Iterator + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + + +class KinesisFirehoseRecordMetadata(DictWrapper): + @property + def shard_id(self) -> str: + """Kinesis stream shard ID; present only when Kinesis Stream is source""" + return self.get("shardId") + + @property + def partition_key(self) -> str: + """Kinesis stream partition key; present only when Kinesis Stream 
is source""" + return self.get("partitionKey") + + @property + def approximate_arrival_timestamp(self) -> str: + """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" + return self.get("approximateArrivalTimestamp") + + @property + def sequence_number(self) -> str: + """Kinesis stream sequence number; present only when Kinesis Stream is source""" + return self.get("sequenceNumber") + + @property + def subsequence_number(self) -> str: + """Kinesis stream sub-sequence number; present only when Kinesis Stream is source + + Note: this will only be present for Kinesis streams using record aggregation + """ + return self.get("subsequenceNumber") + + +class KinesisFirehoseRecord(DictWrapper): + @property + def approximate_arrival_timestamp(self) -> float: + """The approximate time that the record was inserted into the delivery stream""" + return float(self["approximateArrivalTimestamp"]) + + @property + def record_id(self) -> str: + """Record ID; uniquely identifies this record within the current batch""" + return self["recordId"] + + @property + def data(self) -> str: + """The data blob, base64-encoded""" + return self["data"] + + @property + def metadata(self) -> KinesisFirehoseRecordMetadata: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {})) + + @property + def data_as_bytes(self) -> bytes: + """Decoded base64-encoded data as bytes""" + return base64.b64decode(self.data) + + @property + def data_as_text(self) -> str: + """Decoded base64-encoded data as text""" + return self.data_as_bytes.decode("utf-8") + + @property + def data_as_json(self) -> dict: + """Decoded base64-encoded data loaded to json""" + if self._json_data is None: + self._json_data = json.loads(self.data_as_text) + return self._json_data + + +class KinesisFirehoseEvent(DictWrapper): + """Kinesis Data Firehose event + + Documentation: 
+ -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html + """ + + @property + def invocation_id(self) -> str: + """Unique ID for Lambda invocation""" + return self["invocationId"] + + @property + def delivery_stream_arn(self) -> str: + """ARN of the Kinesis Data Firehose Delivery Stream""" + return self["deliveryStreamArn"] + + @property + def source_kinesis_stream_arn(self) -> str: + """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" + return self.get("sourceKinesisStreamArn") + + @property + def region(self) -> str: + """AWS region where the event originated eg: us-east-1""" + return self["region"] + + @property + def records(self) -> Iterator[KinesisFirehoseRecord]: + for record in self["records"]: + yield KinesisFirehoseRecord(record) From ab36af5c9ef8c00650da268e393b006f77563508 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 22 Sep 2022 23:20:31 -0700 Subject: [PATCH 02/16] add tests for KinesisFirehoseEvent --- tests/events/kinesisFirehoseEvent.json | 31 ++++++++++++++++++++++++++ tests/functional/test_data_classes.py | 25 +++++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 tests/events/kinesisFirehoseEvent.json diff --git a/tests/events/kinesisFirehoseEvent.json b/tests/events/kinesisFirehoseEvent.json new file mode 100644 index 00000000000..0a0cd4b711b --- /dev/null +++ b/tests/events/kinesisFirehoseEvent.json @@ -0,0 +1,31 @@ +{ + "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", + "region": "us-east-2", + "records": [ + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record1", + "approximateArrivalTimestamp": 1510772160000, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000000", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", + "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z", + "sequenceNumber": 
"49546986683135544286507457936321625675700192471156785154", + "subsequenceNumber": "" + } + }, + { + "data": "eyJIZWxsbyI6ICJXb3JsZCJ9", + "recordId": "record2", + "approximateArrivalTimestamp": 151077216000, + "kinesisRecordMetadata": { + "shardId": "shardId-000000000001", + "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", + "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z", + "sequenceNumber": "49546986683135544286507457936321625675700192471156785155", + "subsequenceNumber": "" + } + } + ] +} diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index dbef57162e2..378c139590f 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -18,6 +18,7 @@ CodePipelineJobEvent, EventBridgeEvent, KafkaEvent, + KinesisFirehoseEvent, KinesisStreamEvent, S3Event, SESEvent, @@ -1201,6 +1202,30 @@ def test_kafka_self_managed_event(): assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue" +def test_kinesis_firehose_event(): + event = KinesisFirehoseEvent(load_event("kinesisFirehoseEvent.json")) + + assert event.region == "us-east-2" + assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.source_kinesis_stream_arn is None + + records = list(event.records) + assert len(records) == 2 + record_01, record_02 = records[:] + + assert record_01.approximate_arrival_timestamp == 1510772160000 + assert record_01.record_id == "record1" + assert record_01.data == "SGVsbG8gV29ybGQ=" + assert record_01.data_as_bytes == b'Hello World' + assert record_01.data_as_text == 'Hello World' + + assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" + assert record_02.data_as_bytes == b'{"Hello": "World"}' + assert record_02.data_as_text == '{"Hello": "World"}' + assert record_02.data_as_json == {"Hello": "World"} + + def 
test_kinesis_stream_event(): event = KinesisStreamEvent(load_event("kinesisStreamEvent.json")) From 4855551a9e50833a8cd161d7204f233354679d6e Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 22 Sep 2022 23:20:40 -0700 Subject: [PATCH 03/16] update docs for KinesisFirehoseEvent --- docs/utilities/data_classes.md | 40 ++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 67d821fe04f..4061571fca4 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -76,6 +76,7 @@ Event Source | Data_class [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` [EventBridge](#eventbridge) | `EventBridgeEvent` [Kafka](#kafka) | `KafkaEvent` +[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` @@ -892,6 +893,45 @@ or plain text, depending on the original payload. do_something_with(data) ``` +### Kinesis Firehose Delivery Stream + +Kinesis Firehose Data Transformation can use a Lambda Function to modify the records +inline, and re-emit them back to the Delivery Stream. + +Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper +function to access the data either as json or plain text, depending on the original payload. 
+ +=== "app.py" + + ```python + import base64 + import json + from aws_lambda_powertools.utilities.data_classes import event_source, KinesisFirehoseEvent + + @event_source(data_class=KinesisFirehoseEvent) + def lambda_handler(event: KinesisFirehoseEvent, context): + result = [] + for rec in event.records: + # if data was delivered as json; caches loaded value + data = kinesis_firehose_record.data_as_json + + # or swap for below if data was delivered as text + # data = kinesis_firehose_record.data_as_text + + modified_record = do_sometime(data) + + firehose_record_output = { + "recordId": rec.record_id, + "data": base64.b64encode(json.dump(modified_record).encode('utf-8')), + "result": "Ok" + } + + result.append(firehose_record_output) + + # return transformed records + return = {'records': result} + ``` + ### Lambda Function URL === "app.py" From c9b93fc95470ef989a52bebb30ed714c8c9e67da Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 23 Sep 2022 10:00:37 -0700 Subject: [PATCH 04/16] mypy fixes --- .../data_classes/kinesis_firehose_event.py | 18 +++++++++--------- docs/utilities/data_classes.md | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 8ac4416df2c..3e296935bfd 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,33 +1,33 @@ import base64 import json -from typing import Iterator +from typing import Iterator, Optional from aws_lambda_powertools.utilities.data_classes.common import DictWrapper class KinesisFirehoseRecordMetadata(DictWrapper): @property - def shard_id(self) -> str: + def shard_id(self) -> Optional[str]: """Kinesis stream shard ID; present only when Kinesis Stream is source""" return self.get("shardId") @property - def partition_key(self) -> str: + def 
partition_key(self) -> Optional[str] """Kinesis stream partition key; present only when Kinesis Stream is source""" return self.get("partitionKey") @property - def approximate_arrival_timestamp(self) -> str: + def approximate_arrival_timestamp(self) -> Optional[str] """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" return self.get("approximateArrivalTimestamp") @property - def sequence_number(self) -> str: + def sequence_number(self) -> Optional[str] """Kinesis stream sequence number; present only when Kinesis Stream is source""" return self.get("sequenceNumber") @property - def subsequence_number(self) -> str: + def subsequence_number(self) -> Optional[str] """Kinesis stream sub-sequence number; present only when Kinesis Stream is source Note: this will only be present for Kinesis streams using record aggregation @@ -50,9 +50,9 @@ def record_id(self) -> str: def data(self) -> str: """The data blob, base64-encoded""" return self["data"] - + @property - def metadata(self) -> KinesisFirehoseRecordMetadata: + def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {})) @@ -93,7 +93,7 @@ def delivery_stream_arn(self) -> str: return self["deliveryStreamArn"] @property - def source_kinesis_stream_arn(self) -> str: + def source_kinesis_stream_arn(self) -> Optional[str]: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn") diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 4061571fca4..27017cd5716 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -918,7 +918,7 @@ function to access the data either as json or plain text, depending on the origi # or swap for below if data was delivered as text # data = 
kinesis_firehose_record.data_as_text - modified_record = do_sometime(data) + modified_record = do_sometime_with(data) firehose_record_output = { "recordId": rec.record_id, From 642734f0ebd95235331aa9fb0eddf9893b1ca21e Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 23 Sep 2022 13:31:30 -0700 Subject: [PATCH 05/16] fixup mypy --- .../data_classes/kinesis_firehose_event.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 3e296935bfd..4d5454cfc20 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -6,33 +6,38 @@ class KinesisFirehoseRecordMetadata(DictWrapper): + @property + def _metadata(self) -> dict: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return self["kinesisRecordMetadata"] # could raise KeyError + @property def shard_id(self) -> Optional[str]: """Kinesis stream shard ID; present only when Kinesis Stream is source""" - return self.get("shardId") + return self._metadata.get("shardId") @property - def partition_key(self) -> Optional[str] + def partition_key(self) -> Optional[str]: """Kinesis stream partition key; present only when Kinesis Stream is source""" - return self.get("partitionKey") + return self._metadata.get("partitionKey") @property - def approximate_arrival_timestamp(self) -> Optional[str] + def approximate_arrival_timestamp(self) -> Optional[str]: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" - return self.get("approximateArrivalTimestamp") + return self._metadata.get("approximateArrivalTimestamp") @property - def sequence_number(self) -> Optional[str] + def sequence_number(self) -> Optional[str]: """Kinesis stream sequence number; 
present only when Kinesis Stream is source""" - return self.get("sequenceNumber") + return self._metadata.get("sequenceNumber") @property - def subsequence_number(self) -> Optional[str] + def subsequence_number(self) -> Optional[str]: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source - + Note: this will only be present for Kinesis streams using record aggregation """ - return self.get("subsequenceNumber") + return self._metadata.get("subsequenceNumber") class KinesisFirehoseRecord(DictWrapper): @@ -40,7 +45,7 @@ class KinesisFirehoseRecord(DictWrapper): def approximate_arrival_timestamp(self) -> float: """The approximate time that the record was inserted into the delivery stream""" return float(self["approximateArrivalTimestamp"]) - + @property def record_id(self) -> str: """Record ID; uniquely identifies this record within the current batch""" @@ -52,9 +57,9 @@ def data(self) -> str: return self["data"] @property - def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: + def metadata(self) -> KinesisFirehoseRecordMetadata: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {})) + return KinesisFirehoseRecordMetadata(self._data) @property def data_as_bytes(self) -> bytes: From ebe6714d5febc4beabe38c65a0387ecfaa9566bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BAben=20Fonseca?= Date: Tue, 27 Sep 2022 16:04:15 +0100 Subject: [PATCH 06/16] fix(docs): move firehose sample to examples --- docs/utilities/data_classes.md | 29 ++----------------- .../src/kinesis_firehose_delivery_stream.py | 25 ++++++++++++++++ tests/functional/test_data_classes.py | 12 ++++---- 3 files changed, 33 insertions(+), 33 deletions(-) create mode 100644 examples/event_sources/src/kinesis_firehose_delivery_stream.py diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 27017cd5716..bb9614814a4 100644 --- 
a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -893,7 +893,7 @@ or plain text, depending on the original payload. do_something_with(data) ``` -### Kinesis Firehose Delivery Stream +### Kinesis Firehose delivery stream Kinesis Firehose Data Transformation can use a Lambda Function to modify the records inline, and re-emit them back to the Delivery Stream. @@ -904,32 +904,7 @@ function to access the data either as json or plain text, depending on the origi === "app.py" ```python - import base64 - import json - from aws_lambda_powertools.utilities.data_classes import event_source, KinesisFirehoseEvent - - @event_source(data_class=KinesisFirehoseEvent) - def lambda_handler(event: KinesisFirehoseEvent, context): - result = [] - for rec in event.records: - # if data was delivered as json; caches loaded value - data = kinesis_firehose_record.data_as_json - - # or swap for below if data was delivered as text - # data = kinesis_firehose_record.data_as_text - - modified_record = do_sometime_with(data) - - firehose_record_output = { - "recordId": rec.record_id, - "data": base64.b64encode(json.dump(modified_record).encode('utf-8')), - "result": "Ok" - } - - result.append(firehose_record_output) - - # return transformed records - return = {'records': result} + --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` ### Lambda Function URL diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py new file mode 100644 index 00000000000..67bf53dfe06 --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -0,0 +1,25 @@ +import base64 +import json + +from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@event_source(data_class=KinesisFirehoseEvent) +def lambda_handler(event: KinesisFirehoseEvent, context: 
LambdaContext): + result = [] + + for record in event.records: + # if data was delivered as json; caches loaded value + data = record.data_as_json + + processed_record = { + "recordId": record.record_id, + "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "result": "Ok", + } + + result.append(processed_record) + + # return transformed records + return {"records": result} diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 378c139590f..2b55aaaaafa 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1204,22 +1204,22 @@ def test_kafka_self_managed_event(): def test_kinesis_firehose_event(): event = KinesisFirehoseEvent(load_event("kinesisFirehoseEvent.json")) - + assert event.region == "us-east-2" assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" assert event.source_kinesis_stream_arn is None - + records = list(event.records) assert len(records) == 2 record_01, record_02 = records[:] - + assert record_01.approximate_arrival_timestamp == 1510772160000 assert record_01.record_id == "record1" assert record_01.data == "SGVsbG8gV29ybGQ=" - assert record_01.data_as_bytes == b'Hello World' - assert record_01.data_as_text == 'Hello World' - + assert record_01.data_as_bytes == b"Hello World" + assert record_01.data_as_text == "Hello World" + assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" assert record_02.data_as_bytes == b'{"Hello": "World"}' assert record_02.data_as_text == '{"Hello": "World"}' From 2416d8dd41b18588ee03e5d06a72a2a03857e24f Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 12 Oct 2022 17:23:04 +0100 Subject: [PATCH 07/16] Update aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py --- .../utilities/data_classes/kinesis_firehose_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 4d5454cfc20..57f14846441 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -42,7 +42,7 @@ def subsequence_number(self) -> Optional[str]: class KinesisFirehoseRecord(DictWrapper): @property - def approximate_arrival_timestamp(self) -> float: + def approximate_arrival_timestamp(self) -> int: """The approximate time that the record was inserted into the delivery stream""" return float(self["approximateArrivalTimestamp"]) From e8e641785a08cf920e15dc41309e252175520994 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 12 Oct 2022 17:23:14 +0100 Subject: [PATCH 08/16] Update aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py --- .../utilities/data_classes/kinesis_firehose_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 57f14846441..93f12d6b260 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -22,7 +22,7 @@ def partition_key(self) -> Optional[str]: return self._metadata.get("partitionKey") @property - def approximate_arrival_timestamp(self) -> Optional[str]: + def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" return self._metadata.get("approximateArrivalTimestamp") From 237f96dd3a308bffccb236c3fcb31e482c4487a6 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 11:59:19 -0700 Subject: [PATCH 09/16] docs fix --- docs/utilities/data_classes.md | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index bb9614814a4..509110e0480 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -76,8 +76,8 @@ Event Source | Data_class [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` [EventBridge](#eventbridge) | `EventBridgeEvent` [Kafka](#kafka) | `KafkaEvent` -[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` +[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` [S3](#s3) | `S3Event` From a9b24dde03d32af2eed621c0dce2f798c625f99b Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 12:02:45 -0700 Subject: [PATCH 10/16] optionals update --- .../data_classes/kinesis_firehose_event.py | 24 +++++++++---------- tests/events/kinesisFirehoseEvent.json | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 93f12d6b260..5683902f9d0 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -12,39 +12,39 @@ def _metadata(self) -> dict: return self["kinesisRecordMetadata"] # could raise KeyError @property - def shard_id(self) -> Optional[str]: + def shard_id(self) -> str: """Kinesis stream shard ID; present only when Kinesis Stream is source""" - return self._metadata.get("shardId") + return self._metadata["shardId"] @property - def partition_key(self) -> Optional[str]: + def partition_key(self) -> str: """Kinesis stream partition key; present only when Kinesis Stream is source""" - 
return self._metadata.get("partitionKey") + return self._metadata["partitionKey"] @property def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" - return self._metadata.get("approximateArrivalTimestamp") + return self._metadata["approximateArrivalTimestamp"] @property - def sequence_number(self) -> Optional[str]: + def sequence_number(self) -> str: """Kinesis stream sequence number; present only when Kinesis Stream is source""" - return self._metadata.get("sequenceNumber") + return self._metadata["sequenceNumber"] @property - def subsequence_number(self) -> Optional[str]: + def subsequence_number(self) -> str: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source Note: this will only be present for Kinesis streams using record aggregation """ - return self._metadata.get("subsequenceNumber") + return self._metadata["subsequenceNumber"] class KinesisFirehoseRecord(DictWrapper): @property def approximate_arrival_timestamp(self) -> int: """The approximate time that the record was inserted into the delivery stream""" - return float(self["approximateArrivalTimestamp"]) + return self["approximateArrivalTimestamp"] @property def record_id(self) -> str: @@ -57,9 +57,9 @@ def data(self) -> str: return self["data"] @property - def metadata(self) -> KinesisFirehoseRecordMetadata: + def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return KinesisFirehoseRecordMetadata(self._data) + return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None @property def data_as_bytes(self) -> bytes: diff --git a/tests/events/kinesisFirehoseEvent.json b/tests/events/kinesisFirehoseEvent.json index 0a0cd4b711b..840f2bf15d5 100644 --- a/tests/events/kinesisFirehoseEvent.json +++ b/tests/events/kinesisFirehoseEvent.json @@ -10,7 
+10,7 @@ "kinesisRecordMetadata": { "shardId": "shardId-000000000000", "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", - "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z", + "approximateArrivalTimestamp": 1510772160000, "sequenceNumber": "49546986683135544286507457936321625675700192471156785154", "subsequenceNumber": "" } @@ -22,7 +22,7 @@ "kinesisRecordMetadata": { "shardId": "shardId-000000000001", "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", - "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z", + "approximateArrivalTimestamp": 1510772160000, "sequenceNumber": "49546986683135544286507457936321625675700192471156785155", "subsequenceNumber": "" } From cf13bb8ffde0658f2dce2e049a0f07e241efac6d Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 12:08:06 -0700 Subject: [PATCH 11/16] test updates --- ...nesisFirehoseEvent.json => kinesisFirehoseKinesisEvent.json} | 1 + tests/functional/test_data_classes.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) rename tests/events/{kinesisFirehoseEvent.json => kinesisFirehoseKinesisEvent.json} (92%) diff --git a/tests/events/kinesisFirehoseEvent.json b/tests/events/kinesisFirehoseKinesisEvent.json similarity index 92% rename from tests/events/kinesisFirehoseEvent.json rename to tests/events/kinesisFirehoseKinesisEvent.json index 840f2bf15d5..222da55b1c3 100644 --- a/tests/events/kinesisFirehoseEvent.json +++ b/tests/events/kinesisFirehoseKinesisEvent.json @@ -1,5 +1,6 @@ { "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", + "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source", "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", "region": "us-east-2", "records": [ diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 2b55aaaaafa..b05d2f8e626 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ 
-1203,7 +1203,7 @@ def test_kafka_self_managed_event(): def test_kinesis_firehose_event(): - event = KinesisFirehoseEvent(load_event("kinesisFirehoseEvent.json")) + event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json")) assert event.region == "us-east-2" assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" From 085873a5cb2e4c0d47ed39a184757f07ae9df2fd Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 12:12:46 -0700 Subject: [PATCH 12/16] rm tests --- tests/events/kinesisFirehoseKinesisEvent.json | 32 ------------------- tests/functional/test_data_classes.py | 26 ++++++++++++++- 2 files changed, 25 insertions(+), 33 deletions(-) delete mode 100644 tests/events/kinesisFirehoseKinesisEvent.json diff --git a/tests/events/kinesisFirehoseKinesisEvent.json b/tests/events/kinesisFirehoseKinesisEvent.json deleted file mode 100644 index 222da55b1c3..00000000000 --- a/tests/events/kinesisFirehoseKinesisEvent.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", - "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source", - "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", - "region": "us-east-2", - "records": [ - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record1", - "approximateArrivalTimestamp": 1510772160000, - "kinesisRecordMetadata": { - "shardId": "shardId-000000000000", - "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a", - "approximateArrivalTimestamp": 1510772160000, - "sequenceNumber": "49546986683135544286507457936321625675700192471156785154", - "subsequenceNumber": "" - } - }, - { - "data": "eyJIZWxsbyI6ICJXb3JsZCJ9", - "recordId": "record2", - "approximateArrivalTimestamp": 151077216000, - "kinesisRecordMetadata": { - "shardId": "shardId-000000000001", - "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a", - "approximateArrivalTimestamp": 1510772160000, - "sequenceNumber": 
"49546986683135544286507457936321625675700192471156785155", - "subsequenceNumber": "" - } - } - ] -} diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index b05d2f8e626..230b9c4cc85 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1202,7 +1202,7 @@ def test_kafka_self_managed_event(): assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue" -def test_kinesis_firehose_event(): +def test_kinesis_firehose_kinesis_event(): event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json")) assert event.region == "us-east-2" @@ -1226,6 +1226,30 @@ def test_kinesis_firehose_event(): assert record_02.data_as_json == {"Hello": "World"} +def test_kinesis_firehose_direct_put_event(): + event = KinesisFirehoseEvent(load_event("kinesisFirehoseDirectPutEvent.json")) + + assert event.region == "us-east-2" + assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.source_kinesis_stream_arn is None + + records = list(event.records) + assert len(records) == 2 + record_01, record_02 = records[:] + + assert record_01.approximate_arrival_timestamp == 1510772160000 + assert record_01.record_id == "record1" + assert record_01.data == "SGVsbG8gV29ybGQ=" + assert record_01.data_as_bytes == b"Hello World" + assert record_01.data_as_text == "Hello World" + + assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" + assert record_02.data_as_bytes == b'{"Hello": "World"}' + assert record_02.data_as_text == '{"Hello": "World"}' + assert record_02.data_as_json == {"Hello": "World"} + + def test_kinesis_stream_event(): event = KinesisStreamEvent(load_event("kinesisStreamEvent.json")) From e10f62a9be5829084490f853909d34c9b5119823 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 12:17:26 -0700 Subject: [PATCH 13/16] 
formatting nit --- tests/events/kinesisFirehosePutEvent.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/events/kinesisFirehosePutEvent.json b/tests/events/kinesisFirehosePutEvent.json index 27aeddd80eb..f3e07190710 100644 --- a/tests/events/kinesisFirehosePutEvent.json +++ b/tests/events/kinesisFirehosePutEvent.json @@ -2,16 +2,16 @@ "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", "region": "us-east-2", - "records":[ + "records": [ { - "recordId":"record1", - "approximateArrivalTimestamp":1664029185290, - "data":"SGVsbG8gV29ybGQ=" + "recordId": "record1", + "approximateArrivalTimestamp": 1664029185290, + "data": "SGVsbG8gV29ybGQ=" }, { - "recordId":"record2", - "approximateArrivalTimestamp":1664029186945, - "data":"eyJIZWxsbyI6ICJXb3JsZCJ9" + "recordId": "record2", + "approximateArrivalTimestamp": 1664029186945, + "data": "eyJIZWxsbyI6ICJXb3JsZCJ9" } ] } From b95cd979f117f2fa12fe47284a5833dc519dae6d Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 12:27:57 -0700 Subject: [PATCH 14/16] firehose kinesis + put test updates --- tests/functional/test_data_classes.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 70ca8fa2b30..8d396c9bef0 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1246,26 +1246,40 @@ def test_kinesis_firehose_kinesis_event(): assert event.region == "us-east-2" assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" - assert event.source_kinesis_stream_arn is None + assert event.source_kinesis_stream_arn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source" records = 
list(event.records) assert len(records) == 2 record_01, record_02 = records[:] - assert record_01.approximate_arrival_timestamp == 1510772160000 + assert record_01.approximate_arrival_timestamp == 1664028820148 assert record_01.record_id == "record1" assert record_01.data == "SGVsbG8gV29ybGQ=" assert record_01.data_as_bytes == b"Hello World" assert record_01.data_as_text == "Hello World" + assert record_01.metadata.shard_id == "shardId-000000000000" + assert record_01.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" + assert record_01.metadata.approximate_arrival_timestamp == 1664028820148 + assert record_01.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785154" + assert record_01.metadata.subsequence_number == "" + + assert record_02.approximate_arrival_timestamp == 1664028793294 + assert record_02.record_id == "record2" assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" assert record_02.data_as_bytes == b'{"Hello": "World"}' assert record_02.data_as_text == '{"Hello": "World"}' assert record_02.data_as_json == {"Hello": "World"} + assert record_02.metadata.shard_id == "shardId-000000000001" + assert record_02.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" + assert record_02.metadata.approximate_arrival_timestamp == 1664028793294 + assert record_02.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785155" + assert record_02.metadata.subsequence_number == "" + -def test_kinesis_firehose_direct_put_event(): - event = KinesisFirehoseEvent(load_event("kinesisFirehoseDirectPutEvent.json")) +def test_kinesis_firehose_put_event(): + event = KinesisFirehoseEvent(load_event("kinesisFirehosePutEvent.json")) assert event.region == "us-east-2" assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" @@ -1281,11 +1295,13 @@ def test_kinesis_firehose_direct_put_event(): assert record_01.data == "SGVsbG8gV29ybGQ=" assert record_01.data_as_bytes == b"Hello World" assert 
record_01.data_as_text == "Hello World" + assert record_01.metadata is None assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" assert record_02.data_as_bytes == b'{"Hello": "World"}' assert record_02.data_as_text == '{"Hello": "World"}' assert record_02.data_as_json == {"Hello": "World"} + assert record_02.metadata is None def test_kinesis_stream_event(): From a3d14109fb05f532a2ae47a9e0eefbacc2263112 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 14 Oct 2022 18:26:25 -0700 Subject: [PATCH 15/16] test fixes --- tests/functional/test_data_classes.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 8d396c9bef0..bcf0c65a9c4 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1290,13 +1290,15 @@ def test_kinesis_firehose_put_event(): assert len(records) == 2 record_01, record_02 = records[:] - assert record_01.approximate_arrival_timestamp == 1510772160000 + assert record_01.approximate_arrival_timestamp == 1664029185290 assert record_01.record_id == "record1" assert record_01.data == "SGVsbG8gV29ybGQ=" assert record_01.data_as_bytes == b"Hello World" assert record_01.data_as_text == "Hello World" assert record_01.metadata is None + assert record_02.approximate_arrival_timestamp == 1664029186945 + assert record_01.record_id == "record2" assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" assert record_02.data_as_bytes == b'{"Hello": "World"}' assert record_02.data_as_text == '{"Hello": "World"}' From 2ab66fccccd5063276d4596e6018af8166b0609e Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Sat, 15 Oct 2022 15:03:28 +0100 Subject: [PATCH 16/16] feat(dataclass): fix test --- tests/functional/test_data_classes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index bcf0c65a9c4..235a3f8f8da 100644 --- 
a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1298,7 +1298,7 @@ def test_kinesis_firehose_put_event(): assert record_01.metadata is None assert record_02.approximate_arrival_timestamp == 1664029186945 - assert record_01.record_id == "record2" + assert record_02.record_id == "record2" assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" assert record_02.data_as_bytes == b'{"Hello": "World"}' assert record_02.data_as_text == '{"Hello": "World"}'