diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index 8da2c983f88..d8efbccbb61 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -140,12 +140,25 @@ def user_identity(self) -> dict: return self.get("userIdentity") or {} +class DynamoDBStreamWindow(DictWrapper): + @property + def start(self) -> str: + """The time window started""" + return self["start"] + + @property + def end(self) -> str: + """The time window will end""" + return self["end"] + + class DynamoDBStreamEvent(DictWrapper): """Dynamo DB Stream Event Documentation: ------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html + - https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-windows.html Example ------- @@ -167,3 +180,30 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): def records(self) -> Iterator[DynamoDBRecord]: for record in self["Records"]: yield DynamoDBRecord(record) + + @property + def window(self) -> DynamoDBStreamWindow | None: + window = self.get("window") + if window: + return DynamoDBStreamWindow(window) + return window + + @property + def state(self) -> dict[str, Any]: + return self.get("state") or {} + + @property + def shard_id(self) -> str | None: + return self.get("shardId") + + @property + def event_source_arn(self) -> str | None: + return self.get("eventSourceARN") + + @property + def is_final_invoke_for_window(self) -> bool | None: + return self.get("isFinalInvokeForWindow") + + @property + def is_window_terminated_early(self) -> bool | None: + return self.get("isWindowTerminatedEarly") diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 6b189f937fd..64fd26f3a30 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -3,7 +3,7 @@ import base64 import json import zlib -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import ( CloudWatchLogsDecodedData, @@ -100,12 +100,25 @@ def kinesis(self) -> KinesisStreamRecordPayload: return KinesisStreamRecordPayload(self["kinesis"]) +class KinesisStreamWindow(DictWrapper): + @property + def start(self) -> str: + """The time window started""" + return self["start"] + + @property + def end(self) -> str: + """The time window will end""" + return self["end"] + + class KinesisStreamEvent(DictWrapper): """Kinesis stream event Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html + - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html """ @property @@ -113,6 +126,33 @@ def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record) + @property + def window(self) -> KinesisStreamWindow | None: + window = self.get("window") + if window: + return KinesisStreamWindow(window) + return window + + @property + def state(self) -> dict[str, Any]: + return self.get("state") or {} + + @property + def shard_id(self) -> str | None: + return self.get("shardId") + + @property + def event_source_arn(self) -> str | None: + return self.get("eventSourceARN") + + @property + def is_final_invoke_for_window(self) -> bool | None: + return self.get("isFinalInvokeForWindow") + + @property + def is_window_terminated_early(self) -> bool | None: + return self.get("isWindowTerminatedEarly") + def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]: return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records] diff --git a/tests/events/dynamoStreamTumblingWindowEvent.json b/tests/events/dynamoStreamTumblingWindowEvent.json new file mode 100644 index 00000000000..035d08978e9 --- /dev/null +++ b/tests/events/dynamoStreamTumblingWindowEvent.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/tests/events/kinesisStreamEvent.json b/tests/events/kinesisStreamEvent.json index ef8e2096388..cf3a3415ef0 100644 --- a/tests/events/kinesisStreamEvent.json +++ b/tests/events/kinesisStreamEvent.json @@ -32,5 +32,17 @@ "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } - ] + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false } diff --git a/tests/events/kinesisStreamTumblingWindowEvent.json b/tests/events/kinesisStreamTumblingWindowEvent.json new file mode 100644 index 00000000000..0209525835c --- /dev/null +++ b/tests/events/kinesisStreamTumblingWindowEvent.json @@ -0,0 +1,33 @@ + +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py index 8c6b62867ae..02fdab9582e 100644 --- a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py @@ -46,6 +46,42 @@ def test_dynamodb_stream_trigger_event(): assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES +def test_dynamodb_stream_trigger_with_tumbling_window_event(): + raw_event = load_event("dynamoStreamTumblingWindowEvent.json") + parsed_event = DynamoDBStreamEvent(raw_event) + + records = list(parsed_event.records) + + record = records[0] + record_raw = raw_event["Records"][0] + assert record.aws_region == record_raw["awsRegion"] + assert record.event_id == record_raw["eventID"] + assert record.event_name is DynamoDBRecordEventName.INSERT + assert record.event_source == record_raw["eventSource"] + assert record.event_source_arn == record_raw["eventSourceARN"] + assert record.event_version == record_raw["eventVersion"] + assert record.user_identity == {} + dynamodb = record.dynamodb + assert dynamodb is not None + keys = dynamodb.keys + assert keys is not None + assert keys["Id"] == DECIMAL_CONTEXT.create_decimal(101) + assert dynamodb.new_image.get("Message") == record_raw["dynamodb"]["NewImage"]["Message"]["S"] + assert dynamodb.old_image == {} + assert dynamodb.sequence_number == record_raw["dynamodb"]["SequenceNumber"] + assert dynamodb.size_bytes == record_raw["dynamodb"]["SizeBytes"] + assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES + + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.window.start == raw_event["window"]["start"] + assert parsed_event.window.end == raw_event["window"]["end"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"] + + def test_dynamodb_stream_record_deserialization_large_int(): data = { "Keys": {"key1": {"attr1": "value1"}}, diff --git a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py index 5410ed81974..2eab5fe90fe 100644 --- a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py @@ -41,6 +41,13 @@ def test_kinesis_stream_event(): assert kinesis.data_as_bytes() == b"Hello, this is a test." assert kinesis.data_as_text() == "Hello, this is a test." + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"] + def test_kinesis_stream_event_json_data(): json_value = {"test": "value"} @@ -56,3 +63,43 @@ def test_kinesis_stream_event_cloudwatch_logs_data_extraction(): individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records] assert len(extracted_logs) == len(individual_logs) + + +def test_kinesis_stream_with_tumbling_window_event(): + raw_event = load_event("kinesisStreamTumblingWindowEvent.json") + parsed_event = KinesisStreamEvent(raw_event) + + records = list(parsed_event.records) + assert len(records) == 1 + record = records[0] + + record_raw = raw_event["Records"][0] + + assert record.aws_region == record_raw["awsRegion"] + assert record.event_id == record_raw["eventID"] + assert record.event_name == record_raw["eventName"] + assert record.event_source == record_raw["eventSource"] + assert record.event_source_arn == record_raw["eventSourceARN"] + assert record.event_version == record_raw["eventVersion"] + assert record.invoke_identity_arn == record_raw["invokeIdentityArn"] + + kinesis = record.kinesis + kinesis_raw = raw_event["Records"][0]["kinesis"] + + assert kinesis.approximate_arrival_timestamp == kinesis_raw["approximateArrivalTimestamp"] + assert kinesis.data == kinesis_raw["data"] + assert kinesis.kinesis_schema_version == kinesis_raw["kinesisSchemaVersion"] + assert kinesis.partition_key == kinesis_raw["partitionKey"] + assert kinesis.sequence_number == kinesis_raw["sequenceNumber"] + + assert kinesis.data_as_bytes() == b"Hello, this is a test." + assert kinesis.data_as_text() == "Hello, this is a test." + + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.window.start == raw_event["window"]["start"] + assert parsed_event.window.end == raw_event["window"]["end"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]