@RetryableTopic: support for re-processing of records after DLT handling #2172

@jgslima

Expected Behavior
Support, or at least make it easy to build, designs where records need to be reprocessed after DLT handling is done and the root cause of the processing error has been fixed.

Current Behavior
After retries are exhausted, the application has to deal with the record in the @DltHandler method. In some applications, however, the record cannot simply be discarded.

Context
For critical topics, records that exhausted their retries cannot be discarded, so what we do here is persist the record data somewhere else (typically in a database).
The application has an Administration Console where the operations team can query and inspect the records that exhausted their retries, along with the exception that caused the error. To capture that exception, the @DltHandler submits the record to the actual listener one more time and stores whatever the listener throws.

After the root cause of the error has been fixed (for instance, a database or an external service that was not responding gets back online, or a bug in the code has been fixed), through the console the operator has the means to select some of the records (or all from a specific original topic) and trigger a "reprocessing" of such records.

A good way of doing this reprocessing would be to restart the flow, that is, to resend the record through the topic chain.

However, we cannot send the message to the original topic, because that would pollute it. Since failing to consume the record is our application's problem, the record should instead go to the first retry topic in our retry topic chain:

  • for fixed delay and multiple topics, this would be "originalTopic-retry-0".
  • for fixed delay and single topic, this would be "originalTopic-retry".
  • for non-fixed delay, this would be the first retry topic, for example "originalTopic-retry-1000".
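
To make the three cases above concrete, here is a minimal sketch of the kind of name-building logic an application ends up duplicating. The class `FirstRetryTopicNames` and its methods are hypothetical and are not part of spring-kafka. The sketch assumes the "-retry" suffix shown in the examples above, and it assumes the number in the non-fixed case is the first delay in milliseconds. The names the framework actually produces depend on its version and on the application's configuration.

```java
import java.util.Objects;

/**
 * Hypothetical helper (NOT a spring-kafka API) that rebuilds the name of the
 * first retry topic following the three patterns listed in this issue.
 */
final class FirstRetryTopicNames {

    // Assumption: the "-retry" suffix used in the examples; it may be configured differently.
    static final String RETRY_SUFFIX = "-retry";

    private FirstRetryTopicNames() {
    }

    /** Fixed delay, one topic per attempt: "<originalTopic>-retry-0". */
    static String fixedDelayMultipleTopics(String originalTopic) {
        return base(originalTopic) + "-0";
    }

    /** Fixed delay, single retry topic: "<originalTopic>-retry". */
    static String fixedDelaySingleTopic(String originalTopic) {
        return base(originalTopic);
    }

    /** Non-fixed delay: assumed to be suffixed with the first delay in milliseconds. */
    static String nonFixedDelay(String originalTopic, long firstDelayMs) {
        if (firstDelayMs < 0) {
            throw new IllegalArgumentException("firstDelayMs must not be negative");
        }
        return base(originalTopic) + "-" + firstDelayMs;
    }

    private static String base(String originalTopic) {
        Objects.requireNonNull(originalTopic, "originalTopic");
        return originalTopic + RETRY_SUFFIX;
    }
}
```

Any change to the suffix or to the delay configuration silently breaks a helper like this, which is why it would be safer for the name to come from the framework.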

What spring-kafka could provide is an easy way for the application to send a record to the first retry topic in the chain for a given original topic. It is fine if, at the moment the record is sent, the consumer of that topic is paused waiting for the delay of the messages that are already there.

This might be done in more than one way:

  • just providing a utility that returns the name of the first retry topic, built the same way the framework builds it internally, with the application taking care of everything else.
  • providing a method that receives a ProducerRecord already instantiated by the application, populates the needed headers in it, and then sends the record to the first retry topic.

In fact, we are already doing this ourselves here, simply sending the record to the first retry topic. But we are doing it in a risky way: we duplicated the logic that builds the first topic name, and we are counting on luck, because it turns out that posting a record without the internal control headers just works. Ideally the library should populate all the control headers properly.
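
For illustration, the workaround described above could be sketched roughly as follows. All of the names here (`ReprocessingSketch`, `StoredRecord`, `RecordSender`) are hypothetical. In a real application the sender would presumably delegate to something like `KafkaTemplate.send(topic, key, value)`. As in our current approach, no control headers are set, which is exactly the gap this issue is about.

```java
import java.util.List;

/** Hypothetical sketch of the application-side reprocessing described in this issue. */
final class ReprocessingSketch {

    /** Hypothetical shape of a record persisted by the @DltHandler. */
    static final class StoredRecord {
        final String originalTopic;
        final String key;
        final String value;

        StoredRecord(String originalTopic, String key, String value) {
            this.originalTopic = originalTopic;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical send abstraction; a real one could wrap a KafkaTemplate. */
    interface RecordSender {
        void send(String topic, String key, String value);
    }

    private ReprocessingSketch() {
    }

    /**
     * Resends every stored record of the given original topic to the first
     * retry topic and returns how many records were sent.
     * Note: no retry control headers are populated here.
     */
    static int reprocess(List<StoredRecord> stored, String originalTopic,
                         String firstRetryTopic, RecordSender sender) {
        int sent = 0;
        for (StoredRecord record : stored) {
            if (record.originalTopic.equals(originalTopic)) {
                sender.send(firstRetryTopic, record.key, record.value);
                sent++;
            }
        }
        return sent;
    }
}
```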
