Skip to content

Commit 65b78d8

Browse files
committed
START: Partition state tracking
step: remove complex dirty/clean management; step: compiles; step: move offset search into PartitionState; step: move offset search into PartitionState; step: replace search with state lookup
1 parent 3dcb9d7 commit 65b78d8

File tree

9 files changed

+319
-56
lines changed

9 files changed

+319
-56
lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractOffsetCommitter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
/*-
44
* Copyright (C) 2020-2022 Confluent, Inc.
55
*/
6+
7+
import io.confluent.parallelconsumer.state.PartitionState;
68
import io.confluent.parallelconsumer.state.WorkManager;
79
import lombok.RequiredArgsConstructor;
810
import lombok.extern.slf4j.Slf4j;
@@ -11,6 +13,7 @@
1113
import org.apache.kafka.common.TopicPartition;
1214

1315
import java.util.Map;
16+
import java.util.stream.Collectors;
1417

1518
@Slf4j
1619
@RequiredArgsConstructor
@@ -28,15 +31,18 @@ public void retrieveOffsetsAndCommit() {
2831
// todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
2932
preAcquireWork();
3033
try {
31-
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = wm.findCompletedEligibleOffsetsAndRemove();
34+
Map<TopicPartition, PartitionState.OffsetPair> offsetsToCommit = wm.findCompletedEligibleOffsetsAndRemove();
3235
if (offsetsToCommit.isEmpty()) {
3336
log.debug("No offsets ready");
3437
} else {
3538
log.debug("Will commit offsets for {} partition(s): {}", offsetsToCommit.size(), offsetsToCommit);
3639
ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();
3740

3841
log.debug("Begin commit");
39-
commitOffsets(offsetsToCommit, groupMetadata);
42+
Map<TopicPartition, OffsetAndMetadata> offsetsToCommitOFFSEEETS = offsetsToCommit.entrySet().parallelStream()
43+
.collect(Collectors.toMap(Map.Entry::getKey,
44+
o -> o.getValue().getSync()));
45+
commitOffsets(offsetsToCommitOFFSEEETS, groupMetadata);
4046

4147
log.debug("On commit success");
4248
onOffsetCommitSuccess(offsetsToCommit);
@@ -54,7 +60,7 @@ protected void preAcquireWork() {
5460
// default noop
5561
}
5662

57-
private void onOffsetCommitSuccess(final Map<TopicPartition, OffsetAndMetadata> committed) {
63+
private void onOffsetCommitSuccess(final Map<TopicPartition, PartitionState.OffsetPair> committed) {
5864
wm.onOffsetCommitSuccess(committed);
5965
}
6066

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.lang.reflect.Field;
2222
import java.lang.reflect.Method;
2323
import java.time.Duration;
24-
import java.time.temporal.TemporalUnit;
2524
import java.util.ConcurrentModificationException;
2625
import java.util.Map;
2726
import java.util.concurrent.Future;

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
1212
import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
1313
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
14+
import io.confluent.parallelconsumer.state.PartitionState.OffsetPair;
1415
import lombok.Getter;
1516
import lombok.RequiredArgsConstructor;
1617
import lombok.Setter;
@@ -49,7 +50,7 @@ public class PartitionMonitor<K, V> implements ConsumerRebalanceListener {
4950
*/
5051
@Getter
5152
@Setter
52-
private double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
53+
private static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
5354

5455
private final Consumer<K, V> consumer;
5556

@@ -172,7 +173,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
172173
* When commits are made to broker, we can throw away all the individually tracked offsets before the committed
173174
* offset.
174175
*/
175-
public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> committed) {
176+
public void onOffsetCommitSuccess(Map<TopicPartition, OffsetPair> committed) {
176177
// partitionOffsetHighWaterMarks this will get overwritten in due course
177178
committed.forEach((tp, meta) -> {
178179
var partition = getPartitionState(tp);
@@ -408,6 +409,11 @@ public void onSuccess(WorkContainer<K, V> wc) {
408409
partitionState.onSuccess(wc);
409410
}
410411

412+
public void onFailure(WorkContainer<K, V> wc) {
413+
PartitionState<K, V> partitionState = getPartitionState(wc.getTopicPartition());
414+
partitionState.onFailure(wc);
415+
}
416+
411417
/**
412418
* Takes a record as work and puts it into internal queues, unless it's been previously recorded as completed as per
413419
* loaded records.
@@ -445,16 +451,47 @@ boolean maybeRegisterNewRecordAsWork(final ConsumerRecord<K, V> rec) {
445451
}
446452
}
447453

448-
public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
454+
public Map<TopicPartition, OffsetPair> findCompletedEligibleOffsetsAndRemove() {
449455
return findCompletedEligibleOffsetsAndRemove(true);
450456
}
451457

458+
private Map<TopicPartition, OffsetPair> findCompletedEligibleOffsetsAndRemove(boolean remove) {
459+
460+
var newone = findCompletedEligibleOffsetsAndRemoveNew();
461+
462+
var oldone = findCompletedEligibleOffsetsAndRemoveOld(remove);
463+
464+
return newone;
465+
}
466+
467+
private Map<TopicPartition, OffsetPair> findCompletedEligibleOffsetsAndRemoveNew() {
468+
Map<TopicPartition, OffsetPair> offsetsToSend = new HashMap<>();
469+
470+
for (var e : getAssignedPartitions().entrySet()) {
471+
Optional<OffsetPair> completedEligibleOffsetsAndRemoveNew = e.getValue().getCompletedEligibleOffsetsAndRemoveNew();
472+
completedEligibleOffsetsAndRemoveNew.ifPresent(offsetAndMetadata ->
473+
offsetsToSend.put(e.getKey(), offsetAndMetadata)
474+
);
475+
}
476+
477+
// Map<TopicPartition, OffsetAndMetadata> collect = getAssignedPartitions().entrySet().stream()
478+
// .map(x -> x.getValue().getCompletedEligibleOffsetsAndRemoveNew(remove))
479+
// .filter(Optional::isPresent)
480+
// .map(Optional::get)
481+
// .collect(Collectors.toMap(Map.Entry::getKey,
482+
// o -> o.getValue().getCompletedEligibleOffsetsAndRemoveNew()));
483+
484+
log.debug("Scan finished, {} were in flight, offset(s) ({}) to be committed", offsetsToSend.size(), offsetsToSend);
485+
return offsetsToSend;
486+
}
487+
488+
452489
/**
453490
* Finds eligible offset positions to commit in each assigned partition
454491
*/
455492
// todo remove completely as state of offsets should be tracked live, no need to scan for them - see #201
456493
// https://github.com/confluentinc/parallel-consumer/issues/201 Refactor: Live tracking of offsets as they change, so we don't need to scan for them
457-
Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
494+
Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemoveOld(boolean remove) {
458495

459496
//
460497
Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
@@ -470,7 +507,7 @@ Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boo
470507
log.trace("Starting scan of partition: {}", topicPartitionKey);
471508

472509
count += partitionQueue.size();
473-
var workToRemove = new LinkedList<WorkContainer<K, V>>();
510+
var workToRemove = new LinkedList<WorkContainer<?, ?>>();
474511
var incompleteOffsets = new LinkedHashSet<Long>();
475512
long lowWaterMark = -1;
476513
var highestSucceeded = partitionState.getOffsetHighestSucceeded();
@@ -539,6 +576,14 @@ private Map<TopicPartition, PartitionState<K, V>> getAssignedPartitions() {
539576
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
540577
}
541578

579+
boolean isDirty() {
580+
return this.partitionStates.values().parallelStream().anyMatch(PartitionState::isDirty);
581+
}
582+
583+
public boolean isClean() {
584+
return !isDirty();
585+
}
586+
542587
public boolean couldBeTakenAsWork(WorkContainer<?, ?> workContainer) {
543588
if (checkIfWorkIsStale(workContainer)) {
544589
log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);

0 commit comments

Comments (0)