Skip to content

Commit 028ee88

Browse files
feat: Utility without annotation (#61)
1 parent 7b17f61 commit 028ee88

File tree

4 files changed

+318
-6
lines changed

4 files changed

+318
-6
lines changed

docs/content/utilities/sqs_large_message_handling.mdx

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,5 +104,50 @@ To disable deletion of payloads setting the following annotation parameter:
104104
```java
105105
@LargeMessageHandler(deletePayloads=false)
106106
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
107+
108+
}
109+
```
110+
111+
## Utility
112+
113+
If you want to avoid using annotation and have control over error that can happen during payload enrichment.
114+
115+
`PowertoolsSqs.enrichedMessageFromS3()` provides you access with list of `SQSMessage` object enriched from S3 payload.
116+
Original `SQSEvent` object is never mutated. You can also control if the S3 payload should be deleted after successful
117+
processing. You can enrich messages from S3 with below code:
118+
119+
```java
120+
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
121+
122+
@Override
123+
public String handleRequest(SQSEvent sqsEvent, Context context) {
124+
125+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
126+
// Some business logic
127+
Map<String, String> someBusinessLogic = new HashMap<>();
128+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
129+
return someBusinessLogic;
130+
});
131+
132+
// Do not delete payload after processing.
133+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
134+
// Some business logic
135+
Map<String, String> someBusinessLogic = new HashMap<>();
136+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
137+
return someBusinessLogic;
138+
});
139+
140+
// Better control over exception during enrichment
141+
try {
142+
// Do not delete payload after processing.
143+
PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
144+
// Some business logic
145+
});
146+
} catch (FailedProcessingLargePayloadException e) {
147+
// handle any exception.
148+
}
149+
150+
return "ok";
151+
}
107152
}
108153
```
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2020 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
package software.amazon.lambda.powertools.sqs;
15+
16+
import java.util.List;
17+
import java.util.function.Function;
18+
import java.util.stream.Collectors;
19+
20+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
24+
import software.amazon.payloadoffloading.PayloadS3Pointer;
25+
26+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
27+
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages;
28+
29+
/**
30+
* A class of helper functions to add additional functionality to LargeMessageHandler.
31+
* <p>
32+
* {@see PowertoolsLogging}
33+
*/
34+
public class PowertoolsSqs {
35+
36+
private static final ObjectMapper objectMapper = new ObjectMapper();
37+
38+
/**
39+
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
40+
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
41+
* If all the large S3 payload are successfully retrieved, it will delete them from S3 post success.
42+
*
43+
* @param sqsEvent Event received from SQS Extended client library
44+
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
45+
* @return Return value from the function.
46+
*/
47+
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
48+
final Function<List<SQSMessage>, R> messageFunction) {
49+
return enrichedMessageFromS3(sqsEvent, true, messageFunction);
50+
}
51+
52+
/**
53+
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
54+
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
55+
* if all the large S3 payload are successfully retrieved, Control if it will delete payload from S3 post success.
56+
*
57+
* @param sqsEvent Event received from SQS Extended client library
58+
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
59+
* @return Return value from the function.
60+
*/
61+
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
62+
final boolean deleteS3Payload,
63+
final Function<List<SQSMessage>, R> messageFunction) {
64+
65+
List<SQSMessage> sqsMessages = sqsEvent.getRecords().stream()
66+
.map(PowertoolsSqs::clonedMessage)
67+
.collect(Collectors.toList());
68+
69+
List<PayloadS3Pointer> s3Pointers = processMessages(sqsMessages);
70+
71+
R returnValue = messageFunction.apply(sqsMessages);
72+
73+
if (deleteS3Payload) {
74+
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
75+
}
76+
77+
return returnValue;
78+
}
79+
80+
private static SQSMessage clonedMessage(SQSMessage sqsMessage) {
81+
try {
82+
return objectMapper
83+
.readValue(objectMapper.writeValueAsString(sqsMessage), SQSMessage.class);
84+
} catch (JsonProcessingException e) {
85+
throw new RuntimeException(e);
86+
}
87+
}
88+
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ && placedOnSqsEventRequestHandler(pjp)) {
5959
}
6060

6161
private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
62-
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
62+
List<SQSMessage> records = sqsEvent.getRecords();
63+
return processMessages(records);
64+
}
6365

