
Custom AfterRollbackProcessor not invoked with ConcurrentMessageListenerContainer #690

@abhinabasarkar

Description


During consumer receive, if a transaction fails, I want to send the received message to a failure topic, for which I am using a custom AfterRollbackProcessor. The custom processor is not invoked when I use a ConcurrentMessageListenerContainer; the DefaultAfterRollbackProcessor is called instead. Here is the code and the config that I am using:

@Test
    public void simpleTransactionTest1() throws InterruptedException {
        String testCaseId = "simpleTransactionTest";
        final CountDownLatch latch = new CountDownLatch(1);

        // producer properties
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        
        // send to test topic
        Producer<String, String> producer = new KafkaProducer<>(producerProps);
        ProducerRecord<String, String> pr = new ProducerRecord<String, String>(
                "test", "simpleTransactionTest");
        producer.send(pr);
        producer.close();

        // producer factory
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
                producerProps);
        pf.setTransactionIdPrefix("my.transaction");
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);

        // consumer properties
        Map<String, Object> consumerProps = kafkaConfig.consumerProps();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // consumer factory
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
                consumerProps);
        ContainerProperties cp = new ContainerProperties("test");
        KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(
                pf);
        cp.setTransactionManager(tm);
        cp.setAckMode(AckMode.RECORD);
        cp.setAckOnError(true);
        cp.setMessageListener((MessageListener<String, String>) m -> {
            kafkaTemplate.send("output1", "output1 " + m.value());
            kafkaTemplate.send("output2", "output2 " + m.value());
            throw new RuntimeException("TestException");
        });

        // kafka message listener container
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
                cf, cp);
        container.setAfterRollbackProcessor((records, consumer) -> {
            Assert.assertEquals(1, records.size());
            DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(
                    producerProps);
            KafkaTemplate<String, String> kafkaTemplate1 = new KafkaTemplate<>(
                    pf1);
            kafkaTemplate1.send("test.bo", "bo " + records.get(0).value());
            consumer.commitSync();
            latch.countDown();
        });
        container.setBeanName("simpleTransaction");
        container.start();

        // some sleep
        Thread.sleep(2000);

        // check whether latch is zero
        Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));

        container.stop();

        // some sleep
        Thread.sleep(2000);

        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singletonList("test"));
        ConsumerRecords<String, String> records = consumer.poll(1000);
        consumer.commitSync();
        consumer.close();
        Assert.assertEquals(0, records.count());

    }

This works as expected if I use a KafkaMessageListenerContainer instead.
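For reference, a minimal sketch of that working variant, reusing the consumer factory (`cf`) and container properties (`cp`) from the test above; the processor body is elided and stands in for the same recovery logic (publish to the failure topic, commit, count down the latch):

```java
// Single-threaded container: here the custom AfterRollbackProcessor is honored.
// cf and cp are the DefaultKafkaConsumerFactory and ContainerProperties
// built earlier in the test.
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(cf, cp);
container.setAfterRollbackProcessor((records, consumer) -> {
    // same recovery logic as in the test: forward the failed record to
    // the failure topic, commit the offset, and count down the latch
});
container.setBeanName("simpleTransaction");
container.start();
```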
