Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
21f08f6
START: the basics of a single queue
astubbs Mar 8, 2022
3f1f3ff
step: remove work mailbox manager
astubbs Mar 8, 2022
92b50cb
step: BROKEN: assign epoch to record immediately
astubbs Mar 8, 2022
1653bc2
step - trying to test perf
astubbs Mar 17, 2022
103c677
update
astubbs Mar 17, 2022
eed2190
logs
astubbs Mar 17, 2022
02ee3cd
fix: Debug output for sorted encoding pairs
astubbs Mar 17, 2022
83fda73
save
astubbs Mar 17, 2022
2a37d46
rebase update
astubbs Mar 25, 2022
b17c838
step
astubbs Apr 4, 2022
e1141e4
save
astubbs Apr 5, 2022
5a3bb55
save
astubbs Apr 5, 2022
6a1464c
save: unit test version of offset encoding backpressure test
astubbs Apr 5, 2022
0604934
save
astubbs Apr 5, 2022
4eeb008
omg - hashsets vs queues, wow
astubbs Apr 5, 2022
bcfc9c1
review
astubbs Apr 6, 2022
6054ac5
review
astubbs Apr 6, 2022
c968629
review
astubbs Apr 6, 2022
0f993dd
review
astubbs Apr 6, 2022
416fd2f
Merge remote-tracking branch 'confluent/master' into features/single-…
astubbs Apr 21, 2022
189dc59
step
astubbs Apr 21, 2022
908d8ed
step
astubbs Apr 21, 2022
7547ec6
fix test
astubbs Apr 21, 2022
939a15e
step - test fix?
astubbs Apr 21, 2022
3fa6ae3
step - test fix?
astubbs Apr 21, 2022
c44f50a
step - test fix? - make sure unit test also cleans up
astubbs Apr 21, 2022
eff0b13
step - test fix? - make sure unit test also cleans up
astubbs Apr 21, 2022
74e0efb
step: reduce consumer max poll
astubbs Apr 21, 2022
ae1ce22
step: loosen duplicate check a bit for jenkins
astubbs Apr 21, 2022
62ffa63
step: fix generics
astubbs Apr 21, 2022
bf4452e
step: Experiment: synchronisation no longer needed due to stronger ep…
astubbs Apr 21, 2022
94ebc5c
turn max poll back to default (500)
astubbs Apr 21, 2022
854c0fa
Draft for pause/resume functionality (confluentinc/parallel-consumer#…
nioertel Feb 17, 2022
1401a28
Improve logging + add unit tests (confluentinc/parallel-consumer#193).
nioertel Feb 18, 2022
cada440
Fix copyrights.
nioertel Feb 19, 2022
2761631
Don't pause the BrokerPollSystem when pausing the consumer. Improve j…
nioertel Feb 28, 2022
dcc01bc
Polish: Make MDC key for work container descriptor a constant.
nioertel Feb 28, 2022
0d9a4fc
step: tweeks
astubbs Mar 2, 2022
77520e4
step: simplify
astubbs Mar 14, 2022
439ad74
step: review
astubbs Mar 14, 2022
753ea81
step: turn back up default timeout to 30 - 10 is too low for some things
astubbs Mar 14, 2022
d7f9978
update
astubbs Mar 23, 2022
74eac43
review
astubbs Mar 23, 2022
76c6fef
rebase compile update
astubbs Mar 25, 2022
b85fd2d
review
astubbs Apr 22, 2022
c6056fe
review
astubbs Apr 22, 2022
333ccac
review
astubbs Apr 22, 2022
d751aa4
review
astubbs Apr 22, 2022
aa5c0e1
fix
astubbs Apr 22, 2022
3b51ffe
START: Rename PartitionMonitor to PartitionStateManager
astubbs Apr 22, 2022
e336064
Merge branch 'features/single-queue' into features/pause-resume
astubbs Apr 22, 2022
cc5debf
START: Rename PartitionMonitor to PartitionStateManager
astubbs Apr 22, 2022
cd4b59a
Merge branch 'pm-rename' into features/pause-resume
astubbs Apr 22, 2022
4f137fa
Merge remote-tracking branch 'confluent/master' into features/pause-r…
astubbs Apr 22, 2022
64a9f1e
Merge remote-tracking branch 'confluent/master' into features/pause-r…
astubbs May 16, 2022
b6ad2ec
docs
astubbs May 16, 2022
9164c44
review
astubbs May 16, 2022
968704b
review
astubbs May 16, 2022
2a9e5c8
fix test - processing order will affect the actual offsets that are c…
astubbs May 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ endif::[]

== Next Version

== v0.5.1.0

=== Features

* #193: Pause / Resume PC (circuit breaker) without unsubscribing from topics

== v0.5.0.1

=== Fixes and Improvements

- fixes: #225 Build support for Java 17, 18 (#289)
* fixes: #225 Build support for Java 17, 18 (#289)

== v0.5.0.0

Expand Down
11 changes: 9 additions & 2 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ without operational burden or harming the cluster's performance
* Java 8 compatibility
* Throttle control and broker liveliness management
* Clean draining shutdown cycle
* Manual Pause / Resume of entire PC without unsubscribing from topics (useful for implementing a simplistic https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern[circuit breaker])
** Note: Pausing of a partition is also automatic, whenever back pressure has built up on a given partition

//image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
//image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]
Expand Down Expand Up @@ -475,7 +477,6 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
* @see AbstractParallelEoSStreamProcessor
* @see #poll(Consumer)
*/
----

Expand Down Expand Up @@ -1162,11 +1163,17 @@ endif::[]

== Next Version

== v0.5.1.0

=== Features

* #193: Pause / Resume PC (circuit breaker) without unsubscribing from topics

== v0.5.0.1

=== Fixes and Improvements

- fixes: #225 Build support for Java 17, 18 (#289)
* fixes: #225 Build support for Java 17, 18 (#289)

== v0.5.0.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// tag::javadoc[]
Expand All @@ -24,31 +24,60 @@
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
* @see AbstractParallelEoSStreamProcessor
* @see #poll(Consumer)
*/
// end::javadoc[]
public interface ParallelConsumer<K, V> extends DrainingCloseable {

/**
* @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
* @see KafkaConsumer#subscribe(Collection)
*/
void subscribe(Collection<String> topics);

/**
* @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
* @see KafkaConsumer#subscribe(Pattern)
*/
void subscribe(Pattern pattern);

/**
* @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
* @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
*/
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

/**
* @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
* @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
*/
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

/**
* Pause this consumer (i.e. stop processing of messages).
* <p>
* This operation only has an effect if the consumer is currently running. In all other cases calling this method
* will be silent a no-op.
* <p>
* Once the consumer is paused, the system will stop submitting work to the processing pool. Already submitted in
* flight work however will be finished. This includes work that is currently being processed inside a user function
* as well as work that has already been submitted to the processing pool but has not been picked up by a free
* worker yet.
* <p>
* General remarks:
* <ul>
* <li>A paused consumer may still keep polling for new work until internal buffers are filled.</li>
* <li>This operation does not actively pause the subscription on the underlying Kafka Broker (compared to
* {@link KafkaConsumer#pause KafkaConsumer#pause}).</li>
* <li>Pending offset commits will still be performed when the consumer is paused.</li>
* </p>
*/
void pauseIfRunning();

/**
* Resume this consumer (i.e. continue processing of messages).
* <p>
* This operation only has an effect if the consumer is currently paused. In all other cases calling this method
* will be a silent no-op.
* </p>
*/
void resumeIfPaused();

/**
* A simple tuple structure.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamP
implements ParallelStreamProcessor<K, V> {

/**
* Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which
* Construct the AsyncConsumer by wrapping this passed in consumer and producer, which can be configured any which
* way as per normal.
*
* @see ParallelConsumerOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.ConsumerRecord;

Expand All @@ -16,6 +17,7 @@
/**
* Internal only view on the {@link PollContext}.
*/
@ToString
public class PollContextInternal<K, V> {

@Delegate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Getter;
import lombok.ToString;

/**
* Internal only view of the {@link RecordContext} class.
*/
@ToString
public class RecordContextInternal<K, V> {

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public abstract class AbstractParallelEoSStreamProcessor<K, V> implements Parall
public static final String MDC_INSTANCE_ID = "pcId";
public static final String MDC_OFFSET_MARKER = "offset";

/**
* Key for the work container descriptor that will be added to the {@link MDC diagnostic context} while inside a
* user function.
*/
private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";

@Getter(PROTECTED)
protected final ParallelConsumerOptions options;

Expand Down Expand Up @@ -664,7 +670,7 @@ private <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> userFu
log.trace("Loop: Process mailbox");
processWorkCompleteMailBox();

if (state == running) {
if (isIdlingOrRunning()) {
// offsets will be committed when the consumer has its partitions revoked
log.trace("Loop: Maybe commit");
commitOffsetsMaybe();
Expand Down Expand Up @@ -947,9 +953,9 @@ private void processWorkCompleteMailBox() {
wm.registerWork(action.getConsumerRecords());
} else {
WorkContainer<K, V> work = action.getWorkContainer();
MDC.put(MDC_OFFSET_MARKER, work.toString());
MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, work.toString());
wm.handleFutureResult(work);
MDC.remove(MDC_OFFSET_MARKER);
MDC.remove(MDC_WORK_CONTAINER_DESCRIPTOR);
}
}
}
Expand Down Expand Up @@ -984,6 +990,11 @@ private Duration getTimeToBlockFor() {
return effectiveCommitAttemptDelay;
}

private boolean isIdlingOrRunning() {
return state == running || state == draining || state == paused;
}


/**
* Conditionally commit offsets to broker
*/
Expand Down Expand Up @@ -1045,7 +1056,7 @@ private boolean lingeringOnCommitWouldBeBeneficial() {

private Duration getTimeToNextCommitCheck() {
// draining is a normal running mode for the controller
if (state == running || state == draining) {
if (isIdlingOrRunning()) {
Duration timeSinceLastCommit = getTimeSinceLastCheck();
Duration timeBetweenCommits = getTimeBetweenCommits();
@SuppressWarnings("UnnecessaryLocalVariable")
Expand All @@ -1065,6 +1076,7 @@ private Duration getTimeSinceLastCheck() {
private void commitOffsetsThatAreReady() {
log.debug("Committing offsets that are ready...");
synchronized (commitCommand) {
log.debug("Committing offsets that are ready...");
committer.retrieveOffsetsAndCommit();
clearCommitCommand();
this.lastCommitTime = Instant.now();
Expand All @@ -1086,7 +1098,7 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct
try {
if (log.isDebugEnabled()) {
// first offset of the batch
MDC.put("offset", workContainerBatch.get(0).offset() + "");
MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainerBatch.get(0).offset() + "");
}
log.trace("Pool received: {}", workContainerBatch);

Expand Down Expand Up @@ -1212,4 +1224,25 @@ private void clearCommitCommand() {
}
}

@Override
public void pauseIfRunning() {
if (this.state == State.running) {
log.info("Transitioning parallel consumer to state paused.");
this.state = State.paused;
} else {
log.debug("Skipping transition of parallel consumer to state paused. Current state is {}.", this.state);
}
}

@Override
public void resumeIfPaused() {
if (this.state == State.paused) {
log.info("Transitioning paarallel consumer to state running.");
this.state = State.running;
notifySomethingToDo();
} else {
log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", this.state);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class BrokerPollSystem<K, V> implements OffsetCommitter {

private Optional<Future<Boolean>> pollControlThreadFuture;

/**
* While {@link io.confluent.parallelconsumer.internal.State#paused paused} is an externally controlled state that
* temporarily stops polling and work registration, the {@code paused} flag is used internally to pause
* subscriptions if polling needs to be throttled.
*/
@Getter
private volatile boolean paused = false;

Expand Down Expand Up @@ -325,4 +330,36 @@ public void wakeupIfPaused() {
if (paused)
consumerManager.wakeup();
}

/**
* Pause polling from the underlying Kafka Broker.
* <p>
* Note: If the poll system is currently not in state {@link io.confluent.parallelconsumer.internal.State#running
* running}, calling this method will be a no-op.
* </p>
*/
public void pausePollingAndWorkRegistrationIfRunning() {
if (this.state == State.running) {
log.info("Transitioning broker poll system to state paused.");
this.state = State.paused;
} else {
log.info("Skipping transition of broker poll system to state paused. Current state is {}.", this.state);
}
}

/**
* Resume polling from the underlying Kafka Broker.
* <p>
* Note: If the poll system is currently not in state {@link io.confluent.parallelconsumer.internal.State#paused
* paused}, calling this method will be a no-op.
* </p>
*/
public void resumePollingAndWorkRegistrationIfPaused() {
if (this.state == State.paused) {
log.info("Transitioning broker poll system to state running.");
this.state = State.running;
} else {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.state);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

/**
Expand All @@ -10,6 +10,13 @@
public enum State {
unused,
running,
/**
* When paused, the system will stop submitting work to the processing pool. Polling for new work however may
* continue until internal buffers have been filled sufficiently and the auto-throttling takes effect.
* In flight work will not be affected by transitioning to this state (i.e. processing will finish without any
* interrupts being sent).
*/
paused,
/**
* When draining, the system will stop polling for more records, but will attempt to process all already downloaded
* records. Note that if you choose to close without draining, records already processed will still be committed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,19 @@ public void assertCommits(MockProducer mp, List<Integer> expectedOffsets, Option
}

public List<Integer> getProducerCommitsFlattened(MockProducer mp) {
return getProducerCommitsMeta(mp).stream().map(x -> (int) x.offset()).collect(Collectors.toList());
}

public List<OffsetAndMetadata> getProducerCommitsMeta(MockProducer mp) {
List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> history = mp.consumerGroupOffsetsHistory();

List<Integer> set = history.stream().flatMap(histories -> {
List<OffsetAndMetadata> set = history.stream().flatMap(histories -> {
// get all partition offsets and flatten
var results = new ArrayList<Integer>();
ArrayList<OffsetAndMetadata> results = new ArrayList<>();
var group = histories.get(CONSUMER_GROUP_ID);
for (var partitionOffsets : group.entrySet()) {
OffsetAndMetadata commit = partitionOffsets.getValue();
int offset = (int) commit.offset();
results.add(offset);
results.add(commit);
}
return results.stream();
}).collect(Collectors.toList()); // set - ignore repeated commits ({@link OffsetMap})
Expand Down
Loading