Skip to content

KafkaConsumer.close() no longer triggers the DefaultKafkaConsumerFactory.listener.removeConsumer() #3375

@yanivnahoum

Description

@yanivnahoum

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.1.0+ (included in Spring Boot 3.2+)

Describe the bug
This PR removed the use of a proxy to invoke the listeners upon close, and replaced it with a simple class: org.springframework.kafka.core.DefaultKafkaConsumerFactory.ExtendedKafkaConsumer. However, ExtendedKafkaConsumer only overrides close(Duration timeout), but not close(). As a result, when creating an ad-hoc consumer via the Spring Boot auto-configured ConsumerFactory using try-with-resources, the ConsumerFactory listeners are invoked on creation, but are not cleaned up when exiting the try block's scope (since close() is called in try-with-resources, not close(Duration timeout))

To Reproduce

private final ConsumerFactory<String, String> consumerFactory;
...
public void consume() {
  try (var consumer = consumerFactory.createConsumer()) {
    // use consumer here
  } // consumer.close() will be invoked here.
}

This resulted in a notable memory leak, since the above mentioned ad hoc consumer was created for a health check invoked every 60s. Creating a consumer invokes the ConsumerFactory's listener.consumerAdded() method which in turn invokes org.springframework.kafka.core.MicrometerConsumerListener#consumerAdded, adding a new KafkaClientMetrics to the MicrometerConsumerListener metrics map. Since the ConsumerFactory's listener.consumerRemoved() never gets invoked when the consumer is closed, the KafkaClientMetrics is never removed from the map, and the map just grows and grows.

Expected behavior
ConsumerFactory listeners should be notified when a consumer is closed.

Sample

See above.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions