@@ -43,7 +43,7 @@ include::{kotlin-examples}/topics/Config.kt[tag=brokerProps]
----
======

Starting with version 2.7, you can declare multiple `NewTopic` s in a single `KafkaAdmin.NewTopics` bean definition:
Starting with version 2.7, you can declare multiple `NewTopic`+++s+++ in a single `KafkaAdmin.NewTopics` bean definition:

[tabs]
======
@@ -63,7 +63,7 @@ include::{kotlin-examples}/topics/Config.kt[tag=newTopicsBean]
======
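
For illustration, a minimal sketch of such a bean might look like the following (topic names and settings are purely illustrative):

[source, java]
----
@Configuration
public class MultipleTopicsConfig {

    @Bean
    public KafkaAdmin.NewTopics severalTopics() {
        // One bean declaring three topics; the KafkaAdmin creates any that do not yet exist
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name("topicA").partitions(3).replicas(1).build(),
                TopicBuilder.name("topicB").partitions(6).build(),
                TopicBuilder.name("topicC").build());
    }

}
----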


IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean` s.
IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean`+++s+++.

By default, if the broker is not available, a message is logged, but the context continues to load.
You can programmatically invoke the admin's `initialize()` method to try again later.
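For example, a later retry might look like this sketch (`kafkaAdmin` is the application's `KafkaAdmin` bean; scheduling the retry is left to the application):

[source, java]
----
// initialize() re-checks the broker and creates any missing topics;
// it returns false if the broker is still unreachable.
if (!kafkaAdmin.initialize()) {
    // schedule another attempt later
}
----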
@@ -1,12 +1,12 @@
[[connecting]]
= Connecting to Kafka

* `KafkaAdmin` - see <<configuring-topics>>
* `KafkaAdmin` - see xref:kafka/configuring-topics.adoc[Configuring Topics]
* `ProducerFactory` - see xref:kafka/sending-messages.adoc[Sending Messages]
* `ConsumerFactory` - see xref:kafka/receiving-messages.adoc[Receiving Messages]

Starting with version 2.5, each of these extends `KafkaResourceFactory`.
This allows changing the bootstrap servers at runtime by adding a `Supplier<String>` to their configuration: `setBootstrapServersSupplier(() -> ...)`.
This allows changing the bootstrap servers at runtime by adding a `Supplier<String>` to their configuration: `setBootstrapServersSupplier(() +++->+++ ...)`.
Review comment (Contributor Author): `->` is a special character in AsciiDoc, so we could use `+++` to render it literally.

This will be called for all new connections to get the list of servers.
Consumers and Producers are generally long-lived.
To close existing Producers, call `reset()` on the `DefaultKafkaProducerFactory`.
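
A short sketch of setting the supplier (the factory and admin variables, and the `bootstrapServers` field, are assumed application beans and state):

[source, java]
----
// Each new producer, consumer, or admin client asks the supplier for its servers,
// so changing the field at runtime redirects new connections; existing clients
// keep their servers until they are recreated (e.g. via reset() on the producer factory).
producerFactory.setBootstrapServersSupplier(() -> this.bootstrapServers);
consumerFactory.setBootstrapServersSupplier(() -> this.bootstrapServers);
kafkaAdmin.setBootstrapServersSupplier(() -> this.bootstrapServers);
----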
@@ -15,7 +15,7 @@ To close existing Consumers, call `stop()` (and then `start()`) on the `KafkaLis
For convenience, the framework also provides an `ABSwitchCluster` which supports two sets of bootstrap servers; one of which is active at any time.
Configure the `ABSwitchCluster` and add it to the producer and consumer factories, and the `KafkaAdmin`, by calling `setBootstrapServersSupplier()`.
When you want to switch, call `primary()` or `secondary()` and call `reset()` on the producer factory to establish new connection(s); for consumers, `stop()` and `start()` all listener containers.
When using `@KafkaListener` s, `stop()` and `start()` the `KafkaListenerEndpointRegistry` bean.
When using `@KafkaListener`+++s+++, `stop()` and `start()` the `KafkaListenerEndpointRegistry` bean.
Review comment (@NathanQingyangXu, Oct 30, 2023): In AsciiDoc, a trailing `` `s `` is rendered incorrectly [screenshot]; that was why a space was inserted to avoid it. However, the space looks odd [screenshot]. Using `+++` solves the issue [screenshot].

See the Javadocs for more information.
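
A sketch of the A/B switch (bean and variable names are illustrative; `registry` is the `KafkaListenerEndpointRegistry` bean):

[source, java]
----
// Two sets of bootstrap servers; the primary set is active initially.
ABSwitchCluster switcher = new ABSwitchCluster("primary1:9092,primary2:9092",
        "secondary1:9092,secondary2:9092");
producerFactory.setBootstrapServersSupplier(switcher);
consumerFactory.setBootstrapServersSupplier(switcher);
kafkaAdmin.setBootstrapServersSupplier(switcher);

// Later, fail over to the secondary cluster:
switcher.secondary();
producerFactory.reset(); // close existing producers; new ones use the secondary servers
registry.stop();         // stop all listener containers ...
registry.start();        // ... and restart them so consumers reconnect
----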

@@ -42,14 +42,21 @@ List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

T doInOperations(KafkaOperations<K, V> operations);

}
----
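
For example, `executeInTransaction` runs its callback in a local transaction; a brief usage sketch (assuming the template's producer factory is configured for transactions):

[source, java]
----
Boolean sent = template.executeInTransaction(ops -> {
    // Both sends are committed (or aborted) together
    ops.send("topic1", "thing1");
    ops.send("topic2", "thing2");
    return true;
});
----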

@@ -62,7 +69,7 @@ The `sendDefault` API requires that a default topic has been provided to the tem

The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic.
If the topic is configured to use `CREATE_TIME`, the user specified timestamp is recorded (or generated if not specified).
If the topic is configured to use `CREATE_TIME`, the user-specified timestamp is recorded (or generated if not specified).
If the topic is configured to use `LOG_APPEND_TIME`, the user-specified timestamp is ignored and the broker adds in the local broker time.
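
For example (assuming a `KafkaTemplate<Integer, String>`; topic, key, and value are illustrative):

[source, java]
----
// With CREATE_TIME this timestamp ends up in the record;
// with LOG_APPEND_TIME the broker replaces it with its own time.
long timestamp = System.currentTimeMillis();
template.send("topic1", null, timestamp, 1, "thing1");
----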

The `metrics` and `partitionsFor` methods delegate to the same methods on the underlying https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Producer.html[`Producer`].
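For instance (the `template` variable is assumed):

[source, java]
----
// Both calls simply delegate to the underlying Producer
Map<MetricName, ? extends Metric> metrics = template.metrics();
List<PartitionInfo> partitions = template.partitionsFor("topic1");
----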
@@ -47,7 +47,7 @@ Gradle::
+
[source,groovy,subs="+attributes",role="secondary"]
----
compile 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.kafka:spring-kafka'
----
======

@@ -58,7 +58,7 @@ However, the quickest way to get started is to use https://start.spring.io[start

This quick tour works with the following versions:

* Apache Kafka Clients 3.5.x
* Apache Kafka Clients 3.6.x
* Spring Framework 6.1.x
* Minimum Java version: 17

@@ -124,11 +124,11 @@ include::{kotlin-examples}/started/producer/Application.kt[tag=startedProducer]
=== With Java Configuration (No Spring Boot)

IMPORTANT: Spring for Apache Kafka is designed to be used in a Spring Application Context.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the `...Aware` interfaces that the container implements.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the `+++...+++Aware` interfaces that the container implements.

Here is an example of an application that does not use Spring Boot; it has both a `Consumer` and `Producer`.

.Without Boot
.Without Spring Boot
[tabs]
======
Java::
@@ -29,20 +29,20 @@
// tag::startedNoBootSender[]
public class Sender {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
        context.getBean(Sender.class).send("test", 42);
    }

    private final KafkaTemplate<Integer, String> template;

    public Sender(KafkaTemplate<Integer, String> template) {
        this.template = template;
    }

    public void send(String toSend, int key) {
        this.template.send("topic1", key, toSend);
    }
Review comment (Contributor Author): It seems we use 4 spaces instead of tabs for code snippets in the reference, but this code used tabs, ending up with ugly, inconsistent indentation within the same example [screenshot].


}
// end::startedNoBootSender[]
@@ -16,8 +16,8 @@

package org.springframework.kafka.jdocs.topics;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -67,9 +67,9 @@ public NewTopic topic2() {
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
Review comment (Contributor Author): Not strictly necessary, but we already require Java 17, so why not use the handy collection factory methods introduced in Java 9?

.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}