KafkaMessageListenerContainer.doInvokeBatchListener() #657

@Gunju-Ko

Please understand that I'm not good at English.

Even if the transaction is rolled back in doInvokeBatchListener, the offset can still be committed.

Below is the catch block of doInvokeBatchListener:

catch (RuntimeException e) {
	if (this.containerProperties.isAckOnError() && !this.autoCommit) {
		for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
			this.acks.add(record);
		}
	}
	if (this.batchErrorHandler == null) {
		throw e;
	}
	// .... skip
}

If ackOnError is true and autoCommit is false, the records returned by getHighestOffsetRecords are added to acks so that their offsets will be committed.

If batchErrorHandler is null, the exception is rethrown, so the Kafka transaction is rolled back (when the KafkaMessageListenerContainer is configured with a KafkaTransactionManager).

The problem is that processCommits() can be called before the next call to KafkaConsumer.poll().

Here is the code:

while (isRunning()) {
	try {
		if (!this.autoCommit && !this.isRecordAck) {
			processCommits();
		}

As a result, the offset is committed even though the Kafka transaction was rolled back.
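To make the sequence concrete, here is a minimal, self-contained simulation of the ordering described above. It does not use Spring Kafka at all: `AckOrderingDemo`, `invokeListener`, and the `committed` list are hypothetical names, and the `acks` queue only stands in for the container's internal state. It assumes ackOnError is true, autoCommit is false, and batchErrorHandler is null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the container; not the real Spring Kafka class.
class AckOrderingDemo {
    final Queue<Long> acks = new ArrayDeque<>();     // offsets waiting to be committed
    final List<Long> committed = new ArrayList<>();  // offsets that were committed
    boolean transactionRolledBack = false;

    // Mirrors the current catch block: the ack is queued, then the exception is rethrown.
    void invokeListener(long highestOffset) {
        try {
            throw new RuntimeException("listener failed");
        } catch (RuntimeException e) {
            acks.add(highestOffset); // assumed: ackOnError=true, autoCommit=false
            throw e;                 // assumed: batchErrorHandler == null
        }
    }

    // Mirrors the top of the next loop iteration: pending acks are committed before poll().
    void processCommits() {
        while (!acks.isEmpty()) {
            committed.add(acks.poll());
        }
    }

    public static void main(String[] args) {
        AckOrderingDemo demo = new AckOrderingDemo();
        try {
            demo.invokeListener(41L);
        } catch (RuntimeException e) {
            demo.transactionRolledBack = true; // a transaction manager would roll back here
        }
        demo.processCommits();
        System.out.println("rolledBack=" + demo.transactionRolledBack
                + ", committed=" + demo.committed);
    }
}
```

In this sketch the failed batch's offset ends up in `committed` even though the simulated transaction was rolled back, which is the behavior reported here.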

I think the producer and batchErrorHandler should be checked before the ConsumerRecord is added to acks, as in the code below:

if (producer != null && this.batchErrorHandler == null) {
	throw e;
}
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
	for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
		this.acks.add(record);
	}
}
if (this.batchErrorHandler == null) {
	throw e;
}
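The effect of this reordering can be checked with a small standalone sketch. This is only an illustration under assumptions, not the container's code: `ProposedOrderingDemo` and `onListenerFailure` are hypothetical names, the `transactional` flag stands in for `producer != null`, `hasErrorHandler` stands in for `batchErrorHandler != null`, and ackOnError=true with autoCommit=false is assumed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the proposed catch-block ordering; not Spring Kafka code.
class ProposedOrderingDemo {
    final Queue<Long> acks = new ArrayDeque<>(); // offsets waiting to be committed

    void onListenerFailure(RuntimeException e, boolean transactional,
                           boolean hasErrorHandler, long highestOffset) {
        // Proposed extra check: in a transaction with no error handler,
        // rethrow before anything is queued for commit.
        if (transactional && !hasErrorHandler) {
            throw e;
        }
        acks.add(highestOffset); // assumed: ackOnError=true, autoCommit=false
        if (!hasErrorHandler) {
            throw e;
        }
    }

    public static void main(String[] args) {
        ProposedOrderingDemo tx = new ProposedOrderingDemo();
        try {
            tx.onListenerFailure(new RuntimeException("listener failed"), true, false, 41L);
        } catch (RuntimeException e) {
            // rollback would happen here; nothing was queued
        }
        ProposedOrderingDemo nonTx = new ProposedOrderingDemo();
        try {
            nonTx.onListenerFailure(new RuntimeException("listener failed"), false, false, 41L);
        } catch (RuntimeException e) {
            // non-transactional behavior is unchanged: the ack was queued first
        }
        System.out.println("transactional acks=" + tx.acks.size()
                + ", non-transactional acks=" + nonTx.acks.size());
    }
}
```

With this ordering, the transactional case leaves `acks` empty, so a later processCommits() would have nothing to commit, while the non-transactional case still queues the ack as before.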

Thanks
