Skip to content

docs(batch-processing): Support for moving non retryable msg to DLQ #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 129 additions & 6 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,14 @@ To install this utility, add the following dependency to your project.
}
```

**IAM Permissions**
## IAM Permissions

This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:GetQueueUrl` and `sqs:DeleteMessageBatch` permission.
This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.

If you are also using [nonRetryableExceptions](#move-non-retryable-messages-to-a-dead-letter-queue) attribute, utility will need additional permission of `sqs:GetQueueAttributes` on source SQS.
It also needs `sqs:SendMessage` and `sqs:SendMessageBatch` on configured dead letter queue.

Refer [example project](https://github.com/aws-samples/aws-lambda-powertools-examples/blob/main/java/SqsBatchProcessing/template.yaml#L67) for policy details example.

## Processing messages from SQS

Expand All @@ -91,7 +96,8 @@ Both have nearly the same behaviour when it comes to processing messages from th
* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will:
- **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
- **2)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
- **2)** if, non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
- **3)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue

The only difference is that **SqsUtils Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the
utility will have access to both successful and failed messaged along with failure exceptions.
Expand All @@ -110,15 +116,20 @@ When using this annotation, you need provide a class implementation of `SqsMessa

All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:

* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch`
* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch`.
* **if, nonRetryableExceptions attribute is used**, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue.

!!! warning
You will not have access to the **processed messages** within the Lambda Handler - all processing logic will and should be performed by the implemented `#!java SqsMessageHandler#process()` function.

=== "AppSqsEvent.java"

```java hl_lines="3"
```java hl_lines="7"
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
import software.amazon.lambda.powertools.sqs.SqsUtils;

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
@Override
@SqsBatch(SampleMessageHandler.class)
Expand All @@ -139,6 +150,39 @@ All records in the batch will be passed to this handler for processing, even if
}
```

=== "AppSqsEventWithNonRetryableExceptions.java"

```java hl_lines="7 21"
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
import software.amazon.lambda.powertools.sqs.SqsUtils;

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
@Override
@SqsBatch(value = SampleMessageHandler.class, nonRetryableExceptions = {IllegalArgumentException.class})
public String handleRequest(SQSEvent input, Context context) {
return "{\"statusCode\": 200}";
}

public class SampleMessageHandler implements SqsMessageHandler<Object> {

@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());

if(/**Business validation failure**/) {
throw new IllegalArgumentException("Failed business validation. No point of retrying. Move me to DLQ." + message.getMessageId());
}

return returnVal;
}
}
}
```


### SqsUtils Utility API

If you require access to the result of processed messages, you can use this utility. The result from calling **`#!java SqsUtils#batchProcessor()`** on the context manager will be a list of all the return values
Expand Down Expand Up @@ -248,3 +292,82 @@ If you want to disable the default behavior where `SQSBatchProcessingException`
return returnValues;
}
```

## Move non retryable messages to a dead letter queue

If you want certain exceptions to be treated as permanent failures during batch processing, i.e. exceptions where the result of retrying will
always be a failure and want these can be immediately moved to the dead letter queue associated to the source SQS queue, you can use `SqsBatch#nonRetryableExceptions()`
to configure such exceptions.

If you want such messages to be deleted instead, set `SqsBatch#deleteNonRetryableMessageFromQueue()` to `true`. By default, its value is `false`.

Same capability is also provided by [SqsUtils Utility API](#sqsutils-utility-api).

!!! info
Make sure the lambda function has required permissions needed by utility. Refer [this section](#iam-permissions).

=== "SqsBatch annotation"

```java hl_lines="7 21"
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
import software.amazon.lambda.powertools.sqs.SqsUtils;

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
@Override
@SqsBatch(value = SampleMessageHandler.class, nonRetryableExceptions = {IllegalArgumentException.class})
public String handleRequest(SQSEvent input, Context context) {
return "{\"statusCode\": 200}";
}

public class SampleMessageHandler implements SqsMessageHandler<Object> {

@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());

if(/**Business validation failure**/) {
throw new IllegalArgumentException("Failed business validation. No point of retrying. Move me to DLQ." + message.getMessageId());
}

return returnVal;
}
}
}
```

=== "SqsBatch API"

```java hl_lines="9 23"
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
import software.amazon.lambda.powertools.sqs.SqsUtils;

public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
@Override
public String handleRequest(SQSEvent input, Context context) {

SqsUtils.batchProcessor(input, BatchProcessor.class, IllegalArgumentException.class);

return "{\"statusCode\": 200}";
}

public class SampleMessageHandler implements SqsMessageHandler<Object> {

@Override
public String process(SQSMessage message) {
// This will be called for each individual message from a batch
// It should raise an exception if the message was not processed successfully
String returnVal = doSomething(message.getBody());

if(/**Business validation failure**/) {
throw new IllegalArgumentException("Failed business validation. No point of retrying. Move me to DLQ." + message.getMessageId());
}

return returnVal;
}
}
}
```