Feature request: Improve support for SQS-wrapped S3 event notifications #2078

Closed
@theipster

Description

Use case

Originally discussed in #1656 (comment).

S3 event notifications can sometimes be ingested into Lambda via an intermediary such as an SQS queue (i.e. a Lambda event source) for various architectural reasons - batching, retries, etc. However, from the Lambda function's perspective, the intermediary might not be too important; what matters is the S3 event notification itself.
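For context, the nesting looks roughly like the following (a minimal stdlib-only sketch; field names follow the SQS and S3 event shapes, but all values are made up). The key point is that the SQS record's `body` is a JSON *string* that itself contains the S3 event notification, so it has to be decoded a second time:

```python
import json

# Illustrative S3 event notification (values are made up).
s3_notification = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "example/key.txt"},
            },
        }
    ]
}

# The SQS record wraps the notification as a JSON-encoded string in "body".
sqs_record = {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "eventSource": "aws:sqs",
    "body": json.dumps(s3_notification),  # the JSON-formatted wrapper in the middle
}

# Unwrapping by hand therefore needs a second json.loads() on the body:
inner = json.loads(sqs_record["body"])
for record in inner["Records"]:
    print(record["s3"]["object"]["key"])  # prints: example/key.txt
```

This double decoding is exactly what the boilerplate below pushes into the pydantic model layer.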

With the current Powertools built-ins, it is possible to parse the S3 event notification data structure out of the SQS message data structure, but this requires some awkward boilerplate code to chain the two data structures together (mostly due to the JSON-formatted wrapper in the middle):

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser.models import S3Model, SqsRecordModel
from pydantic import Json


# Boilerplate: chain the two models so the JSON-encoded SQS body
# is parsed as an S3 event notification.
class SqsS3EventNotificationModel(SqsRecordModel):
    body: Json[S3Model]


processor = BatchProcessor(event_type=EventType.SQS, model=SqsS3EventNotificationModel)


def record_handler(record: SqsS3EventNotificationModel):
    for s3_record in record.body.Records:
        do_something_useful(s3_record)


@batch_processor(record_handler=record_handler, processor=processor)
def handler(event, context):
    return processor.response()
```

Solution/User Experience

A simple quick win for the user experience would be to make a new data model available as a built-in:

```python
from aws_lambda_powertools.utilities.parser.models import S3Model, SqsRecordModel
from pydantic import Json


class SqsS3EventNotificationModel(SqsRecordModel):
    body: Json[S3Model]
```

This would be compatible with the existing framework and could be used like any other model, e.g.:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor

processor = BatchProcessor(event_type=EventType.SQS, model=SqsS3EventNotificationModel)


def record_handler(record: SqsS3EventNotificationModel):
    for s3_record in record.body.Records:
        do_something_useful(s3_record)


@batch_processor(record_handler=record_handler, processor=processor)
def handler(event, context):
    return processor.response()
```

Note: see this comment about Mypy compatibility, although I believe it may no longer be a concern due to a recent typing improvement.

Alternative solutions

If there's a way to skip the intermediate handler function, that would be even nicer (but see the trade-off below).

In other words, rather than defining a def record_handler(record: SqsS3EventNotificationModel) that iterates through record.body.Records, it would be nicer to define a def record_handler(record: S3Model) directly and let the processor unwrap everything.

This would keep the business logic clean and agnostic to whether the notification arrived via SQS, EventBridge, etc.

However, the trade-off of this even simpler interface is that any event metadata (from both the SQS record and the S3 event notification) would be unavailable to the handler.
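To make the trade-off concrete, here is a minimal stdlib-only sketch of what such an unwrapping processor could do (process_s3_records and the event shape are hypothetical, not Powertools API): the handler only ever sees the inner S3 records, and the SQS envelope - messageId and friends - is discarded before the handler runs:

```python
import json
from typing import Callable


def process_s3_records(event: dict, record_handler: Callable[[dict], None]) -> None:
    """Hypothetical unwrapper: invoke record_handler once per inner S3 record.

    Trade-off illustrated: sqs_record["messageId"] and the rest of the SQS
    envelope are dropped here, so the handler cannot access that metadata.
    """
    for sqs_record in event["Records"]:
        inner = json.loads(sqs_record["body"])  # decode the JSON wrapper
        for s3_record in inner["Records"]:
            record_handler(s3_record)


# Usage with a made-up event:
seen = []
event = {
    "Records": [
        {
            "messageId": "mid-1",  # envelope metadata, invisible to the handler
            "body": json.dumps({"Records": [{"s3": {"object": {"key": "a.txt"}}}]}),
        }
    ]
}
process_s3_records(event, lambda r: seen.append(r["s3"]["object"]["key"]))
```

The handler's signature stays purely about S3 records, which is the appeal; the cost is that anything outside the inner Records list is gone by the time the handler is called.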
