Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
65b78d8
START: Partition state tracking
astubbs Mar 8, 2022
a4d0317
update
astubbs Mar 23, 2022
1e36a3f
review
astubbs Mar 23, 2022
ed5f566
minor
astubbs Mar 23, 2022
2f453e9
license
astubbs Mar 23, 2022
4fb7631
delete dead code paths
astubbs Mar 23, 2022
fa2dd8b
review
astubbs Mar 23, 2022
8b5a9a5
review
astubbs Mar 23, 2022
b8b685f
review
astubbs Mar 23, 2022
aecd95e
save
astubbs Mar 23, 2022
420a6b3
Merge remote-tracking branch 'origin/master' into features/partition-…
astubbs Mar 24, 2022
785e4e8
save
astubbs Mar 24, 2022
5c6efc1
step: go back to tracking offsets not work containers
astubbs Mar 24, 2022
46e8f50
step
astubbs Mar 24, 2022
8e70c59
use -1 instead of optional
astubbs Mar 24, 2022
1d49046
save
astubbs Mar 24, 2022
432f197
save
astubbs Mar 24, 2022
63093fc
save
astubbs Mar 24, 2022
f4e275a
save
astubbs Mar 24, 2022
bc4f1dc
save
astubbs Mar 24, 2022
c12b9d6
save
astubbs Mar 24, 2022
25b0540
mostly passing
astubbs Mar 24, 2022
e874d7c
step
astubbs Mar 24, 2022
d617d36
step
astubbs Mar 24, 2022
39fbcfd
OffsetEncodingTests pass
astubbs Mar 24, 2022
ddae089
step
astubbs Mar 24, 2022
7d9cb58
step: fix state - wc removal, incompletes etc
astubbs Mar 25, 2022
7766c69
step: remove tuple result
astubbs Mar 25, 2022
59eebbb
don't need to filter incompletes to see if contains
astubbs Mar 25, 2022
4e8bd57
rename find eligible and remove
astubbs Mar 25, 2022
853c798
license and move utilities
astubbs Mar 25, 2022
1a8cc4f
step
astubbs Mar 25, 2022
fc2576d
test update
astubbs Mar 25, 2022
424c197
review
astubbs Mar 25, 2022
4581ca4
review
astubbs Mar 25, 2022
d1c1e8d
java 8
astubbs Mar 25, 2022
f9f4475
java 8
astubbs Mar 25, 2022
8ba4cba
review
astubbs Mar 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions parallel-consumer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
</properties>

<dependencies>
<!-- Main -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand All @@ -37,6 +38,7 @@
<version>1.1.8.4</version>
<scope>compile</scope>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.awaitility</groupId>
Expand Down Expand Up @@ -81,6 +83,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
Expand All @@ -103,20 +106,18 @@
<artifactId>truth-java8-extension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-slf4j-backend</artifactId>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>one.util</groupId>
<artifactId>streamex</artifactId>
<version>0.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.time.Duration.ofMillis;

Expand Down Expand Up @@ -39,4 +41,19 @@ public static boolean isGreaterThan(Duration compare, Duration to) {
return compare.compareTo(to) > 0;
}

/**
* A shortcut for changing only the values of a Map.
* <p>
* https://stackoverflow.com/a/50740570/105741
*/
public static <K, V1, V2> Map<K, V2> remap(Map<K, V1> map,
Function<? super V1, ? extends V2> function) {
return map.entrySet()
.stream() // or parallel
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> function.apply(e.getValue())
));
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

/**
* This exception is only used when there is an exception thrown from code provided by the user.
*/
public class ErrorInUserFunctionException extends RuntimeException {
public class ErrorInUserFunctionException extends ParallelConsumerException {
public ErrorInUserFunctionException(final String message, final Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/


/**
* Generic Parallel Consumer {@link RuntimeException} parent.
*/
public class ParallelConsumerException extends RuntimeException {
public ParallelConsumerException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.state.WorkManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -25,10 +26,9 @@ public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {
@Override
public void retrieveOffsetsAndCommit() {
log.debug("Commit starting - find completed work to commit offsets");
// todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
preAcquireWork();
try {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = wm.findCompletedEligibleOffsetsAndRemove();
var offsetsToCommit = wm.collectCommitDataForDirtyPartitions();
if (offsetsToCommit.isEmpty()) {
log.debug("No offsets ready");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import com.google.common.flogger.FluentLogger;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -18,8 +17,6 @@
@Slf4j
public abstract class ExternalEngine<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {

private static final FluentLogger flog = FluentLogger.forEnclosingClass();

protected ExternalEngine(final ParallelConsumerOptions<K, V> newOptions) {
super(newOptions);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

/**
* Generic Parallel Consumer parent exception.
*/
public class ParallelConsumerInternalException extends Exception {
public ParallelConsumerInternalException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.TimeUtils;
Expand All @@ -21,7 +21,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.Getter;
import lombok.SneakyThrows;

import java.time.Duration;

Expand All @@ -22,7 +21,6 @@ public RateLimiter(int seconds) {
this.rate = Duration.ofSeconds(seconds);
}

@SneakyThrows
public void performIfNotLimited(final Runnable action) {
if (isOkToCallAction()) {
lastFireMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.ParallelConsumerInternalException;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/
public class EncodingNotSupportedException extends Exception {
public class EncodingNotSupportedException extends ParallelConsumerInternalException {
public EncodingNotSupportedException(final String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.ParallelConsumerInternalException;

public class NoEncodingPossibleException extends ParallelConsumerInternalException {

public NoEncodingPossibleException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.SneakyThrows;
Expand All @@ -18,7 +18,7 @@ abstract class OffsetEncoder {

private final OffsetSimultaneousEncoder offsetSimultaneousEncoder;

public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
protected OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
this.offsetSimultaneousEncoder = offsetSimultaneousEncoder;
}

Expand Down Expand Up @@ -50,7 +50,8 @@ void register() throws EncodingNotSupportedException {

private void register(final OffsetEncoding type, final byte[] bytes) {
log.debug("Registering {}, with site {}", type, bytes.length);
offsetSimultaneousEncoder.sortedEncodings.add(new EncodedOffsetPair(type, ByteBuffer.wrap(bytes)));
EncodedOffsetPair encodedPair = new EncodedOffsetPair(type, ByteBuffer.wrap(bytes));
offsetSimultaneousEncoder.sortedEncodings.add(encodedPair);
offsetSimultaneousEncoder.encodingMap.put(type, bytes);
}

Expand Down
Loading