Improving Observability in Asynchronous Processing (CompletableFuture, Mono) #3528

@chickenchickenlove


Expected Behavior

When the consumer endpoint method returns CompletableFuture<?> or Mono<?> as a result, and that CompletableFuture<?> or Mono<?> fails to complete because of an error thrown in the async loop, spring-kafka should report metrics indicating that async consumer tasks have failed.

For example,

spring_kafka_listener_seconds_count{
error="AsyncLoopExecutionFailedException",
messaging_kafka_consumer_group="topic4",
messaging_operation="receive",
messaging_source_kind="topic",
messaging_source_name="topic4",
messaging_system="kafka",
spring_kafka_listener_id="org.springframework.kafka.KafkaListenerEndpointContainer#0-0"} 6
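
As a rough illustration of the requested behavior, the sketch below records one listener execution only when the returned future completes, and tags it with the failure type. It uses only the JDK; `ListenerMetricSketch`, `countsByErrorTag`, `recordOnCompletion`, and the `"none"` tag value are hypothetical names for this example and are not spring-kafka or Micrometer APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class ListenerMetricSketch {

    // Hypothetical in-memory registry: error tag -> number of listener executions.
    static final Map<String, LongAdder> countsByErrorTag = new ConcurrentHashMap<>();

    // Counts one execution when the future completes. The tag is "none" on
    // success, otherwise the simple class name of the underlying failure.
    static <T> CompletableFuture<T> recordOnCompletion(CompletableFuture<T> result) {
        return result.whenComplete((value, error) -> {
            String tag = "none";
            if (error != null) {
                // Failures raised inside supplyAsync arrive wrapped in CompletionException.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause()
                        : error;
                tag = cause.getClass().getSimpleName();
            }
            countsByErrorTag.computeIfAbsent(tag, key -> new LongAdder()).increment();
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("Should report metric.");
        });
        try {
            recordOnCompletion(failing).join();
        } catch (CompletionException expected) {
            // The failure is still propagated to the caller after being counted.
        }
        System.out.println(countsByErrorTag);
    }
}
```

A real implementation would presumably stop the existing listener timer or observation at this point instead of updating a map, so the failure shows up under the `error` tag of the metric above.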

Current Behavior

When the consumer endpoint method returns CompletableFuture<?> or Mono<?> as a result, the MessagingMessageListenerAdapter adds a callback to the returned CompletableFuture<?> or Mono<?>.

However, KafkaMessageListenerContainer currently does not take into account whether the CompletableFuture or Mono fails to complete.

If the CompletableFuture or Mono instance is created successfully, KafkaMessageListenerContainer will not report an error metric even if that CompletableFuture or Mono later fails to complete.
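
The gap between "created successfully" and "failed to complete" can be shown with plain JDK code. This is a minimal sketch, not the container's actual code path; `InvocationVsCompletionSketch`, `listen`, `invocationThrew`, and `completionFailed` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class InvocationVsCompletionSketch {

    // Hypothetical listener body: the call returns normally, the future fails later.
    static CompletableFuture<Void> listen() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("async loop failed");
        });
    }

    // Mimics a caller that only guards the synchronous invocation.
    static boolean invocationThrew() {
        try {
            listen();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // Mimics a caller that also waits for the asynchronous outcome.
    static boolean completionFailed() {
        try {
            listen().join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("invocation threw: " + invocationThrew());   // false
        System.out.println("completion failed: " + completionFailed()); // true
    }
}
```

A caller that only watches for exceptions thrown by the method invocation sees a successful call, which matches the missing error metric described here.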

Context

I discovered this issue while working on another problem (GH-3276), and I checked whether it actually occurs.
As expected, it does.

To reproduce it, you can use the method below.
It demonstrates the following:

  1. The CompletableFuture<?> is created successfully.
  2. The CompletableFuture<?> fails to complete.
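
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Slf4j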
@Component
public class MyAsyncBean {

    @RetryableTopic(
            attempts = "5",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
            autoCreateTopics = "true",
            backoff = @Backoff(delay = 1000)
    )
    @KafkaListener(topics = "topic4", groupId = "topic4")
    public CompletableFuture<Void> receive(ConsumerRecord<Object, Object> record) {
        return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                    throw new RuntimeException("Should report metric.");
                });
    }
}

To reviewer

If you consider this to be a reasonable request,
may I take on this task and try to solve it?
