Fence the MessageListenerContainer restart once the ConcurrentMessageListenerContainer is stopped #3371

@LokeshAlamuri

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3

Describe the bug

  1. Start a ConcurrentMessageListenerContainer with concurrency 2.
  2. Obtain one of its child KafkaMessageListenerContainer instances.
  3. Stop that KafkaMessageListenerContainer.
  4. Stop the ConcurrentMessageListenerContainer.
  5. Start the KafkaMessageListenerContainer from step 3 again. It starts running even though the ConcurrentMessageListenerContainer has been stopped, and it still holds a reference to the ConcurrentMessageListenerContainer.

To Reproduce

I modified the JUnit test testAutoCommit in org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests to reproduce the problem.

    `@Test
public void testAutoCommit() throws Exception {
	this.logger.info("Start auto");
	Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
	AtomicReference<Properties> overrides = new AtomicReference<>();
	DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
		@Override
		protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
				String clientIdSuffixArg, Properties properties) {

			overrides.set(properties);
			return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
		}

	};
	ContainerProperties containerProps = new ContainerProperties(topic1);
	containerProps.setLogContainerConfig(true);
	containerProps.setClientId("client");

	final CountDownLatch latch = new CountDownLatch(3);
	final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
	final List<String> payloads = new ArrayList<>();
	containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
		ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
		listenerThreadNames.add(Thread.currentThread().getName());
		payloads.add(message.value());
		latch.countDown();
	});

	ConcurrentMessageListenerContainer<Integer, String> container =
			new ConcurrentMessageListenerContainer<>(cf, containerProps);
	container.setConcurrency(2);
	container.setBeanName("testAuto");
	container.setChangeConsumerThreadName(true);
	BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
	CountDownLatch stopLatch = new CountDownLatch(4);
	CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
	container.setApplicationEventPublisher(e -> {
		events.add((KafkaEvent) e);
		if (e instanceof ContainerStoppedEvent) {
			stopLatch.countDown();
		}
		if (e instanceof ConcurrentContainerStoppedEvent) {
			concurrentContainerStopLatch.countDown();
		}
	});
	CountDownLatch intercepted = new CountDownLatch(4);
	container.setRecordInterceptor((record, consumer) -> {
		intercepted.countDown();
		return record.value().equals("baz") ? null : record;
	});
	container.start();

	ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
	assertThat(container.getAssignedPartitions()).hasSize(2);
	Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
	assertThat(assignments).hasSize(2);
	assertThat(assignments.get("client-0")).isNotNull();
	assertThat(assignments.get("client-1")).isNotNull();

	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
	template.setDefaultTopic(topic1);
	template.sendDefault(0, 0, "foo");
	template.sendDefault(1, 2, "bar");
	template.sendDefault(0, 0, "baz");
	template.sendDefault(1, 2, "qux");
	template.flush();
	assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue();
	assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
	assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux");
	assertThat(listenerThreadNames).contains("testAuto-0", "testAuto-1");
	List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
			"containers", List.class);
	assertThat(containers).hasSize(2);
	for (int i = 0; i < 2; i++) {
		assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
				.size()).isEqualTo(0);
	}
	assertThat(container.metrics()).isNotNull();
	Set<KafkaMessageListenerContainer<Integer, String>> children = new HashSet<>(containers);
	assertThat(container.isInExpectedState()).isTrue();
	container.getContainers().get(0).stopAbnormally(() -> { });
	assertThat(container.isInExpectedState()).isFalse();
	MessageListenerContainer childMessageListenerContainer = container.getContainers().get(0);
	container.getContainers().get(0).start();
	container.stop();
	assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
	assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue();
	assertThat(container.isInExpectedState()).isTrue();
	events.forEach(e -> {
		assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
		if (e instanceof ContainerStoppedEvent) {
			if (e.getSource().equals(container)) {
				assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container);
			}
			else {
				assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
			}
		}
		else if (e instanceof ConcurrentContainerStoppedEvent concurrentContainerStoppedEvent) {
			assertThat(concurrentContainerStoppedEvent.getSource()).isSameAs(container);
			assertThat(concurrentContainerStoppedEvent.getContainer(MessageListenerContainer.class))
					.isSameAs(container);
			assertThat(concurrentContainerStoppedEvent.getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL);
		}
		else {
			assertThat(children).contains((KafkaMessageListenerContainer<Integer, String>) e.getSource());
		}
	});
	assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isNull();
	this.logger.info("Stop auto");

	// The child container starts even though it is not supposed to.
	childMessageListenerContainer.start();
	// The child container is now running, so the following assertion fails.
	assertThat(childMessageListenerContainer.isRunning()).isFalse();
}`

Expected behavior

I think the child containers should not be allowed to start once the parent ConcurrentMessageListenerContainer has been stopped.
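
To illustrate the kind of fence I have in mind, here is a minimal plain-Java sketch. It does not use the Spring for Apache Kafka classes: `FenceSketch`, `Parent`, and `Child` are hypothetical stand-ins for the concurrent container and its child containers, and silently ignoring the late `start()` is only one possible choice (throwing an `IllegalStateException` would be another). It is not a proposal for the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model only; none of these types exist in Spring for Apache Kafka.
class FenceSketch {

	/** Stand-in for the parent (concurrent) container. */
	static class Parent {

		private final List<Child> children = new ArrayList<>();

		private final AtomicBoolean running = new AtomicBoolean();

		Parent(int concurrency) {
			for (int i = 0; i < concurrency; i++) {
				this.children.add(new Child(this));
			}
		}

		void start() {
			// Mark the parent as running first so the children pass the fence.
			this.running.set(true);
			this.children.forEach(Child::start);
		}

		void stop() {
			// Clear the flag first so no child can restart while we are stopping.
			this.running.set(false);
			this.children.forEach(Child::stop);
		}

		boolean isRunning() {
			return this.running.get();
		}

		List<Child> getChildren() {
			return this.children;
		}

	}

	/** Stand-in for a child container that keeps a reference to its parent. */
	static class Child {

		private final Parent parent;

		private volatile boolean running;

		Child(Parent parent) {
			this.parent = parent;
		}

		void start() {
			// The fence: refuse to start once the parent has been stopped.
			if (!this.parent.isRunning()) {
				return;
			}
			this.running = true;
		}

		void stop() {
			this.running = false;
		}

		boolean isRunning() {
			return this.running;
		}

	}

	public static void main(String[] args) {
		Parent parent = new Parent(2);
		parent.start();
		Child child = parent.getChildren().get(0);
		child.stop();
		parent.stop();
		child.start(); // ignored because the parent is stopped
		System.out.println("child running after parent stop: " + child.isRunning());
	}

}
```

With this fence, the sequence from the steps above leaves the child stopped, while a child can still be stopped and restarted individually as long as the parent is running.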
