
Expose the calculated lag to Readers configured with a GroupID #346


Open
wants to merge 4 commits into
base: main
from

Conversation


@wekb wekb commented Sep 4, 2019

We have a use case where seeing the lag for a given topic/partition is almost critical. kafka-go already tracks this internally, and it would be good to be able to read this value in the context of a Consumer Group. This change works fine here, but I'd rather not continue using a fork.

Worst-case, if there are fatal flaws with exposing this, or perhaps non-fatal caveats, they could be added to the README to good effect.

Thanks!

@achille-roussel
Contributor

Thanks for the pull request @wekb!

We did not report this value when the reader is configured to join a consumer group because it was inaccurate. In a consumer group the reader may be handling multiple partitions, so a single lag value is not representative, and it is hard to interpret when only 1 of the N partitions owned by a reader is lagging.

Is there something special about your setup that makes it possible to use this value for lag? (for example, do you have as many consumers as partitions?)

@wekb
Author

wekb commented Sep 9, 2019

Our use case in this scenario is guaranteed to have one partition: we need strict ordering, and the overall throughput requirements are modest. In a hypothetical multiple-partition case, if the single lag value were always the highest lag among the reader's partitions, it would still be useful in our case. Otherwise, yes, at best it's misleading.
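
To make the "highest lag" idea concrete, here is a minimal sketch of reducing per-partition lags to a single worst-case value. It is not kafka-go code: `maxLag` and the sample numbers are hypothetical, and it assumes the per-partition lags have already been obtained somehow.

```go
package main

import "fmt"

// maxLag returns the largest lag among the partitions a reader owns,
// or 0 when the map is empty. Keys are partition IDs, values are lags
// in messages. Negative values (treated here as "unknown") are skipped.
// This is a hypothetical helper, not part of kafka-go.
func maxLag(lags map[int]int64) int64 {
	var worst int64
	for _, lag := range lags {
		if lag > worst {
			worst = lag
		}
	}
	return worst
}

func main() {
	// Hypothetical reader owning three partitions, only one of them lagging.
	lags := map[int]int64{0: 0, 1: 0, 2: 1500}
	fmt.Println(maxLag(lags)) // 1500
}
```

With one partition the result is simply that partition's lag, so the single-partition case behaves the same as today.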

We also noticed the recent ConsumerGroups work, but I haven't had a chance yet to see if it exposes the lag (or at least the most recent offset from the topic, which I'd be happy to calculate separately).

Thanks for the feedback!

@wekb
Author

wekb commented Oct 16, 2019

Does ConsumerGroups provide more granular lag reporting, or would this change be useful with a documented caveat?

wekb added 2 commits October 16, 2019 12:58
Revert "Change package for this branch so it can be imported"

This reverts commit 0e4740b.
@achille-roussel
Contributor

The ConsumerGroup API doesn't support this yet, but we are considering adding a helper function that computes the lag of a reader or consumer group, for example by reading the `__consumer_offsets` topic.

Does that sound like it would address your use case?
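
For illustration only, a helper of that kind might look roughly like the sketch below. Nothing here is an existing kafka-go API: `offsetSource`, `staticOffsets`, and `groupLag` are hypothetical names, and the in-memory source stands in for whatever would actually fetch committed and end offsets from the broker. The sketch assumes lag is the partition's end offset minus the group's committed offset, and reports -1 when nothing has been committed yet.

```go
package main

import "fmt"

// offsetSource is a hypothetical abstraction over the broker; it is
// not a kafka-go type.
type offsetSource interface {
	// CommittedOffsets returns the group's committed offset per partition.
	CommittedOffsets(group, topic string) (map[int]int64, error)
	// EndOffsets returns, per partition, the offset the next message will get.
	EndOffsets(topic string) (map[int]int64, error)
}

// staticOffsets is an in-memory offsetSource used only for this example.
type staticOffsets struct {
	committed map[int]int64
	ends      map[int]int64
}

func (s staticOffsets) CommittedOffsets(group, topic string) (map[int]int64, error) {
	return s.committed, nil
}

func (s staticOffsets) EndOffsets(topic string) (map[int]int64, error) {
	return s.ends, nil
}

// groupLag computes the lag of every partition of topic for group.
// Partitions without a committed offset get -1 (unknown).
func groupLag(src offsetSource, group, topic string) (map[int]int64, error) {
	committed, err := src.CommittedOffsets(group, topic)
	if err != nil {
		return nil, fmt.Errorf("fetching committed offsets: %w", err)
	}
	ends, err := src.EndOffsets(topic)
	if err != nil {
		return nil, fmt.Errorf("fetching end offsets: %w", err)
	}
	lags := make(map[int]int64, len(ends))
	for partition, end := range ends {
		c, ok := committed[partition]
		if !ok {
			lags[partition] = -1 // nothing committed yet: lag unknown
			continue
		}
		lag := end - c
		if lag < 0 {
			lag = 0 // the two reads are not atomic; clamp small races
		}
		lags[partition] = lag
	}
	return lags, nil
}

func main() {
	src := staticOffsets{
		committed: map[int]int64{0: 95, 1: 40},
		ends:      map[int]int64{0: 100, 1: 40, 2: 12},
	}
	lags, err := groupLag(src, "example-group", "example-topic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for partition := 0; partition < 3; partition++ {
		fmt.Printf("partition %d lag %d\n", partition, lags[partition])
	}
}
```

Returning a map keeps the per-partition detail, so callers with a single partition can read one entry while others can aggregate however suits them.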

@wekb
Author

wekb commented Feb 14, 2020

Yes. The forked version of kafka-go that we use has this PR applied and it's been working well. Obviously being able to grab the topic lag on a per-partition basis is useful, but for single-partition use cases like ours, lag for the single partition is enough.

@achille-roussel achille-roussel self-assigned this Feb 22, 2020
@achille-roussel achille-roussel added the in-progress Work in progress, see related pull request label Feb 22, 2020
@rjshrjndrn

rjshrjndrn commented Feb 28, 2020

@wekb How does it report lag for you? For me it always reports 0.
Hmm... I think lag reporting is not accurate, even for a single partition. That's okay for me because I can retry multiple times, but it's worth noting.

// HTTP handler. kafkaReader, promMetricsChannel and metricsCreation are defined elsewhere.
func serve(w http.ResponseWriter, r *http.Request) {
	// Buffered channel carrying the lag observed after each message
	lagChannel := make(chan int64, 1)
	// The request context is cancelled when the client disconnects or the handler returns
	ctx := r.Context()
	// Read the topic in the background
	go func(ctx context.Context, r *kafka.Reader) {
		for {
			m, err := r.ReadMessage(ctx)
			if err != nil {
				fmt.Printf("err reading message: %v\n", err)
				return
			}
			// Read the lag once so the logged and reported values agree
			lag := r.Lag()
			fmt.Printf("topic: %q partition: %v offset: %v lag: %d\n", m.Topic, m.Partition, m.Offset, lag)
			go metricsCreation(m.Value)
			// Don't block forever on the send once the handler has returned
			select {
			case lagChannel <- lag:
			case <-ctx.Done():
				return
			}
		}
	}(ctx, kafkaReader)
	for {
		select {
		case message := <-promMetricsChannel:
			fmt.Fprintf(w, "%s\n", message)
		case <-ctx.Done():
			fmt.Println("done")
			return
		case lag := <-lagChannel:
			// A lag of 0 is taken to mean the reader has caught up
			if lag == 0 {
				return
			}
		}
	}
}

rjshrjndrn added a commit to rjshrjndrn/sunbird-devops that referenced this pull request Mar 4, 2020
Kafka-go doesn't provide a native way to check Lag
when using consumerGroup.
we've to remove hardcoded return -1 from segmentio/kafka-go reader.go
This patch will only work for `1 partition`
segmentio/kafka-go#346