From 1fb5aa4081a28186dde9d3a71e5fe99f25342cd5 Mon Sep 17 00:00:00 2001 From: rsakin Date: Fri, 29 Jul 2022 15:46:28 +0300 Subject: [PATCH 1/2] feat: kafka-clients dep added to create topics --- pom.xml | 5 ++ .../config/KafkaConfiguration.java | 69 +++++++++++++++++++ .../config/KakfaConfiguration.java | 37 ---------- 3 files changed, 74 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java delete mode 100644 src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java diff --git a/pom.xml b/pom.xml index 1d45db2..b73ca6c 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,11 @@ spring-boot-starter-test test + + org.apache.kafka + kafka-clients + 3.1.1 + diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java new file mode 100644 index 0000000..b759b73 --- /dev/null +++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java @@ -0,0 +1,69 @@ +package com.techprimers.kafka.springbootkafkaproducerexample.config; + +import com.techprimers.kafka.springbootkafkaproducerexample.model.User; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConfiguration { + + public static final String KAFKA_EXAMPLE = "Kafka_Example"; + public static final String KAFKA_EXAMPLE_JSON = "Kafka_Example_User"; + + @Value("${kafka.bootstrapAddress}") + private String bootstrapAddress; + + // Create related kafka topics + @Bean + public NewTopic testTopic() { + return new NewTopic(KAFKA_EXAMPLE, 1, (short) 1); + } + + @Bean + public NewTopic testJsonTopic() { + return new NewTopic(KAFKA_EXAMPLE_JSON, 1, (short) 1); + } + + @Bean + public KafkaTemplate kafkaSimpleMessageTemplate() { + return new KafkaTemplate<>(simpleMessageProducerFactory()); + } + + // Default Factory is to send String messages + private ProducerFactory simpleMessageProducerFactory() { + final Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + // Object producer factory & template + @Bean + public KafkaTemplate kafkaObjectTemplate() { + return new KafkaTemplate<>(objectProducerFactory()); + } + + private ProducerFactory objectProducerFactory() { + final Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + return new DefaultKafkaProducerFactory<>(configProps); + } +} \ No newline at end of file diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java deleted file mode 100644 index 38165d4..0000000 --- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.techprimers.kafka.springbootkafkaproducerexample.config; - -import com.techprimers.kafka.springbootkafkaproducerexample.model.User; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -public class KakfaConfiguration { - - @Bean - public ProducerFactory producerFactory() { - Map config = new HashMap<>(); - - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - - return new DefaultKafkaProducerFactory<>(config); - } - - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - - -} From 39bbfd3160d5310a47a079fe6b86b484afd8223b Mon Sep 17 00:00:00 2001 From: rsakin Date: Fri, 29 Jul 2022 15:47:35 +0300 Subject: [PATCH 2/2] feat: some refactoring on endpoint to publish user or message separately --- .../model/User.java | 12 ++++++++ .../resource/UserResource.java | 30 +++++++++++++------ src/main/resources/application.properties | 2 +- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java index 5b66a54..d738dc7 100644 --- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java +++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java @@ -6,6 +6,9 @@ public class User { private String dept; private Long salary; + public User() { + } + public User(String name, String dept, Long salary) { this.name = name; this.dept = dept; @@ -35,4 +38,13 @@ public Long getSalary() { public void setSalary(Long salary) { this.salary = salary; } + + @Override + public String toString() { + return "User{" + + "name='" + name + + ", dept='" + dept + + ", salary=" + salary + + '}'; + } } diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java index cd477ab..27b8c6b 100644 --- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java +++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java @@ -1,27 +1,39 @@ package com.techprimers.kafka.springbootkafkaproducerexample.resource; +import com.techprimers.kafka.springbootkafkaproducerexample.config.KafkaConfiguration; import com.techprimers.kafka.springbootkafkaproducerexample.model.User; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController -@RequestMapping("kafka") +@RequestMapping("/kafka") public class UserResource { @Autowired - private KafkaTemplate kafkaTemplate; + @Qualifier("kafkaSimpleMessageTemplate") + private KafkaTemplate kafkaSimpleMessageTemplate; - private static final String TOPIC = "Kafka_Example"; - - @GetMapping("/publish/{name}") - public String post(@PathVariable("name") final String name) { + @Autowired + @Qualifier("kafkaObjectTemplate") + private KafkaTemplate kafkaObjectTemplate; - kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L)); + @GetMapping("/publish/{message}") + public String publishMessage(@PathVariable final String message) { + kafkaSimpleMessageTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE, message); + return "Message : " + message + " published successfully"; + } - return "Published successfully"; + @PostMapping("/publish") + public String publishUser(@RequestBody final User user) { + kafkaObjectTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE_JSON, user); + return "User : " + user + "published successfully"; } -} + +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bafddce..a0c8a98 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1 @@ -server.port=8081 \ No newline at end of file +kafka.bootstrapAddress=localhost:9092 \ No newline at end of file