@@ -16,12 +16,14 @@

package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.support.TopicPartitionOffset;

@@ -212,4 +214,13 @@ default void setAckAfterHandle(boolean ack) {
throw new UnsupportedOperationException("This error handler does not support setting this property");
}

/**
* Called when partitions are assigned.
* @param consumer the consumer.
* @param partitions the newly assigned partitions.
* @since 2.8.8
*/
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}

}
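
Usage note (not part of this PR): because onPartitionsAssigned is a default method, existing CommonErrorHandler implementations keep compiling, and a custom handler can opt in by overriding it. A minimal sketch, assuming the interface's other methods retain their defaults in this version; the class and field names are illustrative only:

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.CommonErrorHandler;

// Illustrative only: remembers the latest assignment so later error handling can
// reason about which partitions the consumer currently owns.
public class AssignmentAwareErrorHandler implements CommonErrorHandler {

    private volatile Collection<TopicPartition> lastAssignment;

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        this.lastAssignment = partitions; // invoked on the consumer thread after a rebalance
    }

}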
@@ -16,11 +16,13 @@

package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

import org.springframework.kafka.support.KafkaUtils;
@@ -199,4 +201,9 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
}

}
@@ -16,12 +16,14 @@

package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
@@ -160,5 +162,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
}
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
}
}

}

@@ -69,6 +69,16 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
this(recoverer, backOff, null, fallbackHandler);
}

/**
* Return the fallback batch error handler.
* @return the handler.
* @since 2.8.8
*/
protected CommonErrorHandler getFallbackBatchHandler() {
return this.fallbackBatchHandler;
}


/**
* Construct an instance with the provided properties.
* @param recoverer the recoverer.
@@ -16,11 +16,13 @@

package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.function.BiConsumer;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
@@ -54,6 +56,8 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware

private boolean ackAfterHandle = true;

private boolean retrying;
Review conversation on the new retrying field:

Member: volatile?

Contributor Author: No need; it is always used from the consumer thread.

Member: So, you mean that onPartitionsAssigned() is called within that ErrorHandlingUtils.retryBatch() process? Otherwise I don't see how this variable is involved... Thanks

Contributor Author: ErrorHandlingUtils.retryBatch pauses the consumer and polls in a while loop until retries are exhausted, and then resumes. We need to re-pause the consumer if onPartitionsAssigned() is called while we are in that loop (it is called from poll() in the kafka-clients). So the thread that tests this field is the same thread that is looping within retryBatch.


/**
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
* a 5 second back off).
@@ -99,8 +103,16 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
this.logger.error(thrownException, "Called with no records; consumer exception");
return;
}
this.retrying = true;
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
this.seeker, this.recoverer, this.logger, getLogLevel());
this.retrying = false;
}

public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
if (this.retrying) {
consumer.pause(consumer.assignment());
}
}

}
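
The mechanism described in the review conversation above can be pictured with a simplified, standalone sketch. This is not the actual ErrorHandlingUtils.retryBatch code; the method name, signature, and loop shape are assumptions for illustration only:

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;

// Simplified illustration of the pause / poll-loop / resume pattern the fallback handler
// relies on; the real logic (BackOff handling, seeking, recovery) lives in
// ErrorHandlingUtils.retryBatch and differs in detail.
final class RetryLoopSketch {

    static void retryWithPause(Consumer<?, ?> consumer, Runnable retryInvocation, int maxAttempts) {
        // Pause everything currently assigned so poll() returns no records while retrying.
        consumer.pause(consumer.assignment());
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                // poll() keeps the consumer alive in the group; if a rebalance happens inside
                // this call, the newly assigned partitions come back un-paused, which is why
                // FallbackBatchErrorHandler.onPartitionsAssigned() re-pauses while retrying is true.
                consumer.poll(Duration.ofMillis(100));
                try {
                    retryInvocation.run();
                    return; // the retried listener invocation succeeded
                }
                catch (RuntimeException ex) {
                    // attempt exhausted; a real implementation would apply the BackOff and,
                    // once attempts run out, recover the failed records
                }
            }
        }
        finally {
            consumer.resume(consumer.paused());
        }
    }

}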
@@ -3464,6 +3464,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
ListenerConsumer.this.firstPoll = true;
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
}
if (ListenerConsumer.this.commonErrorHandler != null) {
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
partitions);
}
}

private void repauseIfNeeded(Collection<TopicPartition> partitions) {
@@ -21,6 +21,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -38,6 +39,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.kafka.KafkaException;
import org.springframework.util.backoff.FixedBackOff;
@@ -165,4 +167,39 @@ void exitOnContainerStop() {
assertThat(this.invoked).isEqualTo(1);
}

@Test
void rePauseOnRebalance() {
this.invoked = 0;
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (cr, ex) -> {
recovered.add(cr);
});
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
map.put(new TopicPartition("foo", 0),
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
map.put(new TopicPartition("foo", 1),
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
willAnswer(inv -> {
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
return records;
}).given(consumer).poll(any());
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
throw new RuntimeException();
});
assertThat(this.invoked).isEqualTo(1);
assertThat(recovered).hasSize(2);
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).pause(any());
inOrder.verify(consumer).poll(any());
inOrder.verify(consumer).pause(any());
inOrder.verify(consumer).resume(any());
verify(consumer, times(3)).assignment();
verifyNoMoreInteractions(consumer);
}

}