64-
for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
66+
public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> records) {
67+
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
68+
for (SQSMessage sqsMessage : records) {
6569
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
6670
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
6771

@@ -79,11 +83,11 @@ private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
7983
return s3Pointers;
8084
}
8185

82-
private boolean isBodyLargeMessagePointer(String record) {
86+
private static boolean isBodyLargeMessagePointer(String record) {
8387
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
8488
}
8589

86-
private String readStringFromS3Object(S3Object object) {
90+
private static String readStringFromS3Object(S3Object object) {
8791
try (S3ObjectInputStream is = object.getObjectContent()) {
8892
return IOUtils.toString(is);
8993
} catch (IOException e) {
@@ -100,7 +104,15 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
100104
});
101105
}
102106

103-
private <R> R callS3Gracefully(final PayloadS3Pointer pointer,
107+
public static void deleteMessage(PayloadS3Pointer s3Pointer) {
108+
callS3Gracefully(s3Pointer, pointer -> {
109+
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
110+
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
111+
return null;
112+
});
113+
}
114+
115+
private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
104116
final Function<PayloadS3Pointer, R> function) {
105117
try {
106118
return function.apply(pointer);
@@ -119,7 +131,7 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
119131
&& pjp.getArgs()[1] instanceof Context;
120132
}
121133

122-
static class FailedProcessingLargePayloadException extends RuntimeException {
134+
public static class FailedProcessingLargePayloadException extends RuntimeException {
123135
public FailedProcessingLargePayloadException(String message, Throwable cause) {
124136
super(message, cause);
125137
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.IOException;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.stream.Stream;
8+
9+
import com.amazonaws.AmazonServiceException;
10+
import com.amazonaws.SdkClientException;
11+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
12+
import com.amazonaws.services.s3.AmazonS3;
13+
import com.amazonaws.services.s3.model.S3Object;
14+
import com.amazonaws.services.s3.model.S3ObjectInputStream;
15+
import com.amazonaws.util.StringInputStream;
16+
import org.apache.http.client.methods.HttpRequestBase;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.params.ParameterizedTest;
20+
import org.junit.jupiter.params.provider.Arguments;
21+
import org.junit.jupiter.params.provider.MethodSource;
22+
import org.junit.jupiter.params.provider.ValueSource;
23+
import org.mockito.Mock;
24+
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
25+
26+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
27+
import static java.util.Collections.singletonList;
28+
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.never;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.verifyNoInteractions;
35+
import static org.mockito.Mockito.when;
36+
import static org.mockito.MockitoAnnotations.initMocks;
37+
38+
class PowertoolsSqsTest {
39+
40+
@Mock
41+
private AmazonS3 amazonS3;
42+
private static final String BUCKET_NAME = "ms-extended-sqs-client";
43+
private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf";
44+
45+
@BeforeEach
46+
void setUp() throws IllegalAccessException {
47+
initMocks(this);
48+
writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
49+
}
50+
51+
@Test
52+
public void testLargeMessage() {
53+
S3Object s3Response = new S3Object();
54+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
55+
56+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
57+
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
58+
59+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
60+
Map<String, String> someBusinessLogic = new HashMap<>();
61+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
62+
return someBusinessLogic;
63+
});
64+
65+
assertThat(sqsMessage)
66+
.hasSize(1)
67+
.containsEntry("Message", "A big message");
68+
69+
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
70+
}
71+
72+
@ParameterizedTest
73+
@ValueSource(booleans = {true, false})
74+
public void testLargeMessageDeleteFromS3Toggle(boolean deleteS3Payload) {
75+
S3Object s3Response = new S3Object();
76+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
77+
78+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
79+
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
80+
81+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, deleteS3Payload, sqsMessages -> {
82+
Map<String, String> someBusinessLogic = new HashMap<>();
83+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
84+
return someBusinessLogic;
85+
});
86+
87+
assertThat(sqsMessage)
88+
.hasSize(1)
89+
.containsEntry("Message", "A big message");
90+
if (deleteS3Payload) {
91+
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
92+
} else {
93+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
94+
}
95+
}
96+
97+
@Test
98+
public void shouldNotProcessSmallMessageBody() {
99+
S3Object s3Response = new S3Object();
100+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
101+
102+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
103+
SQSEvent sqsEvent = messageWithBody("This is small message");
104+
105+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
106+
Map<String, String> someBusinessLogic = new HashMap<>();
107+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
108+
return someBusinessLogic;
109+
});
110+
111+
assertThat(sqsMessage)
112+
.containsEntry("Message", "This is small message");
113+
114+
verifyNoInteractions(amazonS3);
115+
}
116+
117+
@ParameterizedTest
118+
@MethodSource("exception")
119+
public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
120+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception);
121+
122+
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
123+
SQSEvent sqsEvent = messageWithBody(messageBody);
124+
125+
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
126+
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
127+
.withCause(exception);
128+
129+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
130+
}
131+
132+
@Test
133+
public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
134+
S3Object s3Response = new S3Object();
135+
136+
s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
137+
@Override
138+
public void close() throws IOException {
139+
throw new IOException("Failed");
140+
}
141+
}, mock(HttpRequestBase.class)));
142+
143+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
144+
145+
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
146+
SQSEvent sqsEvent = messageWithBody(messageBody);
147+
148+
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
149+
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
150+
.withCauseInstanceOf(IOException.class);
151+
152+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
153+
}
154+
155+
private static Stream<Arguments> exception() {
156+
return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")),
157+
Arguments.of(new SdkClientException("Client Exception")));
158+
}
159+
160+
private SQSEvent messageWithBody(String messageBody) {
161+
SQSMessage sqsMessage = new SQSMessage();
162+
sqsMessage.setBody(messageBody);
163+
SQSEvent sqsEvent = new SQSEvent();
164+
sqsEvent.setRecords(singletonList(sqsMessage));
165+
return sqsEvent;
166+
}
167+
}

0 commit comments

Comments
 (0)