feat(batch): add support to SQS FIFO queues (SqsFifoPartialProcessor) #1934

Merged · 6 commits · Feb 20, 2023
Changes from 2 commits
4 changes: 2 additions & 2 deletions aws_lambda_powertools/utilities/batch/__init__.py
@@ -17,7 +17,7 @@
 )
 from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
 from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
-    SQSFifoPartialProcessor,
+    SqsFifoPartialProcessor,
 )
 from aws_lambda_powertools.utilities.batch.types import BatchTypeModels

@@ -31,7 +31,7 @@
"EventType",
"FailureResponse",
"SuccessResponse",
"SQSFifoPartialProcessor",
"SqsFifoPartialProcessor",
"batch_processor",
"async_batch_processor",
)
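
After this change only the new spelling is exported from the package, so code written against an earlier commit of this branch must update its imports. A minimal before/after sketch:

```python
# The old name is no longer exported from aws_lambda_powertools.utilities.batch:
# from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor  # before
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor  # after

processor = SqsFifoPartialProcessor()
```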
aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
@@ -12,7 +12,7 @@ class SQSFifoCircuitBreakerError(Exception):
     pass


-class SQSFifoPartialProcessor(BatchProcessor):
+class SqsFifoPartialProcessor(BatchProcessor):
     """Process native partial responses from SQS FIFO queues.

     Stops processing records when the first record fails. The remaining records are reported as failed items.
@@ -26,12 +26,12 @@ class SQSFifoPartialProcessor(BatchProcessor):
     import json

     from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
     from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
     from aws_lambda_powertools.utilities.typing import LambdaContext


-    processor = SQSFifoPartialProcessor()
+    processor = SqsFifoPartialProcessor()
     tracer = Tracer()
     logger = Logger()
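
The docstring's contract (stop at the first failure, report the rest as failed) can be pictured as a small override of the batch processing loop. This is a minimal sketch only, not the actual implementation; `fail_messages` and `_process_record` are assumed from the base class, and `_mark_as_failed` is a hypothetical helper:

```python
from typing import List, Tuple

from aws_lambda_powertools.utilities.batch import BatchProcessor


class FifoStopOnFailureSketch(BatchProcessor):
    """Illustration of the stop-on-first-failure loop; not the real class."""

    def process(self) -> List[Tuple]:
        result: List[Tuple] = []
        for index, record in enumerate(self.records):
            # Once any record has failed, stop invoking the record handler and
            # report every remaining record as failed to preserve FIFO ordering.
            if self.fail_messages:  # assumed attribute tracking failures so far
                for remaining in self.records[index:]:
                    result.append(self._mark_as_failed(remaining))  # hypothetical helper
                break
            result.append(self._process_record(record))
        return result
```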
4 changes: 2 additions & 2 deletions aws_lambda_powertools/utilities/batch/types.py
@@ -20,5 +20,5 @@
 ]
 BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
 else:
-    BatchTypeModels = Optional  # type: ignore
-    BatchSqsTypeModel = Optional  # type: ignore
+    BatchTypeModels = "BatchTypeModels"  # type: ignore
+    BatchSqsTypeModel = "BatchSqsTypeModel"  # type: ignore
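
The `else:` branch now binds the aliases to plain strings instead of `typing.Optional`: the real types exist only for type checkers, so the optional pydantic dependency is never imported at runtime, while at runtime the names still resolve, just to inert placeholders. A generic sketch of this `TYPE_CHECKING` pattern, with illustrative names:

```python
from typing import TYPE_CHECKING, Optional, Type

if TYPE_CHECKING:
    # Evaluated only by type checkers; pydantic need not be installed at runtime.
    from pydantic import BaseModel

    ModelAlias = Optional[Type[BaseModel]]
else:
    # At runtime the name still resolves, but only to a harmless string placeholder.
    ModelAlias = "ModelAlias"
```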
4 changes: 2 additions & 2 deletions docs/utilities/batch.md
@@ -349,8 +349,8 @@ Processing batches from SQS works in four stages:

 #### FIFO queues

-If you're using this feature with a FIFO queue, you should use the `SQSFifoPartialProcessor` class instead. We will
-stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
+When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"},
+we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
 This helps preserve the ordering of messages in your queue.

 === "As a decorator"
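
As an illustration of that behavior, if the second of three FIFO messages fails, the partial-batch response names the failing message and every message after it (identifiers below are made up):

```python
# Hypothetical partial-batch response: message 2 raised, so messages 2 and 3
# are returned for SQS to retry, keeping the queue ordering intact.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-2"},  # the record whose handler raised
        {"itemIdentifier": "msg-3"},  # not processed; failed to preserve order
    ]
}
```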
4 changes: 2 additions & 2 deletions examples/batch_processing/src/sqs_fifo_batch_processor.py
@@ -1,12 +1,12 @@
 from aws_lambda_powertools import Logger, Tracer
 from aws_lambda_powertools.utilities.batch import (
-    SQSFifoPartialProcessor,
+    SqsFifoPartialProcessor,
     batch_processor,
 )
 from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
 from aws_lambda_powertools.utilities.typing import LambdaContext

-processor = SQSFifoPartialProcessor()
+processor = SqsFifoPartialProcessor()
 tracer = Tracer()
 logger = Logger()

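
For a quick local check of this example, a minimal SQS-shaped event like the following could be passed to the decorated handler; all values are illustrative, and real FIFO events carry additional attributes:

```python
# Illustrative SQS FIFO event payload; every value here is made up.
fake_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": '{"order_id": 1}',
            "attributes": {"MessageGroupId": "orders"},
            "eventSource": "aws:sqs",
        }
    ]
}
# Invoking the handler with this event returns {"batchItemFailures": [...]}.
```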
@@ -1,9 +1,9 @@
 from aws_lambda_powertools import Logger, Tracer
-from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor
+from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor
 from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
 from aws_lambda_powertools.utilities.typing import LambdaContext

-processor = SQSFifoPartialProcessor()
+processor = SqsFifoPartialProcessor()
 tracer = Tracer()
 logger = Logger()

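
This second example imports no decorator, which suggests the context-manager style of batch processing. A sketch of how the truncated handler might continue, assuming a `record_handler` function is defined in the elided lines:

```python
def lambda_handler(event: dict, context: LambdaContext):
    batch = event["Records"]
    # Context-manager style: process the batch, then build the partial response.
    with processor(records=batch, handler=record_handler):
        processor.process()  # stops at the first failure for FIFO queues
    return processor.response()
```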
29 changes: 9 additions & 20 deletions tests/functional/test_utilities_batch.py
@@ -10,7 +10,7 @@
     AsyncBatchProcessor,
     BatchProcessor,
     EventType,
-    SQSFifoPartialProcessor,
+    SqsFifoPartialProcessor,
     async_batch_processor,
     batch_processor,
 )
@@ -42,7 +42,7 @@
 def sqs_event_factory() -> Callable:
     def factory(body: str):
         return {
-            "messageId": str(uuid.uuid4()),
+            "messageId": f"{uuid.uuid4()}",
             "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
             "body": body,
             "attributes": {
@@ -119,17 +119,6 @@ def handler(record):
     return handler


-@pytest.fixture(scope="module")
-def sqs_fifo_record_handler() -> Callable:
-    def handler(record):
-        body = record["body"]
-        if "fail" in body:
-            raise Exception("Failed to process record.")
-        return body
-
-    return handler
-
-
 @pytest.fixture(scope="module")
 def async_record_handler() -> Callable[..., Awaitable[Any]]:
     async def handler(record):
@@ -667,15 +656,15 @@ def lambda_handler(event, context):
assert "All records failed processing. " in str(e.value)


def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, sqs_fifo_record_handler):
def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, record_handler):
# GIVEN
first_record = SQSRecord(sqs_event_factory("success"))
second_record = SQSRecord(sqs_event_factory("success"))
event = {"Records": [first_record.raw_event, second_record.raw_event]}

processor = SQSFifoPartialProcessor()
processor = SqsFifoPartialProcessor()

@batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
return processor.response()

Expand All @@ -686,17 +675,17 @@ def lambda_handler(event, context):
assert result["batchItemFailures"] == []


-def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, sqs_fifo_record_handler):
+def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, record_handler):
     # GIVEN
     first_record = SQSRecord(sqs_event_factory("success"))
     second_record = SQSRecord(sqs_event_factory("fail"))
-    # this would normally suceed, but since it's a FIFO queue, it will be marked as failure
+    # this would normally succeed, but since it's a FIFO queue, it will be marked as failure
     third_record = SQSRecord(sqs_event_factory("success"))
     event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]}

-    processor = SQSFifoPartialProcessor()
+    processor = SqsFifoPartialProcessor()

-    @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
+    @batch_processor(record_handler=record_handler, processor=processor)
     def lambda_handler(event, context):
         return processor.response()
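
The THEN block of this failure test is truncated here; given the stop-on-first-failure contract, the assertions would presumably resemble this sketch rather than the actual test code:

```python
    # WHEN
    result = lambda_handler(event, {})

    # THEN: the second record failed, so it and the third are both reported.
    assert len(result["batchItemFailures"]) == 2
    assert result["batchItemFailures"][0]["itemIdentifier"] == second_record.message_id
    assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id
```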
