diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index a0afe5800..ed9edf6e1 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -20,6 +20,7 @@
+
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -37,6 +38,7 @@
<version>1.1.8.4</version>
<scope>compile</scope>
+
<groupId>org.awaitility</groupId>
@@ -81,6 +83,7 @@
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
+ <scope>test</scope>
<groupId>org.postgresql</groupId>
@@ -103,20 +106,18 @@
<artifactId>truth-java8-extension</artifactId>
<scope>test</scope>
- <dependency>
- <groupId>com.google.flogger</groupId>
- <artifactId>flogger</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.flogger</groupId>
- <artifactId>flogger-slf4j-backend</artifactId>
- </dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
<version>1.7.0</version>
<scope>test</scope>
+ <dependency>
+ <groupId>one.util</groupId>
+ <artifactId>streamex</artifactId>
+ <version>0.8.1</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/JavaUtils.java b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/JavaUtils.java
index 8515733eb..bba786ed7 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/JavaUtils.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/JavaUtils.java
@@ -12,6 +12,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
@@ -39,4 +41,19 @@ public static boolean isGreaterThan(Duration compare, Duration to) {
return compare.compareTo(to) > 0;
}
+ /**
+ * A shortcut for changing only the values of a Map.
+ *
+ * https://stackoverflow.com/a/50740570/105741
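+ *
+ * For example, {@code remap(Map.of("a", 1), v -> v + 1)} yields {@code {a=2}}.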
+ */
+ public static <K, V1, V2> Map<K, V2> remap(Map<K, V1> map,
+ Function<? super V1, ? extends V2> function) {
+ return map.entrySet()
+ .stream() // or parallel
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> function.apply(e.getValue())
+ ));
+ }
+
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java
index 00c77ef23..bd1b81f36 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java
@@ -1,13 +1,13 @@
package io.confluent.parallelconsumer;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
/**
* This exception is only used when there is an exception thrown from code provided by the user.
*/
-public class ErrorInUserFunctionException extends RuntimeException {
+public class ErrorInUserFunctionException extends ParallelConsumerException {
public ErrorInUserFunctionException(final String message, final Throwable cause) {
super(message, cause);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java
new file mode 100644
index 000000000..70d282e23
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerException.java
@@ -0,0 +1,15 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+
+/**
+ * Generic Parallel Consumer {@link RuntimeException} parent.
+ */
+public class ParallelConsumerException extends RuntimeException {
+ public ParallelConsumerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractOffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractOffsetCommitter.java
index 1b6f9b2ae..762e7ce7e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractOffsetCommitter.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractOffsetCommitter.java
@@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -25,10 +26,9 @@ public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {
@Override
public void retrieveOffsetsAndCommit() {
log.debug("Commit starting - find completed work to commit offsets");
- // todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
preAcquireWork();
try {
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = wm.findCompletedEligibleOffsetsAndRemove();
+ var offsetsToCommit = wm.collectCommitDataForDirtyPartitions();
if (offsetsToCommit.isEmpty()) {
log.debug("No offsets ready");
} else {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ExternalEngine.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ExternalEngine.java
index 543899e31..1c863de5f 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ExternalEngine.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ExternalEngine.java
@@ -4,7 +4,6 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
-import com.google.common.flogger.FluentLogger;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.extern.slf4j.Slf4j;
@@ -18,8 +17,6 @@
@Slf4j
public abstract class ExternalEngine<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {
- private static final FluentLogger flog = FluentLogger.forEnclosingClass();
-
protected ExternalEngine(final ParallelConsumerOptions newOptions) {
super(newOptions);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ParallelConsumerInternalException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ParallelConsumerInternalException.java
new file mode 100644
index 000000000..96dcf471d
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ParallelConsumerInternalException.java
@@ -0,0 +1,14 @@
+package io.confluent.parallelconsumer.internal;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+/**
+ * Generic Parallel Consumer parent exception.
+ */
+public class ParallelConsumerInternalException extends Exception {
+ public ParallelConsumerInternalException(String msg) {
+ super(msg);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
index 1f19ced26..e6db737f7 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
import io.confluent.csid.utils.TimeUtils;
@@ -21,7 +21,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
-import java.time.temporal.TemporalUnit;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.Future;
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/RateLimiter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/RateLimiter.java
index d31120dd1..cafbb68e8 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/RateLimiter.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/RateLimiter.java
@@ -1,11 +1,10 @@
package io.confluent.parallelconsumer.internal;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
import lombok.Getter;
-import lombok.SneakyThrows;
import java.time.Duration;
@@ -22,7 +21,6 @@ public RateLimiter(int seconds) {
this.rate = Duration.ofSeconds(seconds);
}
- @SneakyThrows
public void performIfNotLimited(final Runnable action) {
if (isOkToCallAction()) {
lastFireMs = System.currentTimeMillis();
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java
index c17edd3d0..334ae159e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/EncodingNotSupportedException.java
@@ -1,9 +1,15 @@
package io.confluent.parallelconsumer.offsets;
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.internal.ParallelConsumerInternalException;
+
/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/
-public class EncodingNotSupportedException extends Exception {
+public class EncodingNotSupportedException extends ParallelConsumerInternalException {
public EncodingNotSupportedException(final String message) {
super(message);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java
new file mode 100644
index 000000000..edea86d10
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/NoEncodingPossibleException.java
@@ -0,0 +1,14 @@
+package io.confluent.parallelconsumer.offsets;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.internal.ParallelConsumerInternalException;
+
+public class NoEncodingPossibleException extends ParallelConsumerInternalException {
+
+ public NoEncodingPossibleException(String msg) {
+ super(msg);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java
index de7f191d4..86542e0f6 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetEncoder.java
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
import lombok.SneakyThrows;
@@ -18,7 +18,7 @@ abstract class OffsetEncoder {
private final OffsetSimultaneousEncoder offsetSimultaneousEncoder;
- public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
+ protected OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
this.offsetSimultaneousEncoder = offsetSimultaneousEncoder;
}
@@ -50,7 +50,8 @@ void register() throws EncodingNotSupportedException {
private void register(final OffsetEncoding type, final byte[] bytes) {
log.debug("Registering {}, with site {}", type, bytes.length);
- offsetSimultaneousEncoder.sortedEncodings.add(new EncodedOffsetPair(type, ByteBuffer.wrap(bytes)));
+ EncodedOffsetPair encodedPair = new EncodedOffsetPair(type, ByteBuffer.wrap(bytes));
+ offsetSimultaneousEncoder.sortedEncodings.add(encodedPair);
offsetSimultaneousEncoder.encodingMap.put(type, bytes);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java
index b620fff67..9054b411b 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java
@@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.state.PartitionState;
import lombok.Value;
@@ -15,7 +16,6 @@
import java.nio.charset.Charset;
import java.util.*;
-import static io.confluent.csid.utils.Range.range;
import static io.confluent.csid.utils.StringUtils.msg;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -55,7 +55,7 @@ public class OffsetMapCodecManager<K, V> {
public static final Charset CHARSET_TO_USE = UTF_8;
- // todo OffsetMapCodecManager needs refactoring - consumer presence here smells bad
+ // todo OffsetMapCodecManager needs refactoring - consumer presence here smells bad #233
org.apache.kafka.clients.consumer.Consumer<K, V> consumer;
/**
@@ -67,23 +67,23 @@ public static class HighestOffsetAndIncompletes {
/**
* The highest represented offset in this result.
*/
- Long highestSeenOffset;
+ Optional<Long> highestSeenOffset;
/**
* Of the offsets encoded, the incomplete ones.
*/
Set<Long> incompleteOffsets;
- public static HighestOffsetAndIncompletes of(Long highestSeenOffset) {
- return new HighestOffsetAndIncompletes(highestSeenOffset, new HashSet<>());
+ public static HighestOffsetAndIncompletes of(long highestSeenOffset) {
+ return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new HashSet<>());
}
public static HighestOffsetAndIncompletes of(long highestSeenOffset, Set<Long> incompleteOffsets) {
- return new HighestOffsetAndIncompletes(highestSeenOffset, incompleteOffsets);
+ return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), incompleteOffsets);
}
public static HighestOffsetAndIncompletes of() {
- return HighestOffsetAndIncompletes.of(null);
+ return new HighestOffsetAndIncompletes(Optional.empty(), new HashSet<>());
}
}
@@ -92,26 +92,26 @@ public static HighestOffsetAndIncompletes of() {
*/
public static Optional<OffsetEncoding> forcedCodec = Optional.empty();
+ // todo remove consumer #233
public OffsetMapCodecManager(final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
this.consumer = consumer;
}
/**
* Load all the previously completed offsets that were not committed
- *
- * @return
*/
+ // todo this is the only method that needs the consumer - offset encoding is being conflated with decoding upon assignment #233
// todo make package private?
// todo rename
- public Map<TopicPartition, PartitionState<K, V>> loadOffsetMapForPartition(final Set<TopicPartition> assignment) {
+ public Map<TopicPartition, PartitionState<K, V>> loadPartitionStateForAssignment(final Collection<TopicPartition> assignment) {
// load last committed state / metadata from consumer
// todo this should be controlled for - improve consumer management so that this can't happen
- Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = null;
+ Map<TopicPartition, OffsetAndMetadata> partitionLastCommittedOffsets = null;
int attempts = 0;
- while (lastCommittedOffsets == null) {
+ while (partitionLastCommittedOffsets == null) {
WakeupException lastWakeupException = null;
try {
- lastCommittedOffsets = consumer.committed(assignment);
+ partitionLastCommittedOffsets = consumer.committed(new HashSet<>(assignment));
} catch (WakeupException exception) {
log.debug("Woken up trying to get assignment", exception);
lastWakeupException = exception;
@@ -121,15 +121,12 @@ public Map<TopicPartition, PartitionState<K, V>> loadOffsetMapForPartition(final
throw new InternalRuntimeError("Failed to get partition assignment - continuously woken up.", lastWakeupException);
}
- var states = new HashMap<TopicPartition, PartitionState<K, V>>();
- lastCommittedOffsets.forEach((tp, offsetAndMeta) -> {
+ var partitionStates = new HashMap<TopicPartition, PartitionState<K, V>>();
+ partitionLastCommittedOffsets.forEach((tp, offsetAndMeta) -> {
if (offsetAndMeta != null) {
- long nextExpectedOffset = offsetAndMeta.offset();
- String metadata = offsetAndMeta.metadata();
try {
- // todo rename
- PartitionState<K, V> incompletes = decodeIncompletes(nextExpectedOffset, tp, metadata);
- states.put(tp, incompletes);
+ PartitionState<K, V> state = decodePartitionState(tp, offsetAndMeta);
+ partitionStates.put(tp, state);
} catch (OffsetDecodingError offsetDecodingError) {
log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})",
tp, offsetAndMeta, offsetDecodingError);
@@ -138,13 +135,20 @@ public Map<TopicPartition, PartitionState<K, V>> loadOffsetMapForPartition(final
});
- // for each assignment which isn't now added in the states to return, enter a default entry. Catches multiple other cases.
+ // assigned partitions for which there has never been a commit
+ // for each assignment with no commit history, enter a default entry. Catches multiple other cases.
assignment.stream()
- .filter(x -> !states.containsKey(x))
- .forEach(x ->
- states.put(x, new PartitionState<>(x, HighestOffsetAndIncompletes.of())));
+ .filter(topicPartition -> !partitionStates.containsKey(topicPartition))
+ .forEach(topicPartition -> {
+ PartitionState<K, V> defaultEntry = new PartitionState<>(topicPartition, HighestOffsetAndIncompletes.of());
+ partitionStates.put(topicPartition, defaultEntry);
+ });
- return states;
+ return partitionStates;
+ }
+
+ private HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(OffsetAndMetadata offsetData) throws OffsetDecodingError {
+ return deserialiseIncompleteOffsetMapFromBase64(offsetData.offset(), offsetData.metadata());
}
public static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long committedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
@@ -157,19 +161,18 @@ public static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase
return decodeCompressedOffsets(committedOffsetForPartition, decodedBytes);
}
- // todo rename
- PartitionState<K, V> decodeIncompletes(long nextExpectedOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
- HighestOffsetAndIncompletes incompletes = deserialiseIncompleteOffsetMapFromBase64(nextExpectedOffset, offsetMetadataPayload);
+ PartitionState<K, V> decodePartitionState(TopicPartition tp, OffsetAndMetadata offsetData) throws OffsetDecodingError {
+ HighestOffsetAndIncompletes incompletes = deserialiseIncompleteOffsetMapFromBase64(offsetData);
log.debug("Loaded incomplete offsets from offset payload {}", incompletes);
return new PartitionState(tp, incompletes);
}
- public String makeOffsetMetadataPayload(long finalOffsetForPartition, PartitionState<K, V> state) throws EncodingNotSupportedException {
+ public String makeOffsetMetadataPayload(long finalOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
String offsetMap = serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, state);
return offsetMap;
}
- String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, PartitionState<K, V> state) throws EncodingNotSupportedException {
+ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
byte[] compressedEncoding = encodeOffsetsCompressed(finalOffsetForPartition, state);
String b64 = OffsetSimpleSerialisation.base64(compressedEncoding);
return b64;
@@ -183,11 +186,11 @@ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, Partit
*
* Can remove string encoding in favour of the boolean array for the `BitSet` if that's how things settle.
*/
- byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState<K, V> partition) throws EncodingNotSupportedException {
- Set<Long> incompleteOffsets = partition.getIncompleteOffsets();
- log.debug("Encoding partition {} incomplete offsets {}", partition.getTp(), incompleteOffsets);
- long offsetHighestSucceeded = partition.getOffsetHighestSucceeded();
- OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, offsetHighestSucceeded, incompleteOffsets).invoke();
+ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState<K, V> partitionState) throws NoEncodingPossibleException {
+ var incompleteOffsets = partitionState.getIncompleteOffsetsBelowHighestSucceeded();
+ long highestSucceeded = partitionState.getOffsetHighestSucceeded();
+ log.debug("Encoding partition {}: highest suceeded {}, incomplete offsets {}, ", partitionState.getTp(), highestSucceeded, incompleteOffsets);
+ OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, highestSucceeded, incompleteOffsets).invoke();
//
if (forcedCodec.isPresent()) {
@@ -196,7 +199,7 @@ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState
Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
byte[] bytes = encodingMap.get(forcedOffsetEncoding);
if (bytes == null)
- throw new EncodingNotSupportedException(msg("Can't force an encoding that hasn't been run: {}", forcedOffsetEncoding));
+ throw new NoEncodingPossibleException(msg("Can't force an encoding that hasn't been run: {}", forcedOffsetEncoding));
return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
} else {
return simultaneousEncoder.packSmallest();
@@ -219,48 +222,8 @@ static HighestOffsetAndIncompletes decodeCompressedOffsets(long nextExpectedOffs
return HighestOffsetAndIncompletes.of(highestSeenOffsetIsThen);
} else {
var result = EncodedOffsetPair.unwrap(decodedBytes);
-
- HighestOffsetAndIncompletes incompletesTuple = result.getDecodedIncompletes(nextExpectedOffset);
-
- Set<Long> incompletes = incompletesTuple.getIncompleteOffsets();
- long highWater = incompletesTuple.getHighestSeenOffset();
-
- return HighestOffsetAndIncompletes.of(highWater, incompletes);
+ return result.getDecodedIncompletes(nextExpectedOffset);
}
}
- String incompletesToBitmapString(long finalOffsetForPartition, PartitionState<K, V> state) {
- var runLengthString = new StringBuilder();
- Long lowWaterMark = finalOffsetForPartition;
- Long highWaterMark = state.getOffsetHighestSeen();
- long end = highWaterMark - lowWaterMark;
- for (final var relativeOffset : range(end)) {
- long offset = lowWaterMark + relativeOffset;
- if (state.getIncompleteOffsets().contains(offset)) {
- runLengthString.append("o");
- } else {
- runLengthString.append("x");
- }
- }
- return runLengthString.toString();
- }
-
- static Set<Long> bitmapStringToIncomplete(final long baseOffset, final String inputBitmapString) {
- final Set<Long> incompleteOffsets = new HashSet<>();
-
- final long longLength = inputBitmapString.length();
- range(longLength).forEach(i -> {
- var bit = inputBitmapString.charAt(i);
- if (bit == 'o') {
- incompleteOffsets.add(baseOffset + i);
- } else if (bit == 'x') {
- log.trace("Dropping completed offset");
- } else {
- throw new IllegalArgumentException("Invalid encoding - unexpected char: " + bit);
- }
- });
-
- return incompleteOffsets;
- }
-
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetSimultaneousEncoder.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetSimultaneousEncoder.java
index e9479a867..237bccdad 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetSimultaneousEncoder.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetSimultaneousEncoder.java
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
import io.confluent.parallelconsumer.state.WorkManager;
@@ -56,12 +56,12 @@ public class OffsetSimultaneousEncoder {
Map<OffsetEncoding, byte[]> encodingMap = new EnumMap<>(OffsetEncoding.class);
/**
- * Ordered set of the the different encodings, used to quickly retrieve the most compressed encoding
+ * Ordered set of the different encodings, used to quickly retrieve the most compressed encoding
*
* @see #packSmallest()
*/
@Getter
- PriorityQueue<EncodedOffsetPair> sortedEncodings = new PriorityQueue();
+ PriorityQueue<EncodedOffsetPair> sortedEncodings = new PriorityQueue<>();
/**
@@ -80,7 +80,7 @@ public class OffsetSimultaneousEncoder {
/**
* The encoders to run
*/
- private final Set<OffsetEncoder> encoders = new HashSet<>();
+ private final Set<OffsetEncoder> encoders;
public OffsetSimultaneousEncoder(long lowWaterMark, long highestSucceededOffset, Set<Long> incompleteOffsets) {
this.lowWaterMark = lowWaterMark;
@@ -101,28 +101,31 @@ public OffsetSimultaneousEncoder(long lowWaterMark, long highestSucceededOffset,
// sanity
if (bitsetLengthL != length) throw new IllegalArgumentException("Integer overflow");
- initEncoders();
+ this.encoders = initEncoders();
}
- private void initEncoders() {
+ private Set<OffsetEncoder> initEncoders() {
+ var newEncoders = new HashSet<OffsetEncoder>();
if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
log.debug("~Large input map size: {} (start: {} end: {})", length, lowWaterMark, lowWaterMark + length);
}
try {
- encoders.add(new BitSetEncoder(length, this, v1));
+ newEncoders.add(new BitSetEncoder(length, this, v1));
} catch (BitSetEncodingNotSupportedException a) {
log.debug("Cannot use {} encoder ({})", BitSetEncoder.class.getSimpleName(), a.getMessage());
}
try {
- encoders.add(new BitSetEncoder(length, this, v2));
+ newEncoders.add(new BitSetEncoder(length, this, v2));
} catch (BitSetEncodingNotSupportedException a) {
log.warn("Cannot use {} encoder ({})", BitSetEncoder.class.getSimpleName(), a.getMessage());
}
- encoders.add(new RunLengthEncoder(this, v1));
- encoders.add(new RunLengthEncoder(this, v2));
+ newEncoders.add(new RunLengthEncoder(this, v1));
+ newEncoders.add(new RunLengthEncoder(this, v2));
+
+ return newEncoders;
}
/**
@@ -204,7 +207,7 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
toRemove.add(encoder);
}
}
- encoders.removeAll(toRemove);
+ toRemove.forEach(encoders::remove);
// compressed versions
// sizes over LARGE_INPUT_MAP_SIZE_THRESHOLD bytes seem to benefit from compression
@@ -219,9 +222,9 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
*
* @see #packEncoding(EncodedOffsetPair)
*/
- public byte[] packSmallest() throws EncodingNotSupportedException {
+ public byte[] packSmallest() throws NoEncodingPossibleException {
if (sortedEncodings.isEmpty()) {
- throw new EncodingNotSupportedException("No encodings could be used");
+ throw new NoEncodingPossibleException("No encodings could be used");
}
final EncodedOffsetPair best = this.sortedEncodings.poll();
log.debug("Compression chosen is: {}", best.encoding.name());
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java
index 4e7a08d87..65c19b2dd 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java
@@ -9,7 +9,6 @@
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
-import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -20,17 +19,17 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import pl.tlinkowski.unij.api.UniSets;
import java.time.Clock;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static io.confluent.csid.utils.KafkaUtils.toTopicPartition;
import static io.confluent.csid.utils.StringUtils.msg;
-import static io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.DefaultMaxMetadataSize;
/**
* In charge of managing {@link PartitionState}s.
@@ -49,13 +48,13 @@ public class PartitionMonitor<K, V> implements ConsumerRebalanceListener {
*/
@Getter
@Setter
- private double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
+ private static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
private final Consumer<K, V> consumer;
private final ShardManager sm;
- private final ParallelConsumerOptions options;
+ private final ParallelConsumerOptions<K, V> options;
/**
* Hold the tracking state for each of our managed partitions.
@@ -71,15 +70,7 @@ public class PartitionMonitor<K, V> implements ConsumerRebalanceListener {
*/
private final Map<TopicPartition, Integer> partitionsAssignmentEpochs = new ConcurrentHashMap<>();
- /**
- * Gets set to true whenever work is returned completed, so that we know when a commit needs to be made.
- *
- * In normal operation, this probably makes very little difference, as typical commit frequency is 1 second, so low
- * chances no work has completed in the last second.
- */
- private final AtomicBoolean workStateIsDirtyNeedsCommitting = new AtomicBoolean(false);
-
- final private Clock clock;
+ private final Clock clock;
public PartitionState<K, V> getPartitionState(TopicPartition tp) {
// may cause the system to wait for a rebalance to finish
@@ -90,32 +81,32 @@ public PartitionState<K, V> getPartitionState(TopicPartition tp) {
}
/**
- * Load offset map for assigned partitions
+ * Load offset map for the assigned partitions
*/
@Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- log.debug("Partitions assigned: {}", partitions);
+ public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions) {
+ log.debug("Partitions assigned: {}", assignedPartitions);
synchronized (this.partitionStates) {
- for (final TopicPartition partition : partitions) {
- if (this.partitionStates.containsKey(partition)) {
- PartitionState<K, V> state = partitionStates.get(partition);
- if (state.isRemoved()) {
- log.trace("Reassignment of previously revoked partition {} - state: {}", partition, state);
+ for (final TopicPartition partitionAssignment : assignedPartitions) {
+ boolean isAlreadyAssigned = this.partitionStates.containsKey(partitionAssignment);
+ if (isAlreadyAssigned) {
+ PartitionState<K, V> previouslyAssignedState = partitionStates.get(partitionAssignment);
+ if (previouslyAssignedState.isRemoved()) {
+ log.trace("Reassignment of previously revoked partition {} - state: {}", partitionAssignment, previouslyAssignedState);
} else {
log.warn("New assignment of partition which already exists and isn't recorded as removed in " +
"partition state. Could be a state bug - was the partition revocation somehow missed, " +
- "or is this a race? Please file a GH issue. Partition: {}, state: {}", partition, state);
+ "or is this a race? Please file a GH issue. Partition: {}, state: {}", partitionAssignment, previouslyAssignedState);
}
}
}
- incrementPartitionAssignmentEpoch(partitions);
+ incrementPartitionAssignmentEpoch(assignedPartitions);
try {
- Set<TopicPartition> partitionsSet = UniSets.copyOf(partitions);
- OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this.consumer); // todo remove throw away instance creation
- var partitionStates = om.loadOffsetMapForPartition(partitionsSet);
+ OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this.consumer); // todo remove throw away instance creation - #233
+ var partitionStates = om.loadPartitionStateForAssignment(assignedPartitions);
this.partitionStates.putAll(partitionStates);
} catch (Exception e) {
log.error("Error in onPartitionsAssigned", e);
@@ -203,8 +194,7 @@ private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> allRemovedPa
partitionStates.put(removedPartition, RemovedPartitionState.getSingleton());
//
- NavigableMap<Long, WorkContainer<K, V>> workFromRemovedPartition = partition.getCommitQueue();
- sm.removeAnyShardsReferencedBy(workFromRemovedPartition);
+ partition.onPartitionsRemoved(sm);
}
}
@@ -252,12 +242,7 @@ boolean checkIfWorkIsStale(final WorkContainer<?, ?> workContainer) {
return false;
}
- public void maybeRaiseHighestSeenOffset(TopicPartition tp, long seenOffset) {
- PartitionState<K, V> partitionState = getPartitionState(tp);
- partitionState.maybeRaiseHighestSeenOffset(seenOffset);
- }
-
- boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> rec) {
+ public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> rec) {
var tp = toTopicPartition(rec);
var partitionState = getPartitionState(tp);
boolean previouslyCompleted = partitionState.isRecordPreviouslyCompleted(rec);
@@ -296,12 +281,7 @@ public long getNumberOfEntriesInPartitionQueues() {
.orElse(0);
}
- private void setPartitionMoreRecordsAllowedToProcess(TopicPartition topicPartitionKey, boolean moreMessagesAllowed) {
- var state = getPartitionState(topicPartitionKey);
- state.setAllowedMoreRecords(moreMessagesAllowed);
- }
-
- public Long getHighestSeenOffset(final TopicPartition tp) {
+ public long getHighestSeenOffset(final TopicPartition tp) {
return getPartitionState(tp).getOffsetHighestSeen();
}
@@ -327,75 +307,6 @@ public boolean isBlocked(final TopicPartition topicPartition) {
return !isAllowedMoreRecords(topicPartition);
}
- /**
- * Get final offset data, build the offset map, and replace it in our map of offset data to send
- *
- * @param offsetsToSend
- * @param topicPartitionKey
- * @param incompleteOffsets
- */
- //todo refactor
- void addEncodedOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend,
- TopicPartition topicPartitionKey,
- LinkedHashSet<Long> incompleteOffsets) {
- // TODO potential optimisation: store/compare the current incomplete offsets to the last committed ones, to know if this step is needed or not (new progress has been made) - isdirty?
- boolean incompleteOffsetsNeedingEncoding = !incompleteOffsets.isEmpty();
- if (incompleteOffsetsNeedingEncoding) {
- // todo offsetOfNextExpectedMessage should be an attribute of State - consider deriving it from the state class
- long offsetOfNextExpectedMessage;
- OffsetAndMetadata finalOffsetOnly = offsetsToSend.get(topicPartitionKey);
- if (finalOffsetOnly == null) {
- // no new low watermark to commit, so use the last one again
- offsetOfNextExpectedMessage = incompleteOffsets.iterator().next(); // first element
- } else {
- offsetOfNextExpectedMessage = finalOffsetOnly.offset();
- }
-
- OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this.consumer);
- try {
- PartitionState<K, V> state = getPartitionState(topicPartitionKey);
- // todo smelly - update the partition state with the new found incomplete offsets. This field is used by nested classes accessing the state
- state.setIncompleteOffsets(incompleteOffsets);
- String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, state);
- int metaPayloadLength = offsetMapPayload.length();
- boolean moreMessagesAllowed;
- OffsetAndMetadata offsetWithExtraMap;
- // todo move
- double pressureThresholdValue = DefaultMaxMetadataSize * USED_PAYLOAD_THRESHOLD_MULTIPLIER;
-
- if (metaPayloadLength > DefaultMaxMetadataSize) {
- // exceeded maximum API allowed, strip the payload
- moreMessagesAllowed = false;
- offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage); // strip payload
- log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. " +
- "Warning: messages might be replayed on rebalance. " +
- "See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.",
- metaPayloadLength, DefaultMaxMetadataSize, DefaultMaxMetadataSize);
- } else if (metaPayloadLength > pressureThresholdValue) { // and thus metaPayloadLength <= DefaultMaxMetadataSize
- // try to turn on back pressure before max size is reached
- moreMessagesAllowed = false;
- offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
- log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will " +
- "not allow further messages, in order to allow the offset data to shrink (via succeeding messages).",
- metaPayloadLength, pressureThresholdValue, DefaultMaxMetadataSize);
- } else { // and thus (metaPayloadLength <= pressureThresholdValue)
- moreMessagesAllowed = true;
- offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
- log.debug("Payload size {} within threshold {}", metaPayloadLength, pressureThresholdValue);
- }
-
- setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, moreMessagesAllowed);
- offsetsToSend.put(topicPartitionKey, offsetWithExtraMap);
- } catch (EncodingNotSupportedException e) {
- setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, false);
- log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
- }
- } else {
- setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, true);
- }
- }
-
-
public boolean isPartitionRemovedOrNeverAssigned(ConsumerRecord<?, ?> rec) {
TopicPartition topicPartition = toTopicPartition(rec);
var partitionState = getPartitionState(topicPartition);
@@ -408,6 +319,11 @@ public void onSuccess(WorkContainer<K, V> wc) {
partitionState.onSuccess(wc);
}
+ public void onFailure(WorkContainer<K, V> wc) {
+ PartitionState<K, V> partitionState = getPartitionState(wc.getTopicPartition());
+ partitionState.onFailure(wc);
+ }
+
/**
* Takes a record as work and puts it into internal queues, unless it's been previously recorded as completed as per
* loaded records.
@@ -433,8 +349,7 @@ boolean maybeRegisterNewRecordAsWork(final ConsumerRecord<K, V> rec) {
return false;
} else {
int currentPartitionEpoch = getEpoch(rec);
- //noinspection unchecked - Lombok builder getter erases generics
- var wc = new WorkContainer(currentPartitionEpoch, rec, options.getRetryDelayProvider(), clock);
+ var wc = new WorkContainer<>(currentPartitionEpoch, rec, options.getRetryDelayProvider(), clock);
sm.addWorkContainer(wc);
@@ -445,92 +360,13 @@ boolean maybeRegisterNewRecordAsWork(final ConsumerRecord<K, V> rec) {
}
}
- public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
- return findCompletedEligibleOffsetsAndRemove(true);
- }
-
- /**
- * Finds eligible offset positions to commit in each assigned partition
- */
- // todo remove completely as state of offsets should be tracked live, no need to scan for them - see #201
- // https://github.com/confluentinc/parallel-consumer/issues/201 Refactor: Live tracking of offsets as they change, so we don't need to scan for them
- Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
-
- //
- Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
- int count = 0;
- int removed = 0;
- log.trace("Scanning for in order in-flight work that has completed...");
-
- //
- for (var partitionStateEntry : getAssignedPartitions().entrySet()) {
- var partitionState = partitionStateEntry.getValue();
- Map<Long, WorkContainer<K, V>> partitionQueue = partitionState.getCommitQueue();
- TopicPartition topicPartitionKey = partitionStateEntry.getKey();
- log.trace("Starting scan of partition: {}", topicPartitionKey);
-
- count += partitionQueue.size();
- var workToRemove = new LinkedList<WorkContainer<K, V>>();
- var incompleteOffsets = new LinkedHashSet<Long>();
- long lowWaterMark = -1;
- var highestSucceeded = partitionState.getOffsetHighestSucceeded();
- // can't commit this offset or beyond, as this is the latest offset that is incomplete
- // i.e. only commit offsets that come before the current one, and stop looking for more
- boolean beyondSuccessiveSucceededOffsets = false;
- for (final var offsetAndItsWorkContainer : partitionQueue.entrySet()) {
- // ordered iteration via offset keys thanks to the tree-map
- WorkContainer<K, V> container = offsetAndItsWorkContainer.getValue();
-
- //
- long offset = container.getCr().offset();
- if (offset > highestSucceeded) {
- break; // no more to encode
- }
-
- //
- boolean complete = container.isUserFunctionComplete();
- if (complete) {
- if (container.getUserFunctionSucceeded().get() && !beyondSuccessiveSucceededOffsets) {
- log.trace("Found offset candidate ({}) to add to offset commit map", container);
- workToRemove.add(container);
- // as in flights are processed in order, this will keep getting overwritten with the highest offset available
- // current offset is the highest successful offset, so commit +1 - offset to be committed is defined as the offset of the next expected message to be read
- long offsetOfNextExpectedMessageToBeCommitted = offset + 1;
- OffsetAndMetadata offsetData = new OffsetAndMetadata(offsetOfNextExpectedMessageToBeCommitted);
- offsetsToSend.put(topicPartitionKey, offsetData);
- } else if (container.getUserFunctionSucceeded().get() && beyondSuccessiveSucceededOffsets) {
- // todo lookup the low water mark and include here
- log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset ({}). Will mark as complete in the offset map.",
- container.getCr().offset(), lowWaterMark);
- // no-op - offset map is only for not succeeded or completed offsets
- } else {
- log.trace("Offset {} is complete, but failed processing. Will track in offset map as failed. Can't do normal offset commit past this point.", container.getCr().offset());
- beyondSuccessiveSucceededOffsets = true;
- incompleteOffsets.add(offset);
- }
- } else {
- lowWaterMark = container.offset();
- beyondSuccessiveSucceededOffsets = true;
- log.trace("Offset (:{}) is incomplete, holding up the queue ({}) of size {}.",
- container.getCr().offset(),
- topicPartitionKey,
- partitionQueue.size());
- incompleteOffsets.add(offset);
- }
- }
-
- addEncodedOffsets(offsetsToSend, topicPartitionKey, incompleteOffsets);
-
- if (remove) {
- removed += workToRemove.size();
- partitionState.remove(workToRemove);
- }
+ public Map<TopicPartition, OffsetAndMetadata> collectDirtyCommitData() {
+ var dirties = new HashMap<TopicPartition, OffsetAndMetadata>();
+ for (var state : getAssignedPartitions().values()) {
+ var offsetAndMetadata = state.getCommitDataIfDirty();
+ offsetAndMetadata.ifPresent(andMetadata -> dirties.put(state.getTp(), andMetadata));
}
-
-
- log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed",
- count, removed, offsetsToSend.size(), offsetsToSend);
- return offsetsToSend;
+ return dirties;
}
private Map<TopicPartition, PartitionState<K, V>> getAssignedPartitions() {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index ffed07e9a..e2ccf4094 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -4,9 +4,10 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.internal.BrokerPollSystem;
+import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import lombok.Getter;
-import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -14,11 +15,16 @@
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import static io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.DefaultMaxMetadataSize;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
import static lombok.AccessLevel.*;
@Slf4j
@@ -28,46 +34,43 @@ public class PartitionState<K, V> {
private final TopicPartition tp;
/**
- * A subset of Offsets, beyond the highest committable offset, which haven't been totally completed.
+ * Offsets beyond the highest committable offset which haven't yet totally succeeded.
*
- * We only need to know the full incompletes when we do the {@link #findCompletedEligibleOffsetsAndRemove} scan, so
- * find the full sent only then, and discard. Otherwise, for continuous encoding, the encoders track it them
- * selves.
+ * This is independent of the actual queued {@link WorkContainer}s. This is because, to start with, the data about
+ * incomplete offsets comes from the encoded metadata payload that gets committed along with the highest committable
+ * offset ({@link #getOffsetHighestSequentialSucceeded()}), so the two are not always in sync.
*
- * We work with incompletes, instead of completes, because it's a bet that most of the time the storage space for
- * storing the incompletes in memory will be smaller.
- *
- * @see #findCompletedEligibleOffsetsAndRemove(boolean)
- * @see #encodeWorkResult(boolean, WorkContainer)
- * @see #onSuccess(WorkContainer)
- * @see #onFailure(WorkContainer)
+ * TreeSet so we can always get the lowest offset.
+ *
+ * Needs to be concurrent because the committer requesting the data to commit may be another thread - the broker
+ * polling sub-system - {@link BrokerPollSystem#maybeDoCommit}. The alternative to having this as a concurrent
+ * collection would be to have the control thread prepare possible commit data on every cycle, and park that data
+ * so that the broker polling thread can grab it if it wants to commit - i.e. the poller would not prepare/query
+ * the data for itself. See also #200 Refactor: Consider a shared nothing architecture.
*/
- // visible for testing
- // todo should be tracked live, as we know when the state of work containers flips - i.e. they are continuously tracked
- // this is derived from partitionCommitQueues WorkContainer states
- // todo remove setter - leaky abstraction, shouldn't be needed
- @Setter
- private Set<Long> incompleteOffsets;
+ private final ConcurrentSkipListSet<Long> incompleteOffsets;
- public Set<Long> getIncompleteOffsets() {
- return Collections.unmodifiableSet(incompleteOffsets);
- }
+ /**
+ * Cached view of the state of the partition. Set dirty when the incomplete state of any offset changes, and set
+ * clean after a successful commit of the state.
+ */
+ @Setter(PRIVATE)
+ @Getter(PRIVATE)
+ private boolean dirty;
/**
* The highest seen offset for a partition.
*
- * Starts off as null - no data
+ * Starts off as -1 - no data. Offsets in Kafka are never negative, so this is fine.
*/
// visible for testing
- @NonNull
@Getter(PUBLIC)
- private Long offsetHighestSeen;
+ private long offsetHighestSeen;
/**
* Highest offset which has completed successfully ("succeeded").
*/
@Getter(PUBLIC)
- @Setter(PRIVATE)
private long offsetHighestSucceeded = -1L;
/**
@@ -84,7 +87,7 @@ public Set<Long> getIncompleteOffsets() {
* @see OffsetMapCodecManager#DefaultMaxMetadataSize
*/
@Getter(PACKAGE)
- @Setter(PACKAGE)
+ @Setter(PRIVATE)
private boolean allowedMoreRecords = true;
/**
@@ -94,72 +97,77 @@ public Set<Long> getIncompleteOffsets() {
* advancing offsets, as this isn't a guarantee of kafka's.
*
* Concurrent because either the broker poller thread or the control thread may be requesting offset to commit
- * ({@link #findCompletedEligibleOffsetsAndRemove})
- *
- * @see #findCompletedEligibleOffsetsAndRemove
+ * ({@link #getCommitDataIfDirty()}), or reading upon {@link #onPartitionsRemoved}
*/
+ // todo doesn't need to be concurrent any more?
private final NavigableMap<Long, WorkContainer<K, V>> commitQueue = new ConcurrentSkipListMap<>();
- NavigableMap<Long, WorkContainer<K, V>> getCommitQueue() {
+ private NavigableMap<Long, WorkContainer<K, V>> getCommitQueue() {
return Collections.unmodifiableNavigableMap(commitQueue);
}
- public PartitionState(TopicPartition tp, OffsetMapCodecManager.HighestOffsetAndIncompletes incompletes) {
+ public PartitionState(TopicPartition tp, OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData) {
this.tp = tp;
- this.incompleteOffsets = incompletes.getIncompleteOffsets();
- this.offsetHighestSeen = incompletes.getHighestSeenOffset();
+ this.offsetHighestSeen = offsetData.getHighestSeenOffset().orElse(-1L);
+ this.incompleteOffsets = new ConcurrentSkipListSet<>(offsetData.getIncompleteOffsets());
+ this.offsetHighestSucceeded = this.offsetHighestSeen;
}
- public void maybeRaiseHighestSeenOffset(final long highestSeen) {
- // rise the high water mark
- Long oldHighestSeen = this.offsetHighestSeen;
- if (oldHighestSeen == null || highestSeen >= oldHighestSeen) {
- log.trace("Updating highest seen - was: {} now: {}", offsetHighestSeen, highestSeen);
- offsetHighestSeen = highestSeen;
+ private void maybeRaiseHighestSeenOffset(final long offset) {
+ // raise the highest seen offset
+ if (offset >= offsetHighestSeen) {
+ log.trace("Updating highest seen - was: {} now: {}", offsetHighestSeen, offset);
+ offsetHighestSeen = offset;
}
}
- /**
- * Removes all offsets that fall below the new low water mark.
- */
- public void truncateOffsets(final long nextExpectedOffset) {
- incompleteOffsets.removeIf(offset -> offset < nextExpectedOffset);
+ public void onOffsetCommitSuccess(final OffsetAndMetadata committed) {
+ setClean();
}
- public void onOffsetCommitSuccess(final OffsetAndMetadata committed) {
- long nextExpectedOffset = committed.offset();
- truncateOffsets(nextExpectedOffset);
+ private void setClean() {
+ setDirty(false);
+ }
+
+ private void setDirty() {
+ setDirty(true);
}
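+ // e.g. with highest seen offset 10 and incompletes {5, 8}: offset 3 was previously completed, offset 5 was not,
+ // and offset 12 has not been seen yet, so is also not previously completed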
public boolean isRecordPreviouslyCompleted(final ConsumerRecord<K, V> rec) {
- boolean previouslyProcessed;
- long offset = rec.offset();
- if (incompleteOffsets.contains(offset)) {
- // record previously saved as not being completed, can exit early
- previouslyProcessed = false;
+ long recOffset = rec.offset();
+ if (!incompleteOffsets.contains(recOffset)) {
+ // if within the range of tracked offsets, must have been previously completed, as it's not in the incomplete set
+ return recOffset <= offsetHighestSeen;
} else {
- Long offsetHighWaterMark = offsetHighestSeen;
- // within the range of tracked offsets, so must have been previously completed
// we haven't recorded this far up, so must not have been processed yet
- previouslyProcessed = offsetHighWaterMark != null && offset <= offsetHighWaterMark;
+ return false;
}
- return previouslyProcessed;
}
public boolean hasWorkInCommitQueue() {
return !commitQueue.isEmpty();
}
- public boolean hasWorkThatNeedsCommitting() {
- return commitQueue.values().parallelStream().anyMatch(x -> x.isUserFunctionSucceeded());
- }
-
public int getCommitQueueSize() {
return commitQueue.size();
}
public void onSuccess(WorkContainer<K, V> work) {
+ long offset = work.offset();
+
+ WorkContainer<K, V> removedFromQueue = this.commitQueue.remove(work.offset());
+ assert (removedFromQueue != null);
+
+ boolean removedFromIncompletes = this.incompleteOffsets.remove(offset);
+ assert (removedFromIncompletes);
+
updateHighestSucceededOffsetSoFar(work);
+
+ setDirty();
+ }
+
+ public void onFailure(WorkContainer<K, V> work) {
+ // no-op
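+ // the failed offset simply remains in incompleteOffsets (and the commit queue), so it stays tracked as incomplete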
}
/**
@@ -170,21 +178,14 @@ private void updateHighestSucceededOffsetSoFar(WorkContainer<K, V> work) {
long thisOffset = work.offset();
if (thisOffset > highestSucceeded) {
log.trace("Updating highest completed - was: {} now: {}", highestSucceeded, thisOffset);
- setOffsetHighestSucceeded(thisOffset);
+ this.offsetHighestSucceeded = thisOffset;
}
}
public void addWorkContainer(WorkContainer<K, V> wc) {
maybeRaiseHighestSeenOffset(wc.offset());
- NavigableMap<Long, WorkContainer<K, V>> queue = this.commitQueue;
- queue.put(wc.offset(), wc);
- }
-
- public void remove(LinkedList<WorkContainer<K, V>> workToRemove) {
- for (var workContainer : workToRemove) {
- var offset = workContainer.getCr().offset();
- this.commitQueue.remove(offset);
- }
+ commitQueue.put(wc.offset(), wc);
+ incompleteOffsets.add(wc.offset());
}
/**
@@ -196,4 +197,119 @@ public boolean isRemoved() {
return false;
}
+ public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
+ if (isDirty())
+ return of(createOffsetAndMetadata());
+ else
+ return empty();
+ }
+
+ private OffsetAndMetadata createOffsetAndMetadata() {
+ Optional<String> payloadOpt = tryToEncodeOffsets();
+ long nextOffset = getNextExpectedPolledOffset();
+ return payloadOpt
+ .map(s -> new OffsetAndMetadata(nextOffset, s))
+ .orElseGet(() -> new OffsetAndMetadata(nextOffset));
+ }
+
+ private long getNextExpectedPolledOffset() {
+ return getOffsetHighestSequentialSucceeded() + 1;
+ }
+
+ /**
+ * @return all incomplete offsets of buffered work in this partition, even if higher than the highest succeeded
+ */
+ public Set<Long> getAllIncompleteOffsets() {
+ //noinspection FuseStreamOperations - only in java 10
+ return Collections.unmodifiableSet(incompleteOffsets.parallelStream()
+ .collect(Collectors.toSet()));
+ }
+
+ /**
+ * @return incomplete offsets which are lower than the highest succeeded
+ */
+ public Set<Long> getIncompleteOffsetsBelowHighestSucceeded() {
+ long highestSucceeded = getOffsetHighestSucceeded();
+ return Collections.unmodifiableSet(incompleteOffsets.parallelStream()
+ // todo less than or less than and equal?
+ .filter(x -> x < highestSucceeded)
+ .collect(Collectors.toSet()));
+ }
+
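+ /**
+ * E.g. if the highest seen offset is 10 and the incomplete offsets are {5, 8}, the highest sequentially succeeded
+ * offset is 4: offset 5 is the lowest incomplete offset, so nothing at or beyond it is committable yet.
+ */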
+ public long getOffsetHighestSequentialSucceeded() {
+ if (this.incompleteOffsets.isEmpty()) {
+ return this.offsetHighestSeen;
+ } else {
+ return this.incompleteOffsets.first() - 1;
+ }
+ }
+
+ /**
+ * Tries to encode the incomplete offsets for this partition. This may not be possible if there are none, or if no
+ * encodings are possible ({@link NoEncodingPossibleException}) - see {@link
+ * OffsetMapCodecManager#makeOffsetMetadataPayload}.
+ *
+ * @return if possible, the String encoded offset map
+ */
+ private Optional<String> tryToEncodeOffsets() {
+ if (incompleteOffsets.isEmpty()) {
+ setAllowedMoreRecords(true);
+ return empty();
+ }
+
+ try {
+ // todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? remove null #233
+ OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(null);
+ long offsetOfNextExpectedMessage = getNextExpectedPolledOffset();
+ String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
+ boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload);
+ if (mustStrip) {
+ return empty();
+ } else {
+ return of(offsetMapPayload);
+ }
+ } catch (NoEncodingPossibleException e) {
+ setAllowedMoreRecords(false);
+ log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
+ return empty();
+ }
+ }
+
+ /**
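+ * With Kafka's default max metadata size of 4096 and the default threshold multiplier of 0.75, payloads over
+ * 3072 chars put the partition under back pressure, and payloads over 4096 must be stripped from the commit.
+ *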
+ * @return true if the payload is too large and must be stripped
+ */
+ private boolean updateBlockFromEncodingResult(String offsetMapPayload) {
+ int metaPayloadLength = offsetMapPayload.length();
+ boolean mustStrip = false;
+
+ if (metaPayloadLength > DefaultMaxMetadataSize) {
+ // exceeded maximum API allowed, strip the payload
+ mustStrip = true;
+ setAllowedMoreRecords(false);
+ log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. " +
+ "Warning: messages might be replayed on rebalance. " +
+ "See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.",
+ metaPayloadLength, DefaultMaxMetadataSize, DefaultMaxMetadataSize);
+ } else if (metaPayloadLength > getPressureThresholdValue()) { // and thus metaPayloadLength <= DefaultMaxMetadataSize
+ // try to turn on back pressure before max size is reached
+ setAllowedMoreRecords(false);
+ log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will " +
+ "not allow further messages, in order to allow the offset data to shrink (via succeeding messages).",
+ metaPayloadLength, getPressureThresholdValue(), DefaultMaxMetadataSize);
+
+ } else { // and thus (metaPayloadLength <= pressureThresholdValue)
+ setAllowedMoreRecords(true);
+ log.debug("Payload size {} within threshold {}", metaPayloadLength, getPressureThresholdValue());
+ }
+
+ return mustStrip;
+ }
+
+ private double getPressureThresholdValue() {
+ return DefaultMaxMetadataSize * PartitionMonitor.getUSED_PAYLOAD_THRESHOLD_MULTIPLIER();
+ }
+
+ public void onPartitionsRemoved(ShardManager sm) {
+ sm.removeAnyShardsReferencedBy(getCommitQueue());
+ }
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
index c1a6ae65a..22ff310fd 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
@@ -12,10 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
@@ -90,7 +87,7 @@ public WorkContainer<K, V> remove(long offset) {
ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
log.trace("Looking for work on shardQueueEntry: {}", getKey());
- var slowWork = new ArrayList<WorkContainer<?, ?>>();
+ var slowWork = new HashSet<WorkContainer<?, ?>>();
var workTaken = new ArrayList<WorkContainer<K, V>>();
var iterator = entries.entrySet().iterator();
@@ -125,7 +122,7 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
return workTaken;
}
- private void logSlowWork(ArrayList<WorkContainer<?, ?>> slowWork) {
+ private void logSlowWork(Set<WorkContainer<?, ?>> slowWork) {
// log
if (!slowWork.isEmpty()) {
List<String> slowTopics = slowWork.parallelStream()
@@ -137,7 +134,7 @@ private void logSlowWork(ArrayList<WorkContainer<?, ?>> slowWork) {
}
}
- private void addToSlowWorkMaybe(ArrayList<WorkContainer<?, ?>> slowWork, WorkContainer<?, ?> workContainer) {
+ private void addToSlowWorkMaybe(Set<WorkContainer<?, ?>> slowWork, WorkContainer<?, ?> workContainer) {
var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
Duration timeInFlight = workContainer.getTimeInFlight();
var msg = msg(msgTemplate, workContainer, workContainer.hasDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
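
One likely motive for switching slowWork from ArrayList to HashSet: a list happily stores the same slow container twice, a set cannot, and membership checks become O(1). A hedged sketch of that difference; SlowItem is a hypothetical stand-in for WorkContainer:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SlowWorkSketch {
    // Stand-in for WorkContainer; records get value-based equals/hashCode for free.
    record SlowItem(String topic, long offset) { }

    public static void main(String[] args) {
        Set<SlowItem> slowWork = new HashSet<>();
        slowWork.add(new SlowItem("orders", 42));
        slowWork.add(new SlowItem("orders", 42)); // re-added later: a Set keeps one copy
        slowWork.add(new SlowItem("payments", 7));

        // Mirrors logSlowWork's summary: distinct topics that currently have slow work.
        List<String> slowTopics = slowWork.stream()
                .map(SlowItem::topic)
                .distinct()
                .collect(Collectors.toList());
        System.out.println("Slow topics: " + slowTopics); // [orders, payments] in some order
    }
}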
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
index f1928c61b..50b2f9749 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
@@ -6,10 +6,8 @@
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@@ -62,14 +60,6 @@ public void addWorkContainer(final WorkContainer<K, V> wc) {
log.warn("Dropping new work container for partition no longer assigned. WC: {}", wc);
}
- @Override
- public void remove(final LinkedList<WorkContainer<K, V>> workToRemove) {
- if (!workToRemove.isEmpty()) {
- // no-op
- log.debug("Dropping work container to remove for partition no longer assigned. WC: {}", workToRemove);
- }
- }
-
/**
* Don't allow more records to be processed for this partition. Eventually the records that trigger this check will
* be cleaned out.
@@ -83,55 +73,24 @@ boolean isAllowedMoreRecords() {
}
@Override
- public Set<Long> getIncompleteOffsets() {
+ public Set<Long> getIncompleteOffsetsBelowHighestSucceeded() {
log.debug(NO_OP);
//noinspection unchecked - by using unsafe generics, we are able to share one static instance
return READ_ONLY_EMPTY_SET;
}
@Override
- public @NonNull Long getOffsetHighestSeen() {
+ public long getOffsetHighestSeen() {
log.debug(NO_OP);
return -1L;
}
@Override
- public long getOffsetHighestSucceeded() { // NOSONAR
+ public long getOffsetHighestSucceeded() {
log.debug(NO_OP);
return -1L;
}
- @Override
- NavigableMap<Long, WorkContainer<K, V>> getCommitQueue() {
- //noinspection unchecked - by using unsafe generics, we are able to share one static instance
- return READ_ONLY_EMPTY_MAP;
- }
-
- @Override
- public void setIncompleteOffsets(final Set<Long> incompleteOffsets) {
- log.debug(NO_OP);
- }
-
- @Override
- void setAllowedMoreRecords(final boolean allowedMoreRecords) {
- log.debug(NO_OP);
- }
-
- @Override
- public void maybeRaiseHighestSeenOffset(final long highestSeen) {
- log.debug(NO_OP);
- }
-
- @Override
- public void truncateOffsets(final long nextExpectedOffset) {
- log.debug(NO_OP);
- }
-
- @Override
- public void onOffsetCommitSuccess(final OffsetAndMetadata committed) {
- log.debug(NO_OP);
- }
-
@Override
public boolean isRecordPreviouslyCompleted(final ConsumerRecord<K, V> rec) {
log.debug("Ignoring previously completed request for partition no longer assigned. Partition: {}", KafkaUtils.toTopicPartition(rec));
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
index 71c9f71ba..fdd100016 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
@@ -196,7 +196,8 @@ public List<WorkContainer<K, V>> getWorkIfAvailable(final int requestedMaxWorkTo
* @return the number of extra records ingested due to request
*/
// todo rename - shunt messages from internal buffer into queues
- int tryToEnsureQuantityOfWorkQueuedAvailable(final int requestedMaxWorkToRetrieve) {
+ // visible for testing
+ public int tryToEnsureQuantityOfWorkQueuedAvailable(final int requestedMaxWorkToRetrieve) {
// todo this counts all partitions as a whole - this may cause some partitions to starve. need to round robin it?
long available = sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
long extraNeededFromInboxToSatisfy = requestedMaxWorkToRetrieve - available;
@@ -238,6 +239,7 @@ public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> committ
public void onFailureResult(WorkContainer<K, V> wc) {
// error occurred, put it back in the queue if it can be retried
wc.endFlight();
+ pm.onFailure(wc);
sm.onFailure(wc);
numberRecordsOutForProcessing--;
}
@@ -250,9 +252,8 @@ public Integer getAmountOfWorkQueuedWaitingIngestion() {
return wmbm.getAmountOfWorkQueuedWaitingIngestion();
}
- // todo rename
- public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
- return pm.findCompletedEligibleOffsetsAndRemove();
+ public Map<TopicPartition, OffsetAndMetadata> collectCommitDataForDirtyPartitions() {
+ return pm.collectDirtyCommitData();
}
/**
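
The rename makes the contract explicit: commit data is collected only from partitions marked dirty since the last commit, and collection no longer removes anything as a side effect. A sketch of what such a collector might look like; PartitionStateView and its dirty flag are hypothetical stand-ins for the real partition state:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class DirtyCommitSketch {
    // Hypothetical read-only view of a partition's progress.
    record PartitionStateView(TopicPartition tp, long nextExpectedOffset, boolean dirty) { }

    static Map<TopicPartition, OffsetAndMetadata> collectDirtyCommitData(List<PartitionStateView> partitions) {
        Map<TopicPartition, OffsetAndMetadata> commitData = new HashMap<>();
        for (PartitionStateView p : partitions) {
            if (p.dirty()) { // clean partitions contribute nothing: no change since last commit
                commitData.put(p.tp(), new OffsetAndMetadata(p.nextExpectedOffset()));
            }
        }
        return commitData;
    }

    public static void main(String[] args) {
        var partitions = List.of(
                new PartitionStateView(new TopicPartition("orders", 0), 100L, true),
                new PartitionStateView(new TopicPartition("orders", 1), 50L, false)); // clean: skipped
        System.out.println(collectDirtyCommitData(partitions)); // only orders-0 appears
    }
}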
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DbTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DbTest.java
index 6e620da24..f0bcbb2cf 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DbTest.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DbTest.java
@@ -1,8 +1,8 @@
+package io.confluent.parallelconsumer.integrationTests;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
-package io.confluent.parallelconsumer.integrationTests;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -19,14 +19,15 @@
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Simulate real forward pressure, back pressure and error conditions by testing against a real database
+ * Simulate real forward pressure, back pressure and error conditions by testing against a real database, instead of
+ * just simulating "work" with a random sleep.
*/
@Slf4j
public class DbTest extends BrokerIntegrationTest {
protected static final PostgreSQLContainer<?> dbc;
- /**
+ /*
* https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers
* https://github.com/testcontainers/testcontainers-java/pull/1781
*/
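
The two links kept above describe testcontainers' singleton-container pattern, which is what the static dbc field implements: start the container once per JVM in a static initializer and share it across every test subclass, rather than paying the startup cost per test. The idiom, per those docs (the image tag here is an assumption for illustration):

import org.testcontainers.containers.PostgreSQLContainer;

public abstract class SingletonDbTestBase {
    // Started once per JVM; testcontainers' Ryuk sidecar reaps it on exit.
    protected static final PostgreSQLContainer<?> DB;

    static {
        DB = new PostgreSQLContainer<>("postgres:13"); // hypothetical image tag
        DB.start();
    }
}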
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
index 167828bc2..f12d7f7b3 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
@@ -28,7 +28,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
import static io.confluent.csid.utils.GeneralTestUtils.time;
import static io.confluent.csid.utils.Range.range;
@@ -96,7 +95,7 @@ void load(CommitMode commitMode) {
List