Skip to content

Commit ca65727

Browse files
garyrussellartembilan
authored andcommitted
GH-691: Add CompositeProducerListener
Fixes #691 **cherry-pick to 2.1.x** * Polishing - PR Comments Improve test. (cherry picked from commit 2355bda)
1 parent 221ead5 commit ca65727

File tree

2 files changed

+105
-5
lines changed

2 files changed

+105
-5
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.concurrent.CopyOnWriteArrayList;
22+
23+
import org.apache.kafka.clients.producer.ProducerRecord;
24+
import org.apache.kafka.clients.producer.RecordMetadata;
25+
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* A {@link ProducerListener} that delegates to a collection of listeners.
30+
*
31+
* @param <K> the key type.
32+
* @param <V> the value type.
33+
* @author Gary Russell
34+
*
35+
* @since 2.1.6
36+
*
37+
*/
38+
public class CompositeProducerListener<K, V> implements ProducerListener<K, V> {
39+
40+
private final List<ProducerListener<K, V>> delegates = new CopyOnWriteArrayList<>();
41+
42+
@SafeVarargs
43+
public CompositeProducerListener(ProducerListener<K, V>... delegates) {
44+
setDelegates(delegates);
45+
}
46+
47+
@SafeVarargs
48+
public final void setDelegates(ProducerListener<K, V>... delegates) {
49+
Assert.notNull(delegates, "'delegates' cannot be null");
50+
this.delegates.clear();
51+
this.delegates.addAll(Arrays.asList(delegates));
52+
}
53+
54+
protected List<ProducerListener<K, V>> getDelegates() {
55+
return this.delegates;
56+
}
57+
58+
public void addDelegate(ProducerListener<K, V> delegate) {
59+
this.delegates.add(delegate);
60+
}
61+
62+
public boolean removeDelegate(ProducerListener<K, V> delegate) {
63+
return this.delegates.remove(delegate);
64+
}
65+
66+
@Override
67+
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
68+
this.delegates.forEach(d -> d.onSuccess(producerRecord, recordMetadata));
69+
}
70+
71+
@Override
72+
public void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
73+
this.delegates.forEach(d -> d.onError(producerRecord, exception));
74+
}
75+
76+
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import static org.springframework.kafka.test.assertj.KafkaConditions.timestamp;
2424
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
2525

26+
import java.util.ArrayList;
2627
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicInteger;
3133
import java.util.concurrent.atomic.AtomicReference;
3234

3335
import org.apache.kafka.clients.consumer.Consumer;
@@ -49,6 +51,7 @@
4951
import org.junit.Test;
5052

5153
import org.springframework.kafka.support.Acknowledgment;
54+
import org.springframework.kafka.support.CompositeProducerListener;
5255
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5356
import org.springframework.kafka.support.KafkaHeaders;
5457
import org.springframework.kafka.support.ProducerListener;
@@ -242,23 +245,44 @@ public void withListener() throws Exception {
242245
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
243246
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
244247
template.setDefaultTopic(INT_KEY_TOPIC);
245-
final CountDownLatch latch = new CountDownLatch(1);
246-
template.setProducerListener(new ProducerListener<Integer, String>() {
248+
final CountDownLatch latch = new CountDownLatch(2);
249+
final List<ProducerRecord<Integer, String>> records = new ArrayList<>();
250+
final List<RecordMetadata> meta = new ArrayList<>();
251+
final AtomicInteger onErrorDelegateCalls = new AtomicInteger();
252+
class PL implements ProducerListener<Integer, String> {
247253

248254
@Override
249-
public void onSuccess(String topic, Integer partition, Integer key, String value,
250-
RecordMetadata recordMetadata) {
255+
public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata recordMetadata) {
256+
records.add(record);
257+
meta.add(recordMetadata);
251258
latch.countDown();
252259
}
253260

254-
});
261+
@Override
262+
public void onError(ProducerRecord<Integer, String> producerRecord, Exception exception) {
263+
assertThat(producerRecord).isNotNull();
264+
assertThat(exception).isNotNull();
265+
onErrorDelegateCalls.incrementAndGet();
266+
}
267+
268+
}
269+
PL pl1 = new PL();
270+
PL pl2 = new PL();
271+
CompositeProducerListener<Integer, String> cpl = new CompositeProducerListener<>(pl1, pl2);
272+
template.setProducerListener(cpl);
255273
template.sendDefault("foo");
256274
template.flush();
257275
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
276+
assertThat(records.get(0).value()).isEqualTo("foo");
277+
assertThat(records.get(1).value()).isEqualTo("foo");
278+
assertThat(meta.get(0).topic()).isEqualTo(INT_KEY_TOPIC);
279+
assertThat(meta.get(1).topic()).isEqualTo(INT_KEY_TOPIC);
258280

259281
//Drain the topic
260282
KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
261283
pf.destroy();
284+
cpl.onError(records.get(0), new RuntimeException("x"));
285+
assertThat(onErrorDelegateCalls.get()).isEqualTo(2);
262286
}
263287

264288
@Test

0 commit comments

Comments
 (0)