Skip to content

Bug - KafkaTemplate.sendOffsetsToTransaction() doesn't work in executeInTransaction #1168

@Hixon10

Description

@Hixon10

Affects Version(s): spring-kafka-2.3.0.M2

🐞 Bug report

I use org.apache.kafka.clients.consumer.KafkaConsumer for consuming messages. I mean, it is not spring related consumer.

I use org.springframework.kafka.core.KafkaTemplate for producing messages. I create its bean like this:

@Bean
public Map<String, Object> producerConfigs() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return props;
}

@Bean
public DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory() {
    DefaultKafkaProducerFactory<String, String> kafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
    kafkaProducerFactory.setTransactionIdPrefix("transaction-id-prefix");
    return kafkaProducerFactory;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory) {
    return new KafkaTemplate<>(defaultKafkaProducerFactory);
}

I produce result messages like this:

ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_IN_MS));

List<List<String>> outputMessages = produceOutput(consumerRecords);

kafkaTemplate.executeInTransaction(kafkaProducer -> {
    for (List<String> resultTasks : outputMessages) {
        for (String resultTask : resultTasks) {
            kafkaProducer.send("topic", "key", resultTask);
        }
    }

    kafkaProducer.sendOffsetsToTransaction(getOffsetsForCommit(consumerRecords), "consumerGroupId");
    return true;
});

Finally, I have this error:

java.lang.IllegalArgumentException: No transaction in process
	at org.springframework.util.Assert.isTrue(Assert.java:118)
	at org.springframework.kafka.core.KafkaTemplate.sendOffsetsToTransaction(KafkaTemplate.java:345)

Exception throws in this method:

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
	@SuppressWarnings("unchecked")
	KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
			.getResource(this.producerFactory);
	Assert.isTrue(resourceHolder != null, "No transaction in process"); // here
	if (resourceHolder.getProducer() != null) {
		resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
	}
}

This bug is based on https://stackoverflow.com/a/57095356/1756750

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions