From a3fc47ebd27a460bcbc2efdf3041e0c169dabef2 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 30 Mar 2016 17:24:57 -0400 Subject: [PATCH 1/3] GH-45: Add RECEIVED... Headers Resolves #45 Avoid propagation of received headers when sending `o.s.messaging.Message`s. Also add `Message` methods to the template. --- gradle/wrapper/gradle-wrapper.properties | 2 +- .../kafka/core/KafkaOperations.java | 26 +++++++++++- .../kafka/core/KafkaTemplate.java | 40 +++++++++++++++++-- .../kafka/support/KafkaHeaders.java | 23 +++++++++-- .../converter/MessagingMessageConverter.java | 6 +-- .../EnableKafkaIntegrationTests.java | 17 ++++++-- .../kafka/core/KafkaTemplateTests.java | 20 +++++++++- src/reference/asciidoc/kafka.adoc | 26 ++++++++++++ 8 files changed, 144 insertions(+), 16 deletions(-) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ab8d9dbee6..d25cee1dbc 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.11-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 75cd7ffb4c..766d67ab8c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.messaging.Message; + /** * The basic Kafka operations contract. * @@ -94,9 +96,18 @@ public interface KafkaOperations { */ Future send(String topic, int partition, K key, V data); + /** + * Send a message with routing information in message headers. + * @param message the message to send. + * @return a Future for the {@link RecordMetadata}. + * @see org.springframework.kafka.support.KafkaHeaders#TOPIC + * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID + * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY + */ + Future send(Message message); - // Sync methods + // Sync methods /** * Send the data to the default topic with no key or partition; @@ -180,6 +191,19 @@ public interface KafkaOperations { RecordMetadata syncSend(String topic, int partition, K key, V data) throws InterruptedException, ExecutionException; + /** + * Send a message with routing information in message headers. + * @param message the message to send. + * @return a Future for the {@link RecordMetadata}. + * @throws ExecutionException execution exception while awaiting result. + * @throws InterruptedException thread interrupted while awaiting result. + * @see org.springframework.kafka.support.KafkaHeaders#TOPIC + * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID + * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY + */ + RecordMetadata syncSend(Message message) + throws InterruptedException, ExecutionException; + /** * Flush the producer. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 1a1594c960..3328d5dd9b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -25,9 +25,13 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.ProducerListenerInvokingCallback; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; /** @@ -126,6 +130,11 @@ public Future send(String topic, int partition, K key, V data) return doSend(producerRecord); } + @Override + public Future send(Message message) { + ProducerRecord producerRecord = messageToProducerRecord(message); + return doSend(producerRecord); + } @Override public RecordMetadata syncSend(V data) throws InterruptedException, ExecutionException { @@ -180,6 +189,19 @@ public RecordMetadata syncSend(String topic, int partition, K key, V data) return future.get(); } + @Override + public RecordMetadata syncSend(Message message) + throws InterruptedException, ExecutionException { + Future future = send(message); + flush(); + return future.get(); + } + + @Override + public void flush() { + this.producer.flush(); + } + /** * Send the producer record. * @param producerRecord the producer record. @@ -211,9 +233,21 @@ protected Future doSend(ProducerRecord producerRecord) { return future; } - @Override - public void flush() { - this.producer.flush(); + private ProducerRecord messageToProducerRecord(Message message) { + MessageHeaders headers = message.getHeaders(); + Object topic = headers.get(KafkaHeaders.TOPIC); + Object partition = headers.get(KafkaHeaders.PARTITION_ID); + Object key = headers.get(KafkaHeaders.MESSAGE_KEY); + Object payload = message.getPayload(); + Assert.isTrue(topic == null || topic instanceof String, "topic header must be a String"); + Assert.isTrue(partition == null || partition instanceof Integer, "partition header must be an Integer"); + @SuppressWarnings({ "rawtypes", "unchecked" }) + ProducerRecord producerRecord = new ProducerRecord( + topic == null ? this.defaultTopic : (String) topic, + ((Integer) partition).intValue(), + key, + payload); + return producerRecord; } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java index ed128b6e78..9a5ce0b54d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java @@ -21,24 +21,24 @@ * * @author Artem Bilan * @author Marius Bogoevici + * @author Gary Russell */ public abstract class KafkaHeaders { private static final String PREFIX = "kafka_"; /** - * The header for topic. + * The header containing the topic when sending data to Kafka. */ public static final String TOPIC = PREFIX + "topic"; - /** - * The header for message key. + * The header containing the message key when sending data to Kafka. */ public static final String MESSAGE_KEY = PREFIX + "messageKey"; /** - * The header for topic partition. + * The header containing the topic partition when sending data to Kafka. */ public static final String PARTITION_ID = PREFIX + "partitionId"; @@ -52,4 +52,19 @@ public abstract class KafkaHeaders { */ public static final String ACKNOWLEDGMENT = PREFIX + "acknowledgment"; + /** + * The header containing the topic from which the message was received. + */ + public static final String RECEIVED_TOPIC = PREFIX + "receivedTopic"; + + /** + * The header containing the message key for the received message. + */ + public static final String RECEIVED_MESSAGE_KEY = PREFIX + "receivedMessageKey"; + + /** + * The header containing the topic partition for the received message. + */ + public static final String RECEIVED_PARTITION_ID = PREFIX + "receivedPartitionId"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index cca74f41da..3ae649c847 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -66,9 +66,9 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); Map rawHeaders = kafkaMessageHeaders.getRawHeaders(); - rawHeaders.put(KafkaHeaders.MESSAGE_KEY, record.key()); - rawHeaders.put(KafkaHeaders.TOPIC, record.topic()); - rawHeaders.put(KafkaHeaders.PARTITION_ID, record.partition()); + rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key()); + rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic()); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition()); rawHeaders.put(KafkaHeaders.OFFSET, record.offset()); if (acknowledgment != null) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index b91476395a..ab206a1ca4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -83,10 +83,12 @@ public void testSimple() throws Exception { assertThat(this.listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); waitListening("bar"); - template.send("annotated2", 0, "foo"); + template.send("annotated2", 0, 123, "foo"); template.flush(); assertThat(this.listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.key).isEqualTo(123); assertThat(this.listener.partition).isNotNull(); + assertThat(this.listener.topic).isEqualTo("annotated2"); waitListening("baz"); template.send("annotated3", 0, "foo"); @@ -200,14 +202,23 @@ public static class Listener { private volatile Acknowledgment ack; + private Integer key; + + private String topic; + @KafkaListener(id = "foo", topics = "annotated1") public void listen1(String foo) { this.latch1.countDown(); } @KafkaListener(id = "bar", topicPattern = "annotated2") - public void listen2(@Payload String foo, @Header(KafkaHeaders.PARTITION_ID) int partitionHeader) { - this.partition = partitionHeader; + public void listen2(@Payload String foo, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + this.key = key; + this.partition = partition; + this.topic = topic; this.latch2.countDown(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index c923c25a9b..98695840f2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -35,9 +35,11 @@ import org.springframework.kafka.listener.ContainerTestUtils; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.ProducerListenerAdapter; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.support.MessageBuilder; /** @@ -64,7 +66,6 @@ public void testTemplate() throws Exception { @Override public void onMessage(ConsumerRecord record) { - System.out.println(record); records.add(record); } @@ -93,6 +94,23 @@ public void onMessage(ConsumerRecord record) { assertThat(received).has(key((Integer) null)); assertThat(received).has(partition(0)); assertThat(received).has(value("qux")); + template.syncSend(MessageBuilder.withPayload("fiz") + .setHeader(KafkaHeaders.TOPIC, TEMPLATE_TOPIC) + .setHeader(KafkaHeaders.PARTITION_ID, 0) + .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .build()); + received = records.poll(10, TimeUnit.SECONDS); + assertThat(received).has(key(2)); + assertThat(received).has(partition(0)); + assertThat(received).has(value("fiz")); + template.syncSend(MessageBuilder.withPayload("buz") + .setHeader(KafkaHeaders.PARTITION_ID, 0) + .setHeader(KafkaHeaders.MESSAGE_KEY, 2) + .build()); + received = records.poll(10, TimeUnit.SECONDS); + assertThat(received).has(key(2)); + assertThat(received).has(partition(0)); + assertThat(received).has(value("buz")); } @Test diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 766bb9d415..752a58b2d2 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -24,6 +24,7 @@ Future send(String topic, int partition, V data); Future send(String topic, int partition, K key, V data); +Future send(Message message); // Sync methods @@ -49,6 +50,9 @@ RecordMetadata syncSend(String topic, int partition, V data) RecordMetadata syncSend(String topic, int partition, K key, V data) throws InterruptedException, ExecutionException; +RecordMetadata syncSend(Message message) + throws InterruptedException, ExecutionException; + // Flush the producer. void flush(); @@ -81,6 +85,15 @@ The template can also be configured using standard `` definitions. Then, to use the template, simply invoke one of its methods. +When using the methods with a `Message` parameter, topic, partition and key information is provided in a message +header: + +- `KafkaHeaders.TOPIC` +- `KafkaHeaders.PARTITION_ID` +- `KafkaHeaders.MESSAGE_KEY` + +with the message payload being the data. + Optionally, you can configure the `KafkaTemplate` with a `ProducerListener` to get an async callback with the results of the send (success or failure) instead of waiting for the `Future` to complete. @@ -276,3 +289,16 @@ public void listen(String data, Acknowledgment ack) { ack.acknowledge(); } ---- + +Finally, metadata about the message is available from message headers: + +[source, java] +---- +@KafkaListener(id = "qux", topicPattern = "myTopic1") +public void listen(@Payload String foo, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + ... +} +---- From 213cfcc3d9afaa3718e4102050af3409b4e3face Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 30 Mar 2016 18:45:23 -0400 Subject: [PATCH 2/3] GH-45: Polishing --- gradle/wrapper/gradle-wrapper.jar | Bin 53638 -> 53639 bytes gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 10 +++------- .../kafka/core/KafkaTemplate.java | 16 ++++------------ 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 5ccda13e9cb94678ba179b32452cf3d60dc36353..2c6137b87896c8f70315ae454e00a969ef5f6019 100644 GIT binary patch delta 1762 zcmY*Z3rv$&6u$inv`UK;1cg>AAP<3I*QyZ#iC_c)5wQ50V{{aR$v}Zv1viU2n4rAw zHXk9|7`Qrh0WIp7z(AnoQJ^@Taf|a2Ky)&#+2S6eyZ^ZaY?J0Y_xrzd&i9{t+M-%+ zaV=LE7tOVri4dQUq%m2QLN7jn$jkc8K9xaR9n3lA91fb6coNBJH!cfCAAsjl7O*ep z9*a6VCYJ%?kktbqvaIWX&^huQY=H5zyG0q^Y^gOcE1W7Q(?4$`4;Zfn8yz6nFBecv z*>WdaV6@@SXF^aDdz%(4Oytq@(oKncK5-G5byoW!9(y<9ji>AU6QoPxr45a;WtU`2 z6gV_lHe()9e0DOx*@W|xJ@zjxZ^`PA3J$4Tqh=RYi36P*^Zepe8K#S-S>rwp3&X39 zuKZ}+>)vk3-r#Ei%4f$sxB9LaS)HujDXe^7zUybEDXb?bcx~Y`;brDnieS8Bhu^@# zi)Z9XTNK{gM>K{StzFB8klihJ?`O`x`sU5gV-}8QjAZ)j*LUVPyIrqWC5`6yt(%p0 z#!9U_neDrDxGwN_=a*k;wk^K$kGyU~?NHyU+9nJB^N}>+YkRTL^G?swiAc@;FTQL~ z`1XawRDG*RRQ%WZ;oFL92X>j6^@g&SuiX}TQM^~_&n2ikt^9;x11wiP1VWPf3J9HB z`a>EBcVG@Ys?C(}A?V7Ja3Of04x)i)!B5t}{HOVsivK=vg9nVMWQa0#N6s>K?2tb` z)i`&%Jwke4EG<}opXS-<4wkF!K|N7prd`c-cWH24d&vqO9X-dT&2arw`l#r_JGAtu zZWYz|es7}8M3aJQ6wR2+XS+6(y0oqhaBl8O1e~L%byfNlIQQyfrgz!Zu=cgJ-DwD62Zb99BF+ccXmEwoxIx5J zE3tII8JmOq(M($4;qUt9gR}lV5%c%} zu0H3E1x8q5>}C`(ohA5AN$}LL4-@M65lHSf${=xqP;1Hw<%16o(kqGY7cu46L2-sK*z`-)^Mgj{S93bIJ-#)}7{ zz{0)(5mR`Mcn_F*_e*UJxyMPrGh_uUZ=|?>s-Jk!o!-izh{?Y|XfYO)&SGB{JckcC zjXol?+ecbkuF)?#sBv@9N5XoObLlMC-@c~YRNFxkX96ALjV35h+ zD2{+Zvr%sKpq9kbB<)Nun7`{umQR(Dsi}T|C`9JO>Vw(zJA~TI_KVuYjpZG z+B8T*o6JW@BtrITb&jc0L_i%~`zkKSYp2zVgy#u7G$%19lCotq1Dz`XUaAwwT(i>w5|IGYWyjL<^G2gcLpdzR^1yh8|#Qoh3q7N^|BtmgcB zn+3p>`n{YFi{dRqY{1k|A!|SPd8kN4s!)f^PcFq{d;J&2YXXB+l|ib?8aGv?n@14# ziEx`o6GiTzhieZ`j&L~To$VXfBp0Vmy}5Wp^hl6PU;14cSf?F4LOr=2!c)lmPR{1u zDu|oX7Zv@Lr+RI)lv?8i#nYqH7K;7@PqaF;TsM|BDF|A<&pCZVYww=A@fnfdZ+xlzjFDU^>CNsOu?nmF*6<(c_Rciezti0&#Gq>uXKk((<6E5o#Z*5wiMSJ#WJQ>MRNPjTyoj+O%YOZ#EY@Y zxE8V(YIpUNlAf;92(9O6CQ~5$Pev)squVHg(uq1!|U1A7>LvfxWxfaC^-+{d|q|wvzPb&IvbN3|`e$ z%T+-d9<_*OKk7`6oR^AY8r5N5$y(?44abxtArU4B*)KrIi(@cgRd)as_f5BiN+~D3 ze)#SWRk(?6uIMXX&PSPF)48_qzEw&>=iDo+C#Q(aQ2$x`Orv#GZ_eiJ# zJv27Z;|K?akyk!5&^N@pf#a28S+5#w2YV&d^gVVS_br&S2D*dL{&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 3328d5dd9b..77d5535af8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -31,7 +31,6 @@ import org.springframework.kafka.support.ProducerListenerInvokingCallback; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; -import org.springframework.util.Assert; /** @@ -233,21 +232,14 @@ protected Future doSend(ProducerRecord producerRecord) { return future; } + @SuppressWarnings({ "rawtypes", "unchecked" }) private ProducerRecord messageToProducerRecord(Message message) { MessageHeaders headers = message.getHeaders(); - Object topic = headers.get(KafkaHeaders.TOPIC); - Object partition = headers.get(KafkaHeaders.PARTITION_ID); + String topic = headers.get(KafkaHeaders.TOPIC, String.class); + Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class); Object key = headers.get(KafkaHeaders.MESSAGE_KEY); Object payload = message.getPayload(); - Assert.isTrue(topic == null || topic instanceof String, "topic header must be a String"); - Assert.isTrue(partition == null || partition instanceof Integer, "partition header must be an Integer"); - @SuppressWarnings({ "rawtypes", "unchecked" }) - ProducerRecord producerRecord = new ProducerRecord( - topic == null ? this.defaultTopic : (String) topic, - ((Integer) partition).intValue(), - key, - payload); - return producerRecord; + return new ProducerRecord(topic == null ? this.defaultTopic : topic, partition, key, payload); } } From 55b5e1206216ff6e4253373c6c9002f08a189577 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 31 Mar 2016 09:29:33 -0400 Subject: [PATCH 3/3] Remove Extra Spaces --- .../springframework/kafka/core/KafkaTemplate.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 77d5535af8..ba1b11aef8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -91,28 +91,28 @@ public void setProducerListener(ProducerListener producerListener) { } @Override - public Future send(V data) { + public Future send(V data) { return send(this.defaultTopic, data); } @Override - public Future send(K key, V data) { + public Future send(K key, V data) { return send(this.defaultTopic, key, data); } @Override - public Future send(int partition, K key, V data) { + public Future send(int partition, K key, V data) { return send(this.defaultTopic, partition, key, data); } @Override - public Future send(String topic, V data) { + public Future send(String topic, V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, data); return doSend(producerRecord); } @Override - public Future send(String topic, K key, V data) { + public Future send(String topic, K key, V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, key, data); return doSend(producerRecord); } @@ -124,7 +124,7 @@ public Future send(String topic, int partition, V data) { } @Override - public Future send(String topic, int partition, K key, V data) { + public Future send(String topic, int partition, K key, V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, key, data); return doSend(producerRecord); }