Commit 1d19e37

committed
START: scatter-gather POC
1 parent 20b494b commit 1d19e37

8 files changed: +245 -21 lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java

Lines changed: 2 additions & 2 deletions
@@ -78,7 +78,7 @@ public void start(String managedExecutorService) {
         try {
             executorService = InitialContext.doLookup(managedExecutorService);
         } catch (NamingException e) {
-            log.debug("Using Java SE Thread",e);
+            log.debug("Using Java SE Thread", e);
             executorService = Executors.newSingleThreadExecutor();
         }
         Future<Boolean> submit = executorService.submit(this::controlLoop);
@@ -140,7 +140,7 @@ private boolean controlLoop() {
             log.debug("Broker poll thread finished, returning true to future");
             return true;
         } catch (Exception e) {
-            log.error("Unknown error", e);
+            log.error("Unexpected error:", e);
             throw e;
         }
     }

parallel-consumer-core/src/test/java/io/confluent/csid/utils/WireMockUtils.java

Lines changed: 33 additions & 9 deletions
@@ -5,24 +5,48 @@
  */

 import com.github.tomakehurst.wiremock.WireMockServer;
-import com.github.tomakehurst.wiremock.client.MappingBuilder;
-import com.github.tomakehurst.wiremock.client.WireMock;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.http.UniformDistribution;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.mockito.BDDMockito.willReturn;

 public class WireMockUtils {

     public static final String stubResponse = "Good times.";

     public WireMockServer setupWireMock() {
         WireMockServer stubServer;
-        WireMockConfiguration options = WireMockConfiguration.wireMockConfig().dynamicPort();
+        WireMockConfiguration options = WireMockConfiguration.wireMockConfig()
+                .dynamicPort()
+                .containerThreads(100); // make sure we can respond in parallel
         stubServer = new WireMockServer(options);
-        MappingBuilder mappingBuilder = WireMock.get(WireMock.urlPathEqualTo("/"))
-                .willReturn(WireMock.aResponse()
-                        .withBody(stubResponse));
-        stubServer.stubFor(mappingBuilder);
-        stubServer.stubFor(WireMock.get(WireMock.urlPathEqualTo("/api"))
-                .willReturn(WireMock.aResponse().withBody(stubResponse)));
+
+        stubServer.stubFor(get(urlPathEqualTo("/"))
+                .willReturn(aResponse()
+                        .withBody(stubResponse)));
+
+        stubServer.stubFor(get(urlPathEqualTo("/api"))
+                .willReturn(aResponse()
+                        .withBody(stubResponse)));
+
+        stubServer.stubFor(get(urlPathEqualTo("/delay/"))
+                .willReturn(aResponse()
+                        .withFixedDelay(1000)
+                        .withBody(stubResponse)));
+
+        stubServer.stubFor(get(urlPathEqualTo("/randomDelay/"))
+                .willReturn(aResponse()
+                        .withRandomDelay(new UniformDistribution(1000, 2000))
+                        .withBody(stubResponse)));
+
+        stubServer.stubFor(get(urlPathEqualTo("/error/"))
+                .willReturn(aResponse()
+                        .withStatus(500)
+                        .withRandomDelay(new UniformDistribution(1000, 2000))
+                        .withBody(stubResponse)));
+
+
         stubServer.start();
         return stubServer;
     }
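
The new stubs give the tests a fixed-latency endpoint (/delay/), a random-latency endpoint (/randomDelay/), and a delayed 500 endpoint (/error/), all returning the shared stubResponse body. As a quick illustration (not part of the commit, and the class name here is made up), a plain JDK HttpClient call against the random-delay stub looks like this:

import com.github.tomakehurst.wiremock.WireMockServer;
import io.confluent.csid.utils.WireMockUtils;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StubSmokeCheck {
    public static void main(String[] args) throws Exception {
        WireMockServer server = new WireMockUtils().setupWireMock();
        try {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(server.baseUrl() + "/randomDelay/"))
                    .GET()
                    .build();
            // waits out the stub's random 1-2 s delay, then prints: 200 Good times.
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } finally {
            server.stop();
        }
    }
}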

parallel-consumer-examples/parallel-consumer-example-reactor/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,11 @@
     <name>Confluent Parallel Consumer Example - Project Reactor.io</name>

     <dependencies>
+        <dependency>
+            <groupId>io.projectreactor.netty</groupId>
+            <artifactId>reactor-netty-http</artifactId>
+            <version>1.0.13</version>
+        </dependency>
         <!-- tag::exampleDep[] -->
         <dependency>
             <groupId>io.confluent.parallelconsumer</groupId>

parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ReactorApp.java

Lines changed: 4 additions & 5 deletions
@@ -25,20 +25,19 @@ public class ReactorApp {

     static String inputTopic = "input-topic-" + RandomUtils.nextInt();

-    Consumer<String, String> getKafkaConsumer() {
+    Consumer<String, String> createKafkaConsumer() {
         return new KafkaConsumer<>(new Properties());
     }

-    Producer<String, String> getKafkaProducer() {
+    Producer<String, String> createKafkaProducer() {
         return new KafkaProducer<>(new Properties());
     }

     ReactorProcessor<String, String> parallelConsumer;

-
     void run() {
-        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
-        Producer<String, String> kafkaProducer = getKafkaProducer();
+        Consumer<String, String> kafkaConsumer = createKafkaConsumer();
+        Producer<String, String> kafkaProducer = createKafkaProducer();
         var options = ParallelConsumerOptions.<String, String>builder()
                 .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                 .consumer(kafkaConsumer)

parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ParallelServiceRouter.java

Lines changed: 128 additions & 0 deletions

@@ -0,0 +1,128 @@
+package io.confluent.parallelconsumer.examples.reactor;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import io.confluent.parallelconsumer.reactor.ReactorProcessor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.RetrySpec;
+
+import java.net.URI;
+import java.net.URLEncoder;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static pl.tlinkowski.unij.api.UniLists.of;
+
+@Slf4j
+public class ParallelServiceRouter {
+
+    private final WireMockServer wireMock;
+    Duration TIMEOUT = Duration.ofMinutes(1);
+    ReactorProcessor<String, String> pc;
+    HttpClient client = HttpClient.newHttpClient();
+    HttpResponse.BodyHandler<String> responseBodyHandler = HttpResponse.BodyHandlers.ofString();
+    int NUMBER_OF_ENDPOINTS_TO_SIMULATE = 2;
+    Producer<String, String> producer;
+
+    public ParallelServiceRouter(ReactorProcessor<String, String> pc, WireMockServer wireMockServer) {
+        this.pc = pc;
+        this.wireMock = wireMockServer;
+
+        pc.subscribe(of(ReactorApp.inputTopic));
+
+        pc.react(rec -> {
+            List<String> endpoints = getEndPointsForRecord(rec);
+            List<HttpRequest> requests = buildRequests(endpoints);
+
+            Mono<List<HttpResponse<String>>> results = scatterGather(requests);
+            results.subscribe(allResults -> {
+                log.info("Subscribe all count: {}: {}", allResults.size(), allResults);
+            });
+            return results;
+        });
+    }
+
+    private Mono<List<HttpResponse<String>>> scatterGather(List<HttpRequest> requests) {
+        Flux<HttpResponse<String>> httpResponseFlux = sendRequests(requests);
+        return gatherResults(httpResponseFlux);
+    }
+
+    private Mono<List<HttpResponse<String>>> gatherResults(Flux<HttpResponse<String>> httpResponseFlux) {
+        return httpResponseFlux.collectList();
+    }
+
+    private Flux<HttpResponse<String>> sendRequests(List<HttpRequest> requests) {
+        return Flux.fromIterable(requests)
+                .flatMap(request -> {
+                    log.info("Sending: {}", request);
+                    CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, responseBodyHandler);
+                    future.whenComplete((stringHttpResponse, throwable) -> {
+                        processComplete(request, stringHttpResponse, throwable);
+                    });
+
+                    Mono<HttpResponse<String>> httpResponseMono = Mono.fromFuture(future);
+                    return httpResponseMono;
+                })
+                .doOnEach(httpResponseSignal ->
+                        log.info("Response received: {}", httpResponseSignal));
+    }
+
+    /**
+     * Either:
+     * retry locally,
+     * throw exception to have PC retry whole record,
+     * skip and log,
+     * or publish to a DLQ for handling elsewhere, with metadata attached
+     */
+    private void processComplete(HttpRequest request, HttpResponse<String> stringHttpResponse, Throwable throwable) {
+        log.info("When complete {}.{}", stringHttpResponse, throwable);
+        boolean failed = false;
+        if (failed) {
+            // build DLQ record with whatever metadata you need for later retry
+            ProducerRecord<String, String> dlq = new ProducerRecord<>("DLQ", request.toString(), stringHttpResponse.toString());
+            producer.send(dlq);
+        }
+    }
+
+    /**
+     * Uses the simulated delay endpoint
+     *
+     * @see io.confluent.csid.utils.WireMockUtils
+     */
+    private List<HttpRequest> buildRequests(List<String> endpoints) {
+        HttpRequest.Builder builder = HttpRequest.newBuilder();
+        return endpoints.stream().map(x -> builder.GET()
+                        .uri(URI.create(wireMock.baseUrl() + "/error/?endpoint=" + x))
+                        .build())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Given a record, returns a list of filtered servers and end points that it should be routed to
+     */
+    private List<String> getEndPointsForRecord(ConsumerRecord<String, String> rec) {
+        // String payload = "Offset " + rec.offset() + " value " + rec.value();
+        String payload = "o " + rec.offset();
+        return IntStream.range(0, NUMBER_OF_ENDPOINTS_TO_SIMULATE)
+                .mapToObj(x -> {
+                    String requestPath = "EP " + x + " " + payload;
+                    return URLEncoder.encode(requestPath);
+                })
+                .collect(Collectors.toList());
+    }
+
+}
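
Note on the processComplete Javadoc above: it lists the failure-handling options (retry locally, rethrow so the parallel consumer retries the whole record, skip and log, or publish to a DLQ), but the POC only stubs out the DLQ branch. Below is a minimal sketch, not part of the commit, of the "retry locally" option using Reactor's retry builders (the file already imports reactor.util.retry.RetrySpec, presumably with this in mind); the helper name, attempt count, and delay are illustrative.

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

class LocalRetrySketch {

    static final HttpClient client = HttpClient.newHttpClient();
    static final HttpResponse.BodyHandler<String> bodyHandler = HttpResponse.BodyHandlers.ofString();

    // Retry a single request a few times locally; once the retries are exhausted
    // the error propagates out of the returned Mono, so the parallel consumer
    // would then retry the whole record.
    static Mono<HttpResponse<String>> sendWithLocalRetry(HttpRequest request) {
        // the Supplier overload of fromFuture means each retry issues a fresh
        // request instead of re-subscribing to the already-failed future
        return Mono.fromFuture(() -> client.sendAsync(request, bodyHandler))
                .retryWhen(Retry.fixedDelay(3, Duration.ofMillis(500))); // illustrative values
    }
}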

parallel-consumer-examples/parallel-consumer-example-reactor/src/test/java/io/confluent/parallelconsumer/examples/reactor/ParallelServiceRouterTest.java

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+package io.confluent.parallelconsumer.examples.reactor;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import io.confluent.csid.utils.KafkaTestUtils;
+import io.confluent.csid.utils.LongPollingMockConsumer;
+import io.confluent.csid.utils.WireMockUtils;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.reactor.ReactorProcessor;
+import io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+
+import static io.confluent.parallelconsumer.examples.reactor.ReactorApp.inputTopic;
+import static pl.tlinkowski.unij.api.UniLists.of;
+
+@Timeout(20)
+class ParallelServiceRouterTest {
+
+    WireMockServer wireMockServer = new WireMockUtils().setupWireMock();
+    int port = wireMockServer.port();
+
+    @Test
+    void sendParallelWebRequests() {
+        ReactorAppTest.ReactorAppAppUnderTest app = new ReactorAppTest.ReactorAppAppUnderTest(port);
+        LongPollingMockConsumer<String, String> mockConsumer = (LongPollingMockConsumer<String, String>) app.createKafkaConsumer();
+        var options = ParallelConsumerOptions.<String, String>builder()
+                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
+                .consumer(mockConsumer)
+                .producer(app.createKafkaProducer())
+                .build();
+
+        ReactorProcessor<String, String> pc = new ReactorProcessor<>(options);
+
+        ParallelServiceRouter router = new ParallelServiceRouter(pc, wireMockServer);
+        mockConsumer.subscribeWithRebalanceAndAssignment(of(inputTopic), 1);
+
+        String s = "a key 1";
+        String a_value = "a value";
+        addRecord(mockConsumer, s, a_value, 0);
+        addRecord(mockConsumer, "a key 2", a_value, 1);
+        addRecord(mockConsumer, "a key 3", a_value, 2);
+
+        Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            KafkaTestUtils.assertLastCommitIs(mockConsumer, 3);
+        });
+        mockConsumer.close();
+    }
+
+    private void addRecord(LongPollingMockConsumer<String, String> mc, String s, String a_value, int i) {
+        mc.addRecord(new ConsumerRecord<>(inputTopic, 0, i, s, a_value));
+    }
+
+    static class ParallelReactorAppAppUnderTest extends ReactorAppTest.ReactorAppAppUnderTest {
+
+        public ParallelReactorAppAppUnderTest(int port) {
+            super(port);
+        }
+
+    }
+}

parallel-consumer-examples/parallel-consumer-example-reactor/src/test/java/io/confluent/parallelconsumer/examples/reactor/ReactorAppTest.java

Lines changed: 6 additions & 4 deletions
@@ -8,6 +8,7 @@
 import io.confluent.csid.utils.KafkaTestUtils;
 import io.confluent.csid.utils.LongPollingMockConsumer;
 import io.confluent.csid.utils.WireMockUtils;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -31,7 +32,7 @@
 @Slf4j
 class ReactorAppTest {

-    TopicPartition tp = new TopicPartition(ReactorApp.inputTopic, 0);
+    static TopicPartition tp = new TopicPartition(ReactorApp.inputTopic, 0);

     @Timeout(20)
     @SneakyThrows
@@ -57,14 +58,15 @@ void test() {
     }

     @RequiredArgsConstructor
-    class ReactorAppAppUnderTest extends ReactorApp {
+    static class ReactorAppAppUnderTest extends ReactorApp {

         private final int port;

+        @Getter
         LongPollingMockConsumer<String, String> mockConsumer = Mockito.spy(new LongPollingMockConsumer<>(OffsetResetStrategy.EARLIEST));

         @Override
-        Consumer<String, String> getKafkaConsumer() {
+        Consumer<String, String> createKafkaConsumer() {
             HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
             beginningOffsets.put(tp, 0L);
             mockConsumer.updateBeginningOffsets(beginningOffsets);
@@ -73,7 +75,7 @@ Consumer<String, String> getKafkaConsumer() {
         }

         @Override
-        Producer<String, String> getKafkaProducer() {
+        Producer<String, String> createKafkaProducer() {
             return new MockProducer<>(true, null, null);
         }


parallel-consumer-examples/parallel-consumer-example-reactor/src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
     </root>

    <logger name="io.confluent.csid" level="info"/>
-   <!-- <logger name="io.confluent.csid" level="debug" />-->
+   <logger name="io.confluent.parallelconsumer.reactor" level="debug" />
    <!-- <logger name="io.confluent.csid" level="trace" />-->

    <!-- <logger name="io.confluent.csid.asyncconsumer.WorkManager" level="info" />-->
