From c2ca9b3100f141d496ebdb2ba5b3ce2524cb4d1a Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 31 Jan 2022 12:29:12 +0100 Subject: [PATCH 1/2] fix: Prevent message to be marked as success if failed sending to DLQ --- .../powertools/sqs/internal/BatchContext.java | 33 ++++++++++------ .../SqsMessageBatchProcessorAspectTest.java | 38 +++++++++++++++++-- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index ff9add984..b61c9e630 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -6,7 +6,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.IntStream; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -158,17 +159,23 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map { + List sendMessageBatchResponses = batchRequest(dlqMessages, 10, entriesToSend -> { SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(SendMessageBatchRequest.builder() - .entries(entriesToSend) - .queueUrl(dlqUrl.get()) - .build()); + .entries(entriesToSend) + .queueUrl(dlqUrl.get()) + .build()); + LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse); + + return sendMessageBatchResponse; }); - return true; + return sendMessageBatchResponses.stream() + .filter(response -> null != response && response.hasFailed()) + .peek(sendMessageBatchResponse -> LOG.error("Failed sending message to the DLQ. Entire batch will be re processed. Check if need permissions are configured for the function. Response: {}", sendMessageBatchResponse)) + .count() == 0; } @@ -220,17 +227,21 @@ private void deleteMessagesFromQueue(final List messages) { DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request); LOG.debug("Response from delete request {}", deleteMessageBatchResponse); + + return deleteMessageBatchResponse; }); } } - private void batchRequest(final List listOFEntries, - final int size, - final Consumer> batchLogic) { - IntStream.range(0, listOFEntries.size()) + private List batchRequest(final List listOFEntries, + final int size, + final Function, R> batchLogic) { + + return IntStream.range(0, listOFEntries.size()) .filter(index -> index % size == 0) .mapToObj(index -> listOFEntries.subList(index, Math.min(index + size, listOFEntries.size()))) - .forEach(batchLogic); + .map(batchLogic) + .collect(Collectors.toList()); } private String url(String queueArn) { diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java index dd10ec1df..a65aa486b 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -1,10 +1,7 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.IOException; -import java.time.LocalDateTime; import java.util.HashMap; -import java.util.function.Consumer; - import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; @@ -15,11 +12,13 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway; import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler; @@ -30,7 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.in; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -137,6 +135,38 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() { verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class)); } + @Test + void shouldBatchProcessAndThrowExceptionForNonRetryableExceptionWhenMoveToDlqReturnFailedResponse() { + requestHandler = new SqsMessageHandlerWithNonRetryableHandler(); + event.getRecords().get(0).setMessageId(""); + + when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(SendMessageBatchResponse.builder() + .failed(BatchResultErrorEntry.builder() + .message("Permission Error") + .code("KMS.AccessDeniedException") + .senderFault(true) + .build()) + .build()); + + HashMap attributes = new HashMap<>(); + + attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" + + " \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" + + " \"maxReceiveCount\": 2\n" + + "}"); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder() + .attributes(attributes) + .build()); + + Assertions.assertThatExceptionOfType(SQSBatchProcessingException.class). + isThrownBy(() -> requestHandler.handleRequest(event, context)); + + verify(interactionClient).listQueues(); + verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class)); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + @Test void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() { requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete(); From 212c8410719fd8feab7bcf5b8a9a6b2ce3ce469b Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 31 Jan 2022 12:35:26 +0100 Subject: [PATCH 2/2] docs: IAM permission clarification when using with encrypted SQS --- docs/utilities/batch.md | 6 +++++- .../amazon/lambda/powertools/sqs/internal/BatchContext.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ebb98b08b..1c3586360 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -93,7 +93,11 @@ This utility requires additional permissions to work as expected. Lambda functio If you are also using [nonRetryableExceptions](#move-non-retryable-messages-to-a-dead-letter-queue) attribute, utility will need additional permission of `sqs:GetQueueAttributes` on source SQS. It also needs `sqs:SendMessage` and `sqs:SendMessageBatch` on configured dead letter queue. -Refer [example project](https://github.com/aws-samples/aws-lambda-powertools-examples/blob/main/java/SqsBatchProcessing/template.yaml#L67) for policy details example. +If source or dead letter queue is configured to use encryption at rest using [AWS Key Management Service (KMS)](https://aws.amazon.com/kms/), function will need additional permissions of +`kms:GenerateDataKey` and `kms:Decrypt` on the KMS key being used for encryption. Refer [docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#compatibility-with-aws-services) for more details. + +Refer [example project](https://github.com/aws-samples/aws-lambda-powertools-examples/blob/main/java/SqsBatchProcessing/template.yaml#L105) for policy details example. + ## Processing messages from SQS diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index b61c9e630..93ad2abc4 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -174,7 +174,7 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map null != response && response.hasFailed()) - .peek(sendMessageBatchResponse -> LOG.error("Failed sending message to the DLQ. Entire batch will be re processed. Check if need permissions are configured for the function. Response: {}", sendMessageBatchResponse)) + .peek(sendMessageBatchResponse -> LOG.error("Failed sending message to the DLQ. Entire batch will be re processed. Check if needed permissions are configured for the function. Response: {}", sendMessageBatchResponse)) .count() == 0; }