From 218936fca73d1c55d9550eca2516fd1ec5a08011 Mon Sep 17 00:00:00 2001 From: Mark McDonald Date: Fri, 31 Jan 2025 11:29:44 -0500 Subject: [PATCH] feat: prevent clearing assigned topic partitions --- .../src/main/java/io/confluent/connect/s3/S3SinkTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java index 5794206a1..d1e2cf92b 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java @@ -328,14 +328,14 @@ public Map preCommit( @Override public void close(Collection partitions) { - for (TopicPartition tp : topicPartitionWriters.keySet()) { + for (TopicPartition tp : partitions) { try { topicPartitionWriters.get(tp).close(); } catch (ConnectException e) { log.error("Error closing writer for {}. Error: {}", tp, e.getMessage()); } + topicPartitionWriters.remove(tp); } - topicPartitionWriters.clear(); } @Override