|
15 | 15 |
|
16 | 16 | import java.lang.reflect.Constructor;
|
17 | 17 | import java.util.ArrayList;
|
| 18 | +import java.util.LinkedList; |
18 | 19 | import java.util.List;
|
| 20 | +import java.util.Queue; |
19 | 21 | import java.util.function.Function;
|
20 | 22 | import java.util.stream.Collectors;
|
21 | 23 |
|
@@ -496,41 +498,43 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
|
496 | 498 | // If we are working on a FIFO queue, when any message fails we should stop processing and return the
|
497 | 499 | // rest of the batch as failed too. We use this variable to track when that has happened.
|
498 | 500 | // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
|
499 |
| - boolean failWholeBatch = false; |
500 | 501 |
|
501 | 502 | BatchContext batchContext = new BatchContext(client);
|
502 |
| - |
503 |
| - for (SQSMessage message : event.getRecords()) { |
504 |
| - // If the batch has already failed, we mark each message past the failure point as failed |
505 |
| - if (failWholeBatch) { |
506 |
| - LOG.info("Skipping message {} as another message with a message group failed in this batch", |
507 |
| - message.getMessageId()); |
508 |
| - batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); |
509 |
| - } else { |
510 |
| - // If the batch hasn't failed, try process the message |
511 |
| - try { |
512 |
| - handlerReturn.add(handler.process(message)); |
513 |
| - batchContext.addSuccess(message); |
514 |
| - } catch(Exception e){ |
515 |
| - |
516 |
| - // Record the failure |
517 |
| - batchContext.addFailure(message, e); |
518 |
| - |
519 |
| - // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure |
520 |
| - // now stops us from processing the rest of the batch. |
521 |
| - String messageGroupId = message.getAttributes() != null ? |
522 |
| - message.getAttributes().get(MESSAGE_GROUP_ID) : null; |
523 |
| - |
524 |
| - if (messageGroupId != null) { |
525 |
| - failWholeBatch = true; |
526 |
| - LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" |
527 |
| - , messageGroupId, message.getMessageId()); |
528 |
| - } |
529 |
| - LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); |
| 503 | + Queue<SQSMessage> messagesToProcess = new LinkedList<>(event.getRecords()); |
| 504 | + while (!messagesToProcess.isEmpty()) { |
| 505 | + SQSMessage message = messagesToProcess.remove(); |
| 506 | + // If the batch hasn't failed, try process the message |
| 507 | + try { |
| 508 | + handlerReturn.add(handler.process(message)); |
| 509 | + batchContext.addSuccess(message); |
| 510 | + } catch(Exception e){ |
| 511 | + |
| 512 | + // Record the failure |
| 513 | + batchContext.addFailure(message, e); |
| 514 | + |
| 515 | + // If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure |
| 516 | + // now stops us from processing the rest of the batch; we break out of the loop leaving unprocessed |
| 517 | + // messages in the queu |
| 518 | + String messageGroupId = message.getAttributes() != null ? |
| 519 | + message.getAttributes().get(MESSAGE_GROUP_ID) : null; |
| 520 | + if (messageGroupId != null) { |
| 521 | + LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" |
| 522 | + , messageGroupId, message.getMessageId()); |
| 523 | + break; |
530 | 524 | }
|
| 525 | + LOG.error("Encountered issue processing message: {}", message.getMessageId(), e); |
531 | 526 | }
|
532 | 527 | }
|
533 | 528 |
|
| 529 | + // If we have a FIFO batch failure, unprocessed messages will remain on the queue |
| 530 | + // past the failed message. We have to add these to the errors |
| 531 | + while (!messagesToProcess.isEmpty()) { |
| 532 | + SQSMessage message = messagesToProcess.remove(); |
| 533 | + LOG.info("Skipping message {} as another message with a message group failed in this batch", |
| 534 | + message.getMessageId()); |
| 535 | + batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException()); |
| 536 | + } |
| 537 | + |
534 | 538 | batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
|
535 | 539 |
|
536 | 540 | return handlerReturn;
|
|
0 commit comments