diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py index 4b1a93fd226..1b47581b91b 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py @@ -2,7 +2,7 @@ import zlib from typing import Dict, List, Literal, Type, Union -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, Field, field_validator from aws_lambda_powertools.shared.functions import base64_decode from aws_lambda_powertools.utilities.parser.models.cloudwatch import ( @@ -11,11 +11,28 @@ class KinesisDataStreamRecordPayload(BaseModel): - kinesisSchemaVersion: str - partitionKey: str - sequenceNumber: str - data: Union[bytes, Type[BaseModel], BaseModel] # base64 encoded str is parsed into bytes - approximateArrivalTimestamp: float + kinesisSchemaVersion: str = Field( + description="The version of the Kinesis Data Streams record format.", + examples=["1.0"], + ) + partitionKey: str = Field( + description="The partition key that was used to place the record in the stream.", + examples=["user123", "device-001", "order-12345"], + ) + sequenceNumber: str = Field( + description="The unique sequence number for the record within the shard.", + examples=[ + "49590338271490256608559692538361571095921575989136588898", + "49545115243490985018280067714973144582180062593244200961", + ], + ) + data: Union[bytes, Type[BaseModel], BaseModel] = Field( # base64 encoded str is parsed into bytes + description="The data payload of the record. Base64 encoded string is automatically decoded to bytes.", + ) + approximateArrivalTimestamp: float = Field( + description="The approximate time that the record was inserted into the stream (Unix timestamp).", + examples=[1428537600.0, 1609459200.5], + ) @field_validator("data", mode="before") def data_base64_decode(cls, value): @@ -23,14 +40,32 @@ def data_base64_decode(cls, value): class KinesisDataStreamRecord(BaseModel): - eventSource: Literal["aws:kinesis"] - eventVersion: str - eventID: str - eventName: Literal["aws:kinesis:record"] - invokeIdentityArn: str - awsRegion: str - eventSourceARN: str - kinesis: KinesisDataStreamRecordPayload + eventSource: Literal["aws:kinesis"] = Field( + description="The AWS service that generated the event.", + examples=["aws:kinesis"], + ) + eventVersion: str = Field(description="The version of the event schema.", examples=["1.0"]) + eventID: str = Field( + description="A unique identifier for the event.", + examples=["shardId-000000000006:49590338271490256608559692538361571095921575989136588898"], + ) + eventName: Literal["aws:kinesis:record"] = Field( + description="The name of the event type.", + examples=["aws:kinesis:record"], + ) + invokeIdentityArn: str = Field( + description="The ARN of the IAM role used to invoke the Lambda function.", + examples=["arn:aws:iam::123456789012:role/lambda-kinesis-role"], + ) + awsRegion: str = Field( + description="The AWS region where the Kinesis stream is located.", + examples=["us-east-1", "us-west-2", "eu-west-1"], + ) + eventSourceARN: str = Field( + description="The ARN of the Kinesis stream that generated the event.", + examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"], + ) + kinesis: KinesisDataStreamRecordPayload = Field(description="The Kinesis-specific data for the record.") def decompress_zlib_record_data_as_json(self) -> Dict: """Decompress Kinesis Record bytes data zlib compressed to JSON""" @@ -41,7 +76,10 @@ def decompress_zlib_record_data_as_json(self) -> Dict: class KinesisDataStreamModel(BaseModel): - Records: List[KinesisDataStreamRecord] + Records: List[KinesisDataStreamRecord] = Field( + description="A list of Kinesis Data Stream records that triggered the Lambda function.", + examples=[[]], + ) def extract_cloudwatch_logs_from_event(event: KinesisDataStreamModel) -> List[CloudWatchLogsDecode]: diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py index 6c50b93b9a7..697a3fbdd89 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py @@ -1,23 +1,55 @@ from typing import List, Optional, Type, Union -from pydantic import BaseModel, PositiveInt, field_validator +from pydantic import BaseModel, Field, PositiveInt, field_validator from aws_lambda_powertools.shared.functions import base64_decode class KinesisFirehoseRecordMetadata(BaseModel): - shardId: str - partitionKey: str - approximateArrivalTimestamp: PositiveInt - sequenceNumber: str - subsequenceNumber: int + shardId: str = Field( + description="The shard ID of the Kinesis stream record.", + examples=["shardId-000000000000", "shardId-000000000001"], + ) + partitionKey: str = Field( + description="The partition key of the Kinesis stream record.", + examples=["user123", "device-001", "transaction-456"], + ) + approximateArrivalTimestamp: PositiveInt = Field( + description="The approximate time when the record arrived in the Kinesis stream \ + (Unix timestamp in milliseconds).", + examples=[1428537600000, 1609459200500], + ) + sequenceNumber: str = Field( + description="The sequence number of the Kinesis stream record.", + examples=["49590338271490256608559692538361571095921575989136588898"], + ) + subsequenceNumber: int = Field( + description="The subsequence number for records that share the same sequence number.", + examples=[0, 1, 2], + ) class KinesisFirehoseRecord(BaseModel): - data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes - recordId: str - approximateArrivalTimestamp: PositiveInt - kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None + data: Union[bytes, Type[BaseModel]] = Field( # base64 encoded str is parsed into bytes + description="The data payload of the record. Base64 encoded string is automatically decoded to bytes.", + ) + recordId: str = Field( + description="A unique identifier for the record within the batch.", + examples=[ + "49546986683135544286507457936321625675700192471156785154", + "49546986683135544286507457936321625675700192471156785155", + ], + ) + approximateArrivalTimestamp: PositiveInt = Field( + description="The approximate time when the record arrived in Kinesis Data Firehose \ + (Unix timestamp in milliseconds).", + examples=[1428537600000, 1609459200500], + ) + kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = Field( + None, + description="Metadata about the original Kinesis stream record \ + (only present when the delivery stream source is a Kinesis stream).", + ) @field_validator("data", mode="before") def data_base64_decode(cls, value): @@ -25,8 +57,25 @@ def data_base64_decode(cls, value): class KinesisFirehoseModel(BaseModel): - invocationId: str - deliveryStreamArn: str - region: str - sourceKinesisStreamArn: Optional[str] = None - records: List[KinesisFirehoseRecord] + invocationId: str = Field( + description="A unique identifier for the Lambda invocation.", + examples=["invocationIdExample", "12345678-1234-1234-1234-123456789012"], + ) + deliveryStreamArn: str = Field( + description="The ARN of the Kinesis Data Firehose delivery stream.", + examples=["arn:aws:firehose:us-east-1:123456789012:deliverystream/my-delivery-stream"], + ) + region: str = Field( + description="The AWS region where the delivery stream is located.", + examples=["us-east-1", "us-west-2", "eu-west-1"], + ) + sourceKinesisStreamArn: Optional[str] = Field( + None, + description="The ARN of the source Kinesis stream \ + (only present when the delivery stream source is a Kinesis stream).", + examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-source-stream"], + ) + records: List[KinesisFirehoseRecord] = Field( + description="A list of records to be processed by the Lambda function.", + examples=[[]], + ) diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py index 7117fc4a011..b9032a4c934 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py @@ -1,7 +1,7 @@ import json from typing import List, Optional -from pydantic import BaseModel, PositiveInt, field_validator +from pydantic import BaseModel, Field, PositiveInt, field_validator from aws_lambda_powertools.shared.functions import base64_decode from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata @@ -10,10 +10,21 @@ class KinesisFirehoseSqsRecord(BaseModel): - data: SqsRecordModel - recordId: str - approximateArrivalTimestamp: PositiveInt - kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None + data: SqsRecordModel = Field(description="The SQS record data that was delivered through Kinesis Data Firehose.") + recordId: str = Field( + description="A unique identifier for the record within the batch.", + examples=["49546986683135544286507457936321625675700192471156785154"], + ) + approximateArrivalTimestamp: PositiveInt = Field( + description="The approximate time when the record arrived in Kinesis Data Firehose \ + (Unix timestamp in milliseconds).", + examples=[1428537600000, 1609459200500], + ) + kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = Field( + None, + description="Metadata about the original Kinesis stream record \ + (only present when the delivery stream source is a Kinesis stream).", + ) @field_validator("data", mode="before") def data_base64_decode(cls, value): @@ -22,8 +33,26 @@ def data_base64_decode(cls, value): class KinesisFirehoseSqsModel(BaseModel): - invocationId: str - deliveryStreamArn: str - region: str - sourceKinesisStreamArn: Optional[str] = None - records: List[KinesisFirehoseSqsRecord] + invocationId: str = Field( + description="A unique identifier for the Lambda invocation.", + examples=["invocationIdExample", "12345678-1234-1234-1234-123456789012"], + ) + deliveryStreamArn: str = Field( + description="The ARN of the Kinesis Data Firehose delivery stream.", + examples=["arn:aws:firehose:us-east-1:123456789012:deliverystream/my-sqs-delivery-stream"], + ) + region: str = Field( + description="The AWS region where the delivery stream is located.", + examples=["us-east-1", "us-west-2", "eu-west-1"], + ) + sourceKinesisStreamArn: Optional[str] = Field( + None, + description="The ARN of the source Kinesis stream \ + (only present when the delivery stream source is a Kinesis stream).", + examples=["arn:aws:kinesis:us-east-1:123456789012:stream/my-source-stream"], + ) + records: List[KinesisFirehoseSqsRecord] = Field( + description="A list of SQS records delivered through Kinesis Data Firehose \ + to be processed by the Lambda function.", + examples=[[]], + )