Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
#Mon Mar 07 20:47:12 EST 2016
#Wed Mar 30 18:31:11 EDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.11-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.12-bin.zip
10 changes: 3 additions & 7 deletions gradlew
@@ -42,11 +42,6 @@ case "`uname`" in
;;
esac

# For Cygwin, ensure paths are in UNIX format before anything is touched.
if $cygwin ; then
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
fi

# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
@@ -61,9 +56,9 @@ while [ -h "$PRG" ] ; do
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >&-
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >&-
cd "$SAVED" >/dev/null

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar

@@ -114,6 +109,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
@@ -21,6 +21,8 @@

import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.messaging.Message;

/**
* The basic Kafka operations contract.
*
@@ -94,9 +96,18 @@ public interface KafkaOperations<K, V> {
*/
Future<RecordMetadata> send(String topic, int partition, K key, V data);

/**
* Send a message with routing information in message headers.
* @param message the message to send.
* @return a Future for the {@link RecordMetadata}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
*/
Future<RecordMetadata> send(Message<?> message);

// Sync methods

/**
* Send the data to the default topic with no key or partition;
@@ -180,6 +191,19 @@ public interface KafkaOperations<K, V> {
RecordMetadata syncSend(String topic, int partition, K key, V data)
throws InterruptedException, ExecutionException;

/**
* Send a message with routing information in message headers.
* @param message the message to send.
* @return the {@link RecordMetadata} for the record.
* @throws ExecutionException execution exception while awaiting result.
* @throws InterruptedException thread interrupted while awaiting result.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
*/
RecordMetadata syncSend(Message<?> message)
throws InterruptedException, ExecutionException;

/**
* Flush the producer.
*/
@@ -25,9 +25,12 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.ProducerListenerInvokingCallback;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;


/**
@@ -88,28 +91,28 @@ public void setProducerListener(ProducerListener<K, V> producerListener) {
}

@Override
public Future<RecordMetadata> send(V data) {
return send(this.defaultTopic, data);
}

@Override
public Future<RecordMetadata> send(K key, V data) {
return send(this.defaultTopic, key, data);
}

@Override
public Future<RecordMetadata> send(int partition, K key, V data) {
return send(this.defaultTopic, partition, key, data);
}

@Override
public Future<RecordMetadata> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}

@Override
public Future<RecordMetadata> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
@@ -121,11 +124,16 @@ public Future<RecordMetadata> send(String topic, int partition, V data) {
}

@Override
public Future<RecordMetadata> send(String topic, int partition, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
}

@Override
public Future<RecordMetadata> send(Message<?> message) {
ProducerRecord<K, V> producerRecord = messageToProducerRecord(message);
return doSend(producerRecord);
}

@Override
public RecordMetadata syncSend(V data) throws InterruptedException, ExecutionException {
@@ -180,6 +188,19 @@ public RecordMetadata syncSend(String topic, int partition, K key, V data)
return future.get();
}

@Override
public RecordMetadata syncSend(Message<?> message)
throws InterruptedException, ExecutionException {
Future<RecordMetadata> future = send(message);
flush();
return future.get();
}

@Override
public void flush() {
this.producer.flush();
}

/**
* Send the producer record.
* @param producerRecord the producer record.
@@ -211,9 +232,14 @@ protected Future<RecordMetadata> doSend(ProducerRecord<K, V> producerRecord) {
return future;
}

@Override
public void flush() {
this.producer.flush();
@SuppressWarnings({ "rawtypes", "unchecked" })
private ProducerRecord<K, V> messageToProducerRecord(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String topic = headers.get(KafkaHeaders.TOPIC, String.class);
Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
Object payload = message.getPayload();
return new ProducerRecord(topic == null ? this.defaultTopic : topic, partition, key, payload);
}

}
@@ -21,24 +21,24 @@
*
* @author Artem Bilan
* @author Marius Bogoevici
* @author Gary Russell
*/
public abstract class KafkaHeaders {

private static final String PREFIX = "kafka_";

/**
* The header for topic.
* The header containing the topic when sending data to Kafka.
*/
public static final String TOPIC = PREFIX + "topic";


/**
* The header for message key.
* The header containing the message key when sending data to Kafka.
*/
public static final String MESSAGE_KEY = PREFIX + "messageKey";

/**
* The header for topic partition.
* The header containing the topic partition when sending data to Kafka.
*/
public static final String PARTITION_ID = PREFIX + "partitionId";

@@ -52,4 +52,19 @@ public abstract class KafkaHeaders {
*/
public static final String ACKNOWLEDGMENT = PREFIX + "acknowledgment";

/**
* The header containing the topic from which the message was received.
*/
public static final String RECEIVED_TOPIC = PREFIX + "receivedTopic";

/**
* The header containing the message key for the received message.
*/
public static final String RECEIVED_MESSAGE_KEY = PREFIX + "receivedMessageKey";

/**
* The header containing the topic partition for the received message.
*/
public static final String RECEIVED_PARTITION_ID = PREFIX + "receivedPartitionId";

}
@@ -66,9 +66,9 @@ public Message<?> toMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, record.key());
rawHeaders.put(KafkaHeaders.TOPIC, record.topic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, record.partition());
rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key());
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic());
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition());
rawHeaders.put(KafkaHeaders.OFFSET, record.offset());

if (acknowledgment != null) {
@@ -83,10 +85,12 @@ public void testSimple() throws Exception {
assertThat(this.listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();

waitListening("bar");
template.send("annotated2", 0, "foo");
template.send("annotated2", 0, 123, "foo");
template.flush();
assertThat(this.listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.key).isEqualTo(123);
assertThat(this.listener.partition).isNotNull();
assertThat(this.listener.topic).isEqualTo("annotated2");

waitListening("baz");
template.send("annotated3", 0, "foo");
@@ -200,14 +202,23 @@ public static class Listener {

private volatile Acknowledgment ack;

private Integer key;

private String topic;

@KafkaListener(id = "foo", topics = "annotated1")
public void listen1(String foo) {
this.latch1.countDown();
}

@KafkaListener(id = "bar", topicPattern = "annotated2")
public void listen2(@Payload String foo, @Header(KafkaHeaders.PARTITION_ID) int partitionHeader) {
this.partition = partitionHeader;
public void listen2(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
this.key = key;
this.partition = partition;
this.topic = topic;
this.latch2.countDown();
}

@@ -35,9 +35,11 @@
import org.springframework.kafka.listener.ContainerTestUtils;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListenerAdapter;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.support.MessageBuilder;


/**
@@ -64,7 +66,6 @@ public void testTemplate() throws Exception {

@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}

@@ -93,6 +94,23 @@ public void onMessage(ConsumerRecord<Integer, String> record) {
assertThat(received).has(key((Integer) null));
assertThat(received).has(partition(0));
assertThat(received).has(value("qux"));
template.syncSend(MessageBuilder.withPayload("fiz")
.setHeader(KafkaHeaders.TOPIC, TEMPLATE_TOPIC)
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("fiz"));
template.syncSend(MessageBuilder.withPayload("buz")
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("buz"));
}

@Test
26 changes: 26 additions & 0 deletions src/reference/asciidoc/kafka.adoc
@@ -24,6 +24,7 @@ Future<RecordMetadata> send(String topic, int partition, V data);

Future<RecordMetadata> send(String topic, int partition, K key, V data);

Future<RecordMetadata> send(Message<?> message);

// Sync methods

@@ -49,6 +50,9 @@ RecordMetadata syncSend(String topic, int partition, V data)
RecordMetadata syncSend(String topic, int partition, K key, V data)
throws InterruptedException, ExecutionException;

RecordMetadata syncSend(Message<?> message)
throws InterruptedException, ExecutionException;

// Flush the producer.

void flush();
@@ -81,6 +85,15 @@ The template can also be configured using standard `<bean/>` definitions.

Then, to use the template, simply invoke one of its methods.

When using the methods with a `Message<?>` parameter, the topic, partition, and key information is provided in
message headers:

- `KafkaHeaders.TOPIC`
- `KafkaHeaders.PARTITION_ID`
- `KafkaHeaders.MESSAGE_KEY`

with the message payload being the data.
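
For example, a minimal sketch based on the test for this feature, building a `Message<?>` with Spring's
`MessageBuilder` (the topic name is arbitrary):

[source, java]
----
template.syncSend(MessageBuilder.withPayload("foo")
        .setHeader(KafkaHeaders.TOPIC, "someTopic")
        .setHeader(KafkaHeaders.PARTITION_ID, 0)
        .setHeader(KafkaHeaders.MESSAGE_KEY, 1)
        .build());
----

If the `KafkaHeaders.TOPIC` header is omitted, the record is sent to the template's default topic.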

Optionally, you can configure the `KafkaTemplate` with a `ProducerListener` to get an async callback with the
results of the send (success or failure) instead of waiting for the `Future` to complete.
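
A minimal sketch, assuming the no-op `ProducerListenerAdapter` base class and its callback signatures (override
only the callbacks you need):

[source, java]
----
template.setProducerListener(new ProducerListenerAdapter<Integer, String>() {

    @Override
    public void onError(String topic, Integer partition, Integer key, String value, Exception exception) {
        // invoked asynchronously if the send fails
    }

});
----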

@@ -276,3 +289,16 @@ public void listen(String data, Acknowledgment ack) {
ack.acknowledge();
}
----

Finally, metadata about the message is available from message headers:

[source, java]
----
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
----