From 2add84e804c54bb6b6f7e3bd93366445f52449dd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 27 Jun 2025 10:43:56 -0700 Subject: [PATCH] KIP-207: Add ListOffsetsRequest v5 / handle OffsetNotAvailableError --- kafka/consumer/fetcher.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1689b23f1..d57bc4786 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -508,7 +508,7 @@ def _group_list_offset_requests(self, timestamps): return dict(timestamps_by_node) def _send_list_offsets_request(self, node_id, timestamps_and_epochs): - version = self._client.api_version(ListOffsetsRequest, max_version=4) + version = self._client.api_version(ListOffsetsRequest, max_version=5) if self.config['isolation_level'] == 'read_committed' and version < 2: raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2') by_topic = collections.defaultdict(list) @@ -521,14 +521,14 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs): data = (tp.partition, timestamp, 1) by_topic[tp.topic].append(data) - if version <= 1: + if version >= 2: request = ListOffsetsRequest[version]( -1, + self._isolation_level, list(six.iteritems(by_topic))) else: request = ListOffsetsRequest[version]( -1, - self._isolation_level, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues @@ -588,7 +588,9 @@ def _handle_list_offsets_response(self, future, response): " message format version is before 0.10.0", partition) elif error_type in (Errors.NotLeaderForPartitionError, Errors.ReplicaNotAvailableError, - Errors.KafkaStorageError): + Errors.KafkaStorageError, + Errors.OffsetNotAvailableError, + Errors.LeaderNotAvailableError): log.debug("Attempt to fetch offsets for partition %s failed due" " to %s, retrying.", error_type.__name__, partition) partitions_to_retry.add(partition)