Skip to content

Commit 3716111

Browse files
garyrussellartembilan
authored andcommitted
GH-568: S-I-K Gateways Documentation
Resolves #568 Also fix some PDF overflows.
1 parent 31431fa commit 3716111

File tree

3 files changed

+131
-5
lines changed

3 files changed

+131
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,14 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
8686

8787
/**
8888
* Create an instance using the supplied producer factory and autoFlush setting.
89-
* Set autoFlush to true if you wish to synchronously interact with Kafka, calling
90-
* {@link java.util.concurrent.Future#get()} on the result.
89+
* <p>
90+
* Set autoFlush to {@code true} if you have configured the producer's
91+
* {@code linger.ms} to a non-default value and wish send operations on this template
92+
* to occur immediately, regardless of that setting, or if you wish to block until the
93+
* broker has acknowledged receipt according to the producer's {@code acks} property.
9194
* @param producerFactory the producer factory.
9295
* @param autoFlush true to flush after each send.
96+
* @see Producer#flush()
9397
*/
9498
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
9599
this.producerFactory = producerFactory;

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,13 +373,15 @@ public class KRequestingApplication {
373373
}
374374
375375
@Bean
376-
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(ProducerFactory<String, String> pf,
376+
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(
377+
ProducerFactory<String, String> pf,
377378
KafkaMessageListenerContainer<String, String> replyContainer) {
378379
return new ReplyingKafkaTemplate<>(pf, replyContainer);
379380
}
380381
381382
@Bean
382-
public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
383+
public KafkaMessageListenerContainer<String, String> replyContainer(
384+
ConsumerFactory<String, String> cf) {
383385
ContainerProperties containerProperties = new ContainerProperties("kReplies");
384386
return new KafkaMessageListenerContainer<>(cf, containerProperties);
385387
}
@@ -1467,7 +1469,7 @@ public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offse
14671469
Notice that you can still access the batch headers too.
14681470

14691471
Starting with _versions 2.1.1_, the `org.springframework.core.convert.ConversionService` used by the default
1470-
`org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory` to reslove parameters for the invocation
1472+
`o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to reslove parameters for the invocation
14711473
of a listener method is supplied with all beans implementing any of the following interfaces:
14721474

14731475
- `org.springframework.core.convert.converter.Converter`

src/reference/asciidoc/si-kafka.adoc

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,125 @@ In most cases, this will be an `ErrorMessageSendingRecoverer` which will send th
277277
When building `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message using the `error-message-strategy` property.
278278
By default, a `RawRecordHeaderErrorMessageStrategy` is used; providing access to the converted message as well as the raw `ConsumerRecord`.
279279

280+
281+
[[si-outbound-gateway]]
282+
==== Outbound Gateway
283+
284+
The outbound gateway is for request/reply operations; it is different to most Spring Integration gateways in that the sending thread does not block in the gateway, the reply is processed on the reply listener container thread.
285+
Of course, if user code invokes the gateway behind a synchronous https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway[Messaging Gateway], the user thread will block there until the reply is received (or a timeout occurs).
286+
287+
IMPORTANT: the gateway will not accept requests until the reply container has been assigned its topics and partitions.
288+
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.
289+
290+
Here is an example of configuring a gateway, with Java Configuration:
291+
292+
[source, java]
293+
----
294+
@Bean
295+
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
296+
public KafkaProducerMessageHandler<String, String> outGateway(
297+
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
298+
return new KafkaProducerMessageHandler<>(kafkaTemplate);
299+
}
300+
----
301+
302+
Notice that the same class as the <<si-outbound, outbound channel adapter>> is used, the only difference being that the kafka template passed into the constructor is a `ReplyingKafkaTemplate` - see <<replying-template>> for more information.
303+
304+
The outbound topic, partition, key etc, are determined the same way as the outbound adapter.
305+
The reply topic is determined as follows:
306+
307+
1. A message header `KafkaHeaders.REPLY_TOPIC`, if present (must have a `String` or `byte[]` value) - validated against the template's reply container subscribed topics.
308+
2. If the template's `replyContainer` is subscribed to just one topic, it will be used.
309+
310+
You can also specify a `KafkaHeaders.REPLY_PARTITION` header to determine a specific partition to be used for replies.
311+
Again, this is validated against the template's reply container subscriptions.
312+
313+
Configuring with the Java DSL:
314+
315+
[source, java]
316+
----
317+
@Bean
318+
public IntegrationFlow outboundGateFlow(
319+
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
320+
return IntegrationFlows.from("kafkaRequests")
321+
.handle(Kafka.outboundGateway(kafkaTemplate))
322+
.channel("kafkaReplies")
323+
.get();
324+
}
325+
----
326+
327+
Or:
328+
329+
[source, java]
330+
----
331+
@Bean
332+
public IntegrationFlow outboundGateFlow() {
333+
return IntegrationFlows.from("kafkaRequests")
334+
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
335+
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
336+
.channel("kafkaReplies")
337+
.get();
338+
}
339+
----
340+
341+
XML configuration is not currently available for this component.
342+
343+
[[si-inbound-gateway]]
344+
==== Inbound Gateway
345+
346+
The inbound gateway is for request/reply operations.
347+
348+
Configuring an inbound gateway with Java Configuration:
349+
350+
[source, java]
351+
----
352+
@Bean
353+
public KafkaInboundGateway<Integer, String, String> inboundGateway(
354+
AbstractMessageListenerContainer<Integer, String>container,
355+
KafkaTemplate<Integer, String> replyTemplate) {
356+
357+
KafkaInboundGateway<Integer, String, String> gateway =
358+
new KafkaInboundGateway<>(container, replyTemplate);
359+
gateway.setRequestChannel(requests);
360+
gateway.setReplyChannel(replies);
361+
gateway.setReplyTimeout(30_000);
362+
return gateway;
363+
}
364+
----
365+
366+
Configuring a simple upper case converter with the Java DSL:
367+
368+
[source, java]
369+
----
370+
@Bean
371+
public IntegrationFlow serverGateway(
372+
ConcurrentMessageListenerContainer<Integer, String> container,
373+
KafkaTemplate<Integer, String> replyTemplate) {
374+
return IntegrationFlows
375+
.from(Kafka.inboundGateway(container, template)
376+
.replyTimeout(30_000))
377+
.<String, String>transform(String::toUpperCase)
378+
.get();
379+
}
380+
----
381+
382+
Or:
383+
384+
[source, java]
385+
----
386+
@Bean
387+
public IntegrationFlow serverGateway() {
388+
return IntegrationFlows
389+
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
390+
producerFactory())
391+
.replyTimeout(30_000))
392+
.<String, String>transform(String::toUpperCase)
393+
.get();
394+
}
395+
----
396+
397+
XML configuration is not currently available for this component.
398+
280399
[[message-conversion]]
281400
==== Message Conversion
282401

@@ -350,3 +469,4 @@ The 2.3.x branch introduced the following changes:
350469
* Support `ConsumerAwareMessageListener` (`Consumer` is available in a message header)
351470
* Update to Spring Integration 5.0 and Java 8
352471
* Moved Java DSL to main project
472+
* Added inbound and outbound gateways (3.0.2)

0 commit comments

Comments
 (0)