-
Notifications
You must be signed in to change notification settings - Fork 429
docs(batch_processing): snippets split, improved, and lint #2231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
b381b0c
chore: converted first examples
rubenfonseca b207711
chore: refactored kinesis examples
rubenfonseca bc60371
chore: refactored dynamodb samples
rubenfonseca 9951d39
chore: pydantic refactor
rubenfonseca d637f59
chore: advanced accessing lambda context refactor
rubenfonseca bda4cf9
chore: extending failure
rubenfonseca b271fb6
chore: custom partial processor
rubenfonseca 1d5ede4
chore: refactored testing
rubenfonseca 3bbb1f2
chore: sentry refactor
rubenfonseca 946d9f8
docs: small fixes
leandrodamascena 6d668d9
docs: small fixes
leandrodamascena 174cc3a
Apply suggestions from code review
rubenfonseca f099c94
fix: showcase lambda_context
rubenfonseca bd0cde8
fix: addressed the rest of the comments
rubenfonseca b92614a
docs: small changes to help others + sentry dev dependency
leandrodamascena 6ec8f64
docs: mypy
leandrodamascena cbb6064
fix: highlight
rubenfonseca File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
66 changes: 66 additions & 0 deletions
66
examples/batch_processing/sam/dynamodb_batch_processing.yaml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
AWSTemplateFormatVersion: '2010-09-09' | ||
Transform: AWS::Serverless-2016-10-31 | ||
Description: partial batch response sample | ||
|
||
Globals: | ||
Function: | ||
Timeout: 5 | ||
MemorySize: 256 | ||
Runtime: python3.10 | ||
Tracing: Active | ||
Environment: | ||
Variables: | ||
LOG_LEVEL: INFO | ||
POWERTOOLS_SERVICE_NAME: hello | ||
|
||
Resources: | ||
HelloWorldFunction: | ||
Type: AWS::Serverless::Function | ||
Properties: | ||
Handler: app.lambda_handler | ||
CodeUri: hello_world | ||
Policies: | ||
# Lambda Destinations require additional permissions | ||
# to send failure records from Kinesis/DynamoDB | ||
- Version: "2012-10-17" | ||
Statement: | ||
Effect: "Allow" | ||
Action: | ||
- sqs:GetQueueAttributes | ||
- sqs:GetQueueUrl | ||
- sqs:SendMessage | ||
Resource: !GetAtt SampleDLQ.Arn | ||
Events: | ||
DynamoDBStream: | ||
Type: DynamoDB | ||
Properties: | ||
Stream: !GetAtt SampleTable.StreamArn | ||
StartingPosition: LATEST | ||
MaximumRetryAttempts: 2 | ||
DestinationConfig: | ||
OnFailure: | ||
Destination: !GetAtt SampleDLQ.Arn | ||
FunctionResponseTypes: | ||
- ReportBatchItemFailures | ||
|
||
SampleDLQ: | ||
Type: AWS::SQS::Queue | ||
|
||
SampleTable: | ||
Type: AWS::DynamoDB::Table | ||
Properties: | ||
BillingMode: PAY_PER_REQUEST | ||
AttributeDefinitions: | ||
- AttributeName: pk | ||
AttributeType: S | ||
- AttributeName: sk | ||
AttributeType: S | ||
KeySchema: | ||
- AttributeName: pk | ||
KeyType: HASH | ||
- AttributeName: sk | ||
KeyType: RANGE | ||
SSESpecification: | ||
SSEEnabled: true | ||
StreamSpecification: | ||
StreamViewType: NEW_AND_OLD_IMAGES |
53 changes: 53 additions & 0 deletions
53
examples/batch_processing/sam/kinesis_batch_processing.yaml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
AWSTemplateFormatVersion: '2010-09-09' | ||
Transform: AWS::Serverless-2016-10-31 | ||
Description: partial batch response sample | ||
|
||
Globals: | ||
Function: | ||
Timeout: 5 | ||
MemorySize: 256 | ||
Runtime: python3.10 | ||
Tracing: Active | ||
Environment: | ||
Variables: | ||
LOG_LEVEL: INFO | ||
POWERTOOLS_SERVICE_NAME: hello | ||
|
||
Resources: | ||
HelloWorldFunction: | ||
Type: AWS::Serverless::Function | ||
Properties: | ||
Handler: app.lambda_handler | ||
CodeUri: hello_world | ||
Policies: | ||
# Lambda Destinations require additional permissions | ||
# to send failure records to DLQ from Kinesis/DynamoDB | ||
- Version: "2012-10-17" | ||
Statement: | ||
Effect: "Allow" | ||
Action: | ||
- sqs:GetQueueAttributes | ||
- sqs:GetQueueUrl | ||
- sqs:SendMessage | ||
Resource: !GetAtt SampleDLQ.Arn | ||
Events: | ||
KinesisStream: | ||
Type: Kinesis | ||
Properties: | ||
Stream: !GetAtt SampleStream.Arn | ||
BatchSize: 100 | ||
StartingPosition: LATEST | ||
MaximumRetryAttempts: 2 | ||
DestinationConfig: | ||
OnFailure: | ||
Destination: !GetAtt SampleDLQ.Arn | ||
FunctionResponseTypes: | ||
- ReportBatchItemFailures | ||
|
||
SampleDLQ: | ||
Type: AWS::SQS::Queue | ||
|
||
SampleStream: | ||
Type: AWS::Kinesis::Stream | ||
Properties: | ||
ShardCount: 1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
AWSTemplateFormatVersion: '2010-09-09' | ||
Transform: AWS::Serverless-2016-10-31 | ||
Description: partial batch response sample | ||
|
||
Globals: | ||
Function: | ||
Timeout: 5 | ||
MemorySize: 256 | ||
Runtime: python3.10 | ||
Tracing: Active | ||
Environment: | ||
Variables: | ||
LOG_LEVEL: INFO | ||
POWERTOOLS_SERVICE_NAME: hello | ||
|
||
Resources: | ||
HelloWorldFunction: | ||
Type: AWS::Serverless::Function | ||
Properties: | ||
Handler: app.lambda_handler | ||
CodeUri: hello_world | ||
Policies: | ||
- SQSPollerPolicy: | ||
QueueName: !GetAtt SampleQueue.QueueName | ||
Events: | ||
Batch: | ||
Type: SQS | ||
Properties: | ||
Queue: !GetAtt SampleQueue.Arn | ||
FunctionResponseTypes: | ||
- ReportBatchItemFailures | ||
|
||
SampleDLQ: | ||
Type: AWS::SQS::Queue | ||
|
||
SampleQueue: | ||
Type: AWS::SQS::Queue | ||
Properties: | ||
VisibilityTimeout: 30 # Fn timeout * 6 | ||
RedrivePolicy: | ||
maxReceiveCount: 2 | ||
deadLetterTargetArn: !GetAtt SampleDLQ.Arn |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from typing import Optional | ||
|
||
from aws_lambda_powertools import Logger, Tracer | ||
from aws_lambda_powertools.utilities.batch import ( | ||
BatchProcessor, | ||
EventType, | ||
batch_processor, | ||
) | ||
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
processor = BatchProcessor(event_type=EventType.SQS) | ||
tracer = Tracer() | ||
logger = Logger() | ||
|
||
|
||
@tracer.capture_method | ||
def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): | ||
if lambda_context is not None: | ||
remaining_time = lambda_context.get_remaining_time_in_millis() | ||
logger.info(remaining_time) | ||
|
||
|
||
@logger.inject_lambda_context | ||
@tracer.capture_lambda_handler | ||
@batch_processor(record_handler=record_handler, processor=processor) | ||
def lambda_handler(event, context: LambdaContext): | ||
return processor.response() |
27 changes: 27 additions & 0 deletions
27
examples/batch_processing/src/advanced_accessing_lambda_context_manager.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from typing import Optional | ||
|
||
from aws_lambda_powertools import Logger, Tracer | ||
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType | ||
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
processor = BatchProcessor(event_type=EventType.SQS) | ||
tracer = Tracer() | ||
logger = Logger() | ||
|
||
|
||
@tracer.capture_method | ||
def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): | ||
if lambda_context is not None: | ||
remaining_time = lambda_context.get_remaining_time_in_millis() | ||
logger.info(remaining_time) | ||
|
||
|
||
@logger.inject_lambda_context | ||
@tracer.capture_lambda_handler | ||
def lambda_handler(event, context: LambdaContext): | ||
batch = event["Records"] | ||
with processor(records=batch, handler=record_handler, lambda_context=context): | ||
result = processor.process() | ||
|
||
return result |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from __future__ import annotations | ||
|
||
import json | ||
from typing import List, Tuple | ||
|
||
from typing_extensions import Literal | ||
|
||
from aws_lambda_powertools import Logger, Tracer | ||
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType | ||
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
processor = BatchProcessor(event_type=EventType.SQS) | ||
tracer = Tracer() | ||
logger = Logger() | ||
|
||
|
||
@tracer.capture_method | ||
def record_handler(record: SQSRecord): | ||
payload: str = record.body | ||
if payload: | ||
item: dict = json.loads(payload) | ||
logger.info(item) | ||
|
||
|
||
@logger.inject_lambda_context | ||
@tracer.capture_lambda_handler | ||
def lambda_handler(event, context: LambdaContext): | ||
batch = event["Records"] | ||
with processor(records=batch, handler=record_handler): | ||
processed_messages: List[Tuple] = processor.process() | ||
|
||
for message in processed_messages: | ||
status: Literal["success"] | Literal["fail"] = message[0] | ||
record: SQSRecord = message[2] | ||
|
||
logger.info(status, record=record) | ||
rubenfonseca marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return processor.response() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import os | ||
import sys | ||
from random import randint | ||
from typing import Any | ||
|
||
import boto3 | ||
|
||
from aws_lambda_powertools import Logger | ||
from aws_lambda_powertools.utilities.batch import ( | ||
BasePartialBatchProcessor, | ||
EventType, | ||
process_partial_response, | ||
) | ||
|
||
table_name = os.getenv("TABLE_NAME", "table_not_found") | ||
|
||
logger = Logger() | ||
|
||
|
||
class MyPartialProcessor(BasePartialBatchProcessor): | ||
""" | ||
Process a record and stores successful results at a Amazon DynamoDB Table | ||
|
||
Parameters | ||
---------- | ||
table_name: str | ||
DynamoDB table name to write results to | ||
""" | ||
|
||
def __init__(self, table_name: str): | ||
self.table_name = table_name | ||
|
||
super().__init__(event_type=EventType.SQS) | ||
|
||
def _prepare(self): | ||
# It's called once, *before* processing | ||
# Creates table resource and clean previous results | ||
self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) | ||
self.success_messages.clear() | ||
|
||
def _clean(self): | ||
# It's called once, *after* closing processing all records (closing the context manager) | ||
# Here we're sending, at once, all successful messages to a ddb table | ||
with self.ddb_table.batch_writer() as batch: | ||
for result in self.success_messages: | ||
batch.put_item(Item=result) | ||
|
||
def _process_record(self, record): | ||
# It handles how your record is processed | ||
# Here we're keeping the status of each run | ||
# where self.handler is the record_handler function passed as an argument | ||
try: | ||
result = self.handler(record) # record_handler passed to decorator/context manager | ||
return self.success_handler(record, result) | ||
except Exception as exc: | ||
logger.error(exc) | ||
return self.failure_handler(record, sys.exc_info()) | ||
|
||
def success_handler(self, record, result: Any): | ||
entry = ("success", result, record) | ||
self.success_messages.append(record) | ||
return entry | ||
|
||
async def _async_process_record(self, record: dict): | ||
raise NotImplementedError() | ||
|
||
|
||
processor = MyPartialProcessor(table_name) | ||
|
||
|
||
def record_handler(record): | ||
return randint(0, 100) | ||
|
||
|
||
def lambda_handler(event, context): | ||
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import json | ||
|
||
from aws_lambda_powertools import Logger, Tracer | ||
from aws_lambda_powertools.utilities.batch import ( | ||
BatchProcessor, | ||
EventType, | ||
process_partial_response, | ||
) | ||
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
processor = BatchProcessor(event_type=EventType.SQS) | ||
tracer = Tracer() | ||
logger = Logger() | ||
|
||
|
||
@tracer.capture_method(capture_response=False) | ||
def record_handler(record: SQSRecord): | ||
payload: str = record.body | ||
if payload: | ||
item: dict = json.loads(payload) | ||
logger.info(item) | ||
|
||
|
||
@logger.inject_lambda_context | ||
@tracer.capture_lambda_handler | ||
def lambda_handler(event, context: LambdaContext): | ||
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.