Skip to content

Conversation

@SatyaKuppam
Copy link

@SatyaKuppam SatyaKuppam commented May 5, 2023

Problem

To decrease the impact of rebalances during rolling bounces of k8s pods, we changed the partition.assignment.strategy from the default RangeAssignor to CooperativeStickyAssignor. After this change we encountered NPEs and the S3SinkTask goes into an unrecoverable state. We did not find the same issue with StickyAssignor however.

Example of an NPE (this is with v10.0.7):

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask\
      \ due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\
      \tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\
      \tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\
      \tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\t\
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\
      \tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\
      \tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\
      \tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.NullPointerException\n\
      \tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\
      \t... 10 more\n"

Solution

WorkerSinkTask always sends a list of topicPartitions on close. We currently clear all the assigned topicPartitionWriters on close(). This worked fine with stop-the-world rebalance strategies like RangeAssignor or StickyAssignor, since the current assignment would be fully closed. But with CooperativeStickyAssignor only a few topicPartitions could be reassigned/closed. In such a scenario clearing out all topicPartitionWriters is causing NPEs.

I am not sure if there is some historical context that I might be missing here and the .clear() is deliberate, could not find clues from commit history.

Test Strategy

Testing done:

Did not specifically write any tests for this case, nor am I aware of any existing tests that test assignment strategies. Open to ideas on any necessary tests. We have applied this path for the past few days and dont see the same degradation.

  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

@SatyaKuppam SatyaKuppam marked this pull request as ready for review May 5, 2023 20:00
@SatyaKuppam SatyaKuppam requested a review from a team as a code owner May 5, 2023 20:01
@SatyaKuppam
Copy link
Author

Jenkins job failing ITs because of IAM perm issues.
Screenshot 2023-05-05 at 4 34 07 PM

https://jenkins.public.confluent.io/job/kafka-connect-storage-cloud/job/PR-648/2/console

@subhashiyer9
Copy link
Member

@SatyaKuppam Thanks for changes. The changes looks good to me
Can you help rebase the changes against oldest branch 10.0.x and update the PR?
If the builds are failing even after the same, I will take a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants