
RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis) #64


Closed
to-mc opened this issue Nov 30, 2021 · 25 comments
Labels
all_runtimes (Changes that should be applied to all runtimes) · Python

Comments

@to-mc

to-mc commented Nov 30, 2021

Key information

Summary

A new generic batch processing utility that can process records from SQS, Kinesis Data Streams, and DynamoDB Streams, and report partial batch failures natively.

Motivation

With the launch of partial batch response support for Lambda/SQS, the event source mapping can now natively handle partial failures in a batch - removing the need for calls to the SQS DeleteMessage API. This support already exists for Kinesis and DynamoDB streams.
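
For context, "handling partial failures natively" means the function returns the identifiers of failed records in its response instead of deleting successes itself. A minimal sketch of that response shape (field casing follows the launch docs; see the capitalization discussion further down this thread, and "abc123" is a hypothetical identifier):

def lambda_handler(event, context):
    # ... process records, collecting identifiers of the ones that failed ...
    return {
        "batchItemFailures": [
            {"itemIdentifier": "abc123"},  # e.g. SQS messageId or Kinesis sequence number
        ]
    }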

Proposal

The new utility will be implemented in the namespace that already exists for the SQS batch processing utility - aws_lambda_powertools.utilities.batch. Rather than calling the SQS DeleteMessage API like the existing batch utility, this version will instead inject BatchItemFailures (or batchItemFailures for Kinesis and DynamoDB) into the Lambda response. At minimum, we will expose a new decorator, batch_processor, which will accept an event type depending on the integration (Kinesis, SQS, or DynamoDB Streams). This will look similar to the Event Handler design. There will be a boolean setting to handle partial failures, defaulting to True (users will still need to enable this in the event source mapping for it to work).

from aws_lambda_powertools.utilities.batch import batch_processor, EventType


def record_handler(record):
    return do_something_with(record["body"])


@batch_processor(record_handler=record_handler, event_type=EventType.KinesisStreamEvent)
def lambda_handler(event, context):
    return {"statusCode": 200}

Proposed class/method names for the new API:
aws_lambda_powertools.utilities.batch.base.BatchProcessor
aws_lambda_powertools.utilities.batch.batch_processor

Any configuration or corner cases you'd expect?
Users will need to enable this in the event source mapping when configuring the Lambda trigger - reporting partial failures will not work with changes to the Lambda code alone. Investigation is needed to better understand the consequences here, and how expensive it would be to check at runtime whether the setting is enabled. If we don't implement such a check, we need to call this out front and center in the documentation.
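
For reference, a minimal sketch of enabling it on an SQS trigger in SAM (mirroring the full template shared later in this thread):

Events:
    Batch:
        Type: SQS
        Properties:
            Queue: !GetAtt SampleQueue.Arn
            FunctionResponseTypes:
                - ReportBatchItemFailures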

Rationale and alternatives

The main consideration here is where the new functionality fits into the Powertools package. It could be a new top-level utility, e.g. aws_lambda_powertools.batchv2 - but we prefer not to add version numbers, as they are confusing for users.

We could make a straight replacement of the implementation behind the existing API, which was the initial idea. However, the native functionality requires a setting when the event source mapping is created, which means we'd be introducing a breaking change: the utility would stop working for users who had not made this configuration change.

Unresolved questions

We need to decide whether to add a context manager like the existing implementation has. It is simple to implement, but it burdens the user with ensuring they store the return value and use it in their Lambda response. I feel this is too likely to be misunderstood, but I'd like opinions. Example:

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

def record_handler(record):
    return do_something_with(record["body"])

def lambda_handler(event, context):
    records = event["Records"]

    processor = BatchProcessor(event_type=EventType.KinesisStreamEvent)

    with processor(records, record_handler) as proc:
        result = proc.process()  # users will have to store the response from the processor

    return result  # users will have to return the stored result in their Lambda response

@kimberlyamandalu

I think the batch_processor decorator name might already be taken by the following: Create your own Partial Processor. If I'm reading this incorrectly, I apologize - please correct me.

Does it make sense to leverage that capability to implement the "new" batch processors?

@heitorlessa
Contributor

heitorlessa commented Dec 1, 2021 via email

@heitorlessa
Contributor

What about @report_batch_failures(batch_type=, record_handler=)?

That is explicit enough, and should help customers find it more easily if they've read the news, found it somewhere, and came looking for support.

@kimberlyamandalu

I think that is a pretty good name for it, since SQS/DynamoDB/Kinesis all refer to it as such.
The only thing I can think of that might be confusing is that it could look like it's just an error handling feature. Which I guess it kind of is, but more :)

I honestly cannot think of a better name, other than maybe @partial_batch_processor?

But I'm sure whatever name it gets will be described clearly in the documentation, and Google searches should return relevant results.

@gmcrocetti

Hi folks, long time no see :).
I'm really thrilled to see AWS finally provide this feature for SQS 🚀 .

Why not leverage the current @batch_processor instead of creating a new decorator with "similar" behavior? Couldn't we achieve the same result by doing the following:

  1. Adding an inject_batchitem_failures argument to @batch_processor;
  2. Writing a new PartialProcessor extending BasePartialProcessor - not 100% sure we need it.

from typing import Callable, Dict, List

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.batch import BasePartialProcessor


def batchitem_failure_handler(event_source: str, failed_records: List[Dict]) -> Dict:
    key = "batchItemFailures" if event_source in ("aws:kinesis", "aws:dynamodb") else "BatchItemFailures"
    failed_batch_items = [{"ItemIdentifier": failed_record["eventID"]} for failed_record in failed_records]
    return {key: failed_batch_items}


@lambda_handler_decorator
def batch_processor(
    handler: Callable,
    event: Dict,
    context: Dict,
    record_handler: Callable,
    processor: BasePartialProcessor,
    inject_batchitem_failures: bool = False,  # non-breaking change
):
    records = event["Records"]
    # Do we really need `event_type` as an argument?
    event_source = records[0]["eventSource"]  # eventSource lives on each record, not the top-level event

    with processor(records, record_handler):
        processor.process()

    result = handler(event, context)

    if inject_batchitem_failures:
        failure_response = batchitem_failure_handler(event_source, processor.fail_messages)
        result.update(failure_response)

    return result
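
For illustration, this is what the helper above produces for a single failed Kinesis record (hypothetical eventID):

failed = [{"eventID": "shardId-0:12345"}]
print(batchitem_failure_handler("aws:kinesis", failed))
# -> {'batchItemFailures': [{'ItemIdentifier': 'shardId-0:12345'}]}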

I'm in a rush, so sorry if I missed a critical point. Looking forward to hearing from you 👍

@heitorlessa
Contributor

Yes, I mentioned that to @cakepietoast but he had a pretty good argument to the contrary -- can't remember what it was tho :/

@ran-isenberg

ran-isenberg commented Dec 5, 2021

@heitorlessa other than the automatic handling of errors, this can already be achieved with the current Parser utility.
Why not extend the Parser instead of creating a new utility?

We process SQS/DynamoDB stream batches with the Parser and it works great.

@heitorlessa
Contributor

heitorlessa commented Dec 5, 2021 via email

@heitorlessa
Contributor

We've made a decision - we'll create a new Processor that handles all 3 (SQS, DynamoDB Streams, Kinesis Data Streams) and add no additional arguments to batch_processor, to make it easier to get right and to make deprecating things in v2 easier.

Gonna start working on the implementation now; we should have it ready for the next release early next week.

@ran-isenberg

@heitorlessa Do you think there should be a Parser equivalent? I think it could easily integrate the processing/failure handling you describe here, added as an extra parameter to the Parser.

@heitorlessa
Contributor

Could you give an example, @ran-isenberg? I can see how we could have an extra parameter to receive a Pydantic model OR an event source data class (SQS), so the record handler receives a strongly typed record.


Everyone, this is how the initial UX looks now - we'll work on Kinesis/DynamoDB next, but the experience will be a simple toggle, e.g. EventType.DynamoDB; everything else remains the same.

As a decorator

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record):
    return do_something_with(record["body"])


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.report()  # this is necessary to ensure Lambda returns the correct response

As a context manager, in case you need full access to the processed batch or want to handle exceptions yourself

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record):
    return do_something_with(record["body"])


def lambda_handler(event, context):
    records = event["Records"]

    with processor(records, record_handler) as batch:
        processed_messages = batch.process()

    return batch.report()  # this is necessary to ensure Lambda returns the correct response
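
(For context, report() here would return the partial batch response - the batchItemFailures shape sketched earlier in this RFC - which is why the handler must return it to Lambda.)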

@michaelbrewer
Contributor

@heitorlessa fitting that into the lightweight event source classes would be neat

@gmcrocetti

> We've made a decision - we'll create a new Processor that handles all 3 (SQS, DynamoDB Streams, Kinesis Data Streams) and add no additional arguments to batch_processor, to make it easier to get right and to make deprecating things in v2 easier.
>
> Gonna start working on the implementation now; we should have it ready for the next release early next week.

Really liked the decision you guys made. Writing a new processor that injects failed messages is the best of both worlds. 🚀
Just to give another 👍, adding event source classes to this new processor will be super neat. Can't wait to see the RFC! :)

Just a small nit: I can't seem to find BatchItemFailures - with a capital B. Looks like someone corrected the typo in the documentation? 🤔

@ran-isenberg

@heitorlessa I think it makes a lot of sense to integrate this utility with either of the two input validation tools - Parser or Event Source Data Classes.
Maybe having 2 decorators - one in the extras package for the Parser and one in the regular package.
In the end, I think both decorators would be able to call a shared code path once they finish handling the event with either the Parser/Event Source utilities.

I wrote something like that with the Parser for SQS and DynamoDB streams: it iterates messages, catches processing exceptions, adds failed SQS messages to a list, continues the loop, and in the end sends failed messages to a DLQ. So I know it can be done :)

@heitorlessa
Contributor

> Just a small nit: I can't seem to find BatchItemFailures - with a capital B. Looks like someone corrected the typo in the documentation? 🤔

They've corrected it in the docs. The docs that went live as part of the launch had a capital B; it was corrected after this RFC was created ;)

@heitorlessa
Contributor

> @heitorlessa I think it makes a lot of sense to integrate this utility with either of the two input validation tools - Parser or Event Source Data Classes. Maybe having 2 decorators - one in the extras package for the Parser and one in the regular package. In the end, I think both decorators would be able to call a shared code path once they finish handling the event with either the Parser/Event Source utilities.
>
> I wrote something like that with the Parser for SQS and DynamoDB streams: it iterates messages, catches processing exceptions, adds failed SQS messages to a list, continues the loop, and in the end sends failed messages to a DLQ. So I know it can be done :)

I'll think this through and get thoughts from @cakepietoast and @am29d too. At the moment, I'd prefer to keep them separate (Parser to parse & validate, Batch to handle batch processing). For integration, however, I'd like Pydantic customers using Parser to be able to pass a subclass of SQS Record, DynamoDB Record, or Kinesis Record, so they can bring their own validations and methods while having a single place to learn about and tweak batch processing.

@heitorlessa
Contributor

@gmcrocetti @michaelbrewer we've integrated event source data classes as the default, and we will optionally support Parser models for a richer validation experience.

This is how it looks for data classes now:

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS) # or, EventType.KinesisDataStreams, EventType.DynamoDB

def record_handler(record: SQSRecord): # or DynamoDBRecord, KinesisStreamRecord
    return "success"

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response() # this is necessary to ensure Lambda returns the correct response

@heitorlessa heitorlessa self-assigned this Dec 13, 2021
@heitorlessa
Contributor

After some Mypy wrestling, this will also include support for Pydantic/Parser - this is how the UX will look:

import json

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel


class Order(BaseModel):
    item: dict


class OrderSqs(SqsRecordModel):
    body: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("body", pre=True)
    def transform_body_to_dict(cls, value):
        return json.loads(value)


processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs)


def pydantic_record_handler(record: OrderSqs):
    print(record.body.item)
    return "success"


@batch_processor(record_handler=pydantic_record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()  # this is necessary to ensure Lambda returns the correct response
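
To make the nesting concrete, here is a minimal check of the Order model above with a hypothetical payload - OrderSqs applies the same idea to the raw JSON string carried in each SQS record body:

order = Order(item={"id": 1, "sku": "widget"})  # hypothetical order payload
print(order.item)  # {'id': 1, 'sku': 'widget'}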

cc @ran-isenberg

@ran-isenberg

> After some Mypy wrestling, this will also include support for Pydantic/Parser - [full example quoted above]

Amazing, love it!

@heitorlessa
Contributor

Just deployed a stack with staged code, and I can happily report all of this works, including the undocumented SQS Report Partial Failure.

We'll get to the docs now, and once complete we are ready to release!

SQS

(screenshot: SQS partial batch response)

Kinesis

(screenshot: Kinesis partial batch response)

DynamoDB

(screenshot: DynamoDB partial batch response)


Template used

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
    Api:
        EndpointConfiguration: REGIONAL
        TracingEnabled: true
    Function:
        Timeout: 5
        MemorySize: 256
        Runtime: python3.8
        Tracing: Active
        Environment:
            Variables:
                # Powertools env vars: https://awslabs.github.io/aws-lambda-powertools-python/#environment-variables
                LOG_LEVEL: INFO
                POWERTOOLS_LOGGER_SAMPLE_RATE: 0.1
                POWERTOOLS_LOGGER_LOG_EVENT: true
                POWERTOOLS_SERVICE_NAME: hello

Resources:
    HelloWorldFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: hello_world
            Policies:
                - SQSPollerPolicy:
                    QueueName: !GetAtt SampleQueue.QueueName
                - KinesisStreamReadPolicy:
                    StreamName: !Ref SampleStream
                # Lambda Destinations require additional permissions
                # to send failure records from Kinesis/DynamoDB
                - Version: "2012-10-17"
                  Statement:
                    Effect: "Allow"
                    Action:
                        - sqs:GetQueueAttributes
                        - sqs:GetQueueUrl
                        - sqs:SendMessage
                    Resource: !GetAtt SampleDLQ.Arn
            Events:
                Batch:
                    Type: SQS
                    Properties:
                        Queue: !GetAtt SampleQueue.Arn
                        BatchSize: 1
                        FunctionResponseTypes:
                            - ReportBatchItemFailures
                KinesisStream:
                    Type: Kinesis
                    Properties:
                        Stream: !GetAtt SampleStream.Arn
                        BatchSize: 100
                        StartingPosition: LATEST
                        MaximumRetryAttempts: 2
                        DestinationConfig:
                            OnFailure:
                                Destination: !GetAtt SampleDLQ.Arn
                        FunctionResponseTypes:
                            - ReportBatchItemFailures
                Stream:
                    Type: DynamoDB
                    Properties:
                        Stream: !GetAtt SampleTable.StreamArn
                        StartingPosition: LATEST
                        MaximumRetryAttempts: 2
                        DestinationConfig:
                            OnFailure:
                                Destination: !GetAtt SampleDLQ.Arn
                        FunctionResponseTypes:
                            - ReportBatchItemFailures

    SampleDLQ:
        Type: AWS::SQS::Queue

    SampleQueue:
        Type: AWS::SQS::Queue
        Properties:
            VisibilityTimeout: 30 # Fn timeout * 6
            RedrivePolicy:
                maxReceiveCount: 2
                deadLetterTargetArn: !GetAtt SampleDLQ.Arn

    SampleStream:
        Type: AWS::Kinesis::Stream
        Properties:
            ShardCount: 1

    SampleTable:
        Type: AWS::DynamoDB::Table
        Properties:
            BillingMode: PAY_PER_REQUEST
            AttributeDefinitions:
                - AttributeName: pk
                  AttributeType: S
                - AttributeName: sk
                  AttributeType: S
            KeySchema:
                - AttributeName: pk
                  KeyType: HASH
                - AttributeName: sk
                  KeyType: RANGE
            SSESpecification:
                SSEEnabled: yes
            StreamSpecification:
                StreamViewType: NEW_IMAGE

Outputs:
    HelloWorldFunction:
        Value: !GetAtt HelloWorldFunction.Arn

    Queue:
        Value: !Ref SampleQueue

    Stream:
        Value: !Ref SampleStream

    Table:
        Value: !Ref SampleTable

@heitorlessa
Contributor

Docs finally done - @michaelbrewer @ran-isenberg could you have a quick pass before we release it tomorrow? All I need to do now is merge and start writing the release notes.

PR: aws-powertools/powertools-lambda-python#886

Here's what I've changed:

  • Updated the docs to reflect the new BatchProcessor in favour of sqs_batch_processor and PartialSQSProcessor
  • Added a Migration guide for those using sqs_batch_processor and PartialSQSProcessor
  • Added a Caveats section in case people use the Tracer response auto-capture feature with this utility on large batch sizes
  • Added sample event and sample response for each
  • Added SAM infrastructure for SQS, Kinesis and DynamoDB along with DLQ and IAM Permissions
  • Added an Extensibility section
  • Added Pydantic section
  • Updated Create your own processor
  • Updated Key features
  • Updated Intro
  • Recreated Background
  • Repurposed a few previous sections like Sentry integration under Legacy

@heitorlessa
Contributor

Merged - it'll be out tomorrow with our Christmas release

@heitorlessa heitorlessa transferred this issue from aws-powertools/powertools-lambda-python Dec 20, 2021
@heitorlessa heitorlessa changed the title RFC: New batch processing utility RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis) Dec 20, 2021
@heitorlessa heitorlessa added all_runtimes Changes that should be applied to all runtimes Python labels Dec 20, 2021
@heitorlessa
Contributor

cc @pankajagrawal16 @saragerion for the now native partial response support

@heitorlessa
Contributor

Documentation is now fully updated with all the mechanics we've learned during implementation (official docs were sparse), incoming event/response samples, and unit tests:

https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/

@pankajagrawal16

Just an update for the Java runtime: @machafer will start to look at the implementation and the UX. I will create a feature request on the Java repo as well.
