diff --git a/build.gradle b/build.gradle index 738c041fac..9b352ae917 100644 --- a/build.gradle +++ b/build.gradle @@ -64,7 +64,7 @@ ext { jaywayJsonPathVersion = '2.6.0' junit4Version = '4.13.2' junitJupiterVersion = '5.9.0' - kafkaVersion = '3.2.3' + kafkaVersion = '3.3.1' log4jVersion = '2.18.0' micrometerVersion = '1.10.0-M6' micrometerTracingVersion = '1.0.0-M8' diff --git a/spring-kafka-docs/src/main/asciidoc/appendix.adoc b/spring-kafka-docs/src/main/asciidoc/appendix.adoc index cbbd51d60a..cbf0b942c7 100644 --- a/spring-kafka-docs/src/main/asciidoc/appendix.adoc +++ b/spring-kafka-docs/src/main/asciidoc/appendix.adoc @@ -9,7 +9,7 @@ If you wish to use a different version of `kafka-clients` or `kafka-streams`, an .Maven ---- - 3.2.3 + 3.3.1 diff --git a/spring-kafka-docs/src/main/asciidoc/quick-tour.adoc b/spring-kafka-docs/src/main/asciidoc/quick-tour.adoc index 4fc28f4c3d..08192855b0 100644 --- a/spring-kafka-docs/src/main/asciidoc/quick-tour.adoc +++ b/spring-kafka-docs/src/main/asciidoc/quick-tour.adoc @@ -51,7 +51,7 @@ However, the quickest way to get started is to use https://start.spring.io[start This quick tour works with the following versions: -* Apache Kafka Clients 3.2.x +* Apache Kafka Clients 3.3.x * Spring Framework 6.0.x * Minimum Java version: 17 diff --git a/spring-kafka-docs/src/main/asciidoc/streams.adoc b/spring-kafka-docs/src/main/asciidoc/streams.adoc index 7090f48e6a..e82c8249a3 100644 --- a/spring-kafka-docs/src/main/asciidoc/streams.adoc +++ b/spring-kafka-docs/src/main/asciidoc/streams.adoc @@ -239,12 +239,13 @@ Starting with version 2.7, the default is to never clean up local state. [[streams-header-enricher]] ==== Header Enricher -Version 2.3 added the `HeaderEnricher` implementation of `Transformer`. +Version 3.0 added the `HeaderEnricherProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `HeaderEnricher` which implemented the deprecated `Transformer` interface. This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties: -* `context` - the `ProcessorContext`, allowing access to the current record metadata +* `record` - the `org.apache.kafka.streams.processor.api.Record` (`key`, `value`, `timestamp`, `headers`) * `key` - the key of the current record * `value` - the value of the current record +* `context` - the `ProcessorContext`, allowing access to the current record metadata The expressions must return a `byte[]` or a `String` (which will be converted to `byte[]` using `UTF-8`). @@ -253,18 +254,18 @@ To use the enricher within a stream: ==== [source, java] ---- -.transform(() -> enricher) +.process(() -> new HeaderEnricherProcessor(expressions)) ---- ==== -The transformer does not change the `key` or `value`; it simply adds headers. +The processor does not change the `key` or `value`; it simply adds headers. -IMPORTANT: If your stream is multi-threaded, you need a new instance for each record. +IMPORTANT: You need a new instance for each record. ==== [source, java] ---- -.transform(() -> new HeaderEnricher<..., ...>(expressionMap)) +.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap)) ---- ==== @@ -276,19 +277,20 @@ Here is a simple example, adding one literal header and one variable: Map headers = new HashMap<>(); headers.put("header1", new LiteralExpression("value1")); SpelExpressionParser parser = new SpelExpressionParser(); -headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()")); -HeaderEnricher enricher = new HeaderEnricher<>(headers); +headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()")); +ProcessorSupplier supplier = () -> new HeaderEnricher enricher = new HeaderEnricher<>(headers); KStream stream = builder.stream(INPUT); stream - .transform(() -> enricher) + .process(() -> supplier) .to(OUTPUT); ---- ==== [[streams-messaging]] -==== `MessagingTransformer` +==== `MessagingProcessor` -Version 2.3 added the `MessagingTransformer` this allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow. +Version 3.0 added the `MessagingProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `MessagingTransformer` which implemented the deprecated `Transformer` interface. +This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow. The transformer requires an implementation of `MessagingFunction`. ==== diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index e45431ec80..d9b14aa3ba 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -6,7 +6,7 @@ For changes in earlier version, see <>. [[x30-kafka-client]] ==== Kafka Client Version -This version requires the 3.2.0 `kafka-clients`. +This version requires the 3.3.1 `kafka-clients`. [[x30-eos]] ==== Exactly Once Semantics diff --git a/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java b/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java index 2c603fd584..f8115190ff 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java @@ -27,8 +27,6 @@ import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.RoundRobinPartitioner; -import org.apache.kafka.clients.producer.UniformStickyPartitioner; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -97,6 +95,7 @@ */ public class KafkaRuntimeHints implements RuntimeHintsRegistrar { + @SuppressWarnings("deprecation") @Override public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) { ReflectionHints reflectionHints = hints.reflection(); @@ -147,9 +146,9 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) RoundRobinAssignor.class, StickyAssignor.class, // standard partitioners - DefaultPartitioner.class, + org.apache.kafka.clients.producer.internals.DefaultPartitioner.class, RoundRobinPartitioner.class, - UniformStickyPartitioner.class, + org.apache.kafka.clients.producer.UniformStickyPartitioner.class, // standard serialization ByteArrayDeserializer.class, ByteArraySerializer.class, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java index aabcb7f627..5a9854e3fd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java @@ -36,8 +36,10 @@ * * @author Gary Russell * @since 2.3 + * @deprecated in favor of {@link HeaderEnricherProcessor}. * */ +@Deprecated public class HeaderEnricher implements Transformer> { private final Map headerExpressions = new HashMap<>(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricherProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricherProcessor.java new file mode 100644 index 0000000000..c2847ffca2 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricherProcessor.java @@ -0,0 +1,117 @@ +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.streams; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +import org.springframework.expression.Expression; + +/** + * Manipulate the headers. + * + * @param the input key type. + * @param the input value type. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class HeaderEnricherProcessor extends ContextualProcessor { + + private final Map headerExpressions = new HashMap<>(); + + /** + * Construct an instance with the provided header expressions. + * @param headerExpressions the header expressions; name:expression. + */ + public HeaderEnricherProcessor(Map headerExpressions) { + this.headerExpressions.putAll(headerExpressions); + } + + @Override + public void process(Record record) { + Headers headers = record.headers(); + Container container = new Container<>(context(), record.key(), record.value(), record); + this.headerExpressions.forEach((name, expression) -> { + Object headerValue = expression.getValue(container); + if (headerValue instanceof String) { + headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8); + } + else if (!(headerValue instanceof byte[])) { + throw new IllegalStateException("Invalid header value type: " + headerValue.getClass()); + } + headers.add(new RecordHeader(name, (byte[]) headerValue)); + }); + context().forward(record); + } + + @Override + public void close() { + // NO-OP + } + + /** + * Container object for SpEL evaluation. + * + * @param the key type. + * @param the value type. + * + */ + public static final class Container { + + private final ProcessorContext context; + + private final K key; + + private final V value; + + private final Record record; + + Container(ProcessorContext context, K key, V value, Record record) { + this.context = context; + this.key = key; + this.value = value; + this.record = record; + } + + public ProcessorContext getContext() { + return this.context; + } + + public K getKey() { + return this.key; + } + + public V getValue() { + return this.value; + } + + public Record getRecord() { + return this.record; + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingProcessor.java new file mode 100644 index 0000000000..fe9e00eb2c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingProcessor.java @@ -0,0 +1,103 @@ +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.streams.messaging; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; + +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * A {@link Transformer} implementation that invokes a {@link MessagingFunction} + * converting to/from spring-messaging {@link Message}. Can be used, for example, + * to invoke a Spring Integration flow. + * + * @param the input key type. + * @param the input value type. + * @param the output key type. + * @param the output value type. + * + * @author Gary Russell + * @since 2.3 + * + */ +public class MessagingProcessor extends ContextualProcessor { + + private final MessagingFunction function; + + private final MessagingMessageConverter converter; + + /** + * Construct an instance with the provided function and converter. + * @param function the function. + * @param converter the converter. + */ + public MessagingProcessor(MessagingFunction function, MessagingMessageConverter converter) { + Assert.notNull(function, "'function' cannot be null"); + Assert.notNull(converter, "'converter' cannot be null"); + this.function = function; + this.converter = converter; + } + + @SuppressWarnings("unchecked") + @Override + public void process(Record record) { + ProcessorContext context = context(); + RecordMetadata meta = context.recordMetadata().orElse(null); + Assert.state(meta != null, "No record metadata present"); + Headers headers = record.headers(); + ConsumerRecord rebuilt = new ConsumerRecord(meta.topic(), + meta.partition(), meta.offset(), + record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE, + 0, 0, + record.key(), record.value(), + headers, Optional.empty()); + Message message = this.converter.toMessage(rebuilt, null, null, null); + message = this.function.exchange(message); + List headerList = new ArrayList<>(); + headers.forEach(header -> headerList.add(header.key())); + headerList.forEach(name -> headers.remove(name)); + ProducerRecord fromMessage = this.converter.fromMessage(message, "dummy"); + fromMessage.headers().forEach(header -> { + if (!header.key().equals(KafkaHeaders.TOPIC)) { + headers.add(header); + } + }); + context.forward(new Record<>((Kout) message.getHeaders().get(KafkaHeaders.KEY), (Vout) message.getPayload(), + record.timestamp(), headers)); + } + + @Override + public void close() { + // NO-OP + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java index bef1c2d104..102bf0c632 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java @@ -44,8 +44,10 @@ * * @author Gary Russell * @since 2.3 + * @deprecated in favor of {@link MessagingProcessor}. * */ +@Deprecated public class MessagingTransformer implements Transformer> { private final MessagingFunction function; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index b14d469db0..73fc0e0ff6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,14 +27,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; @@ -174,19 +174,12 @@ public KStream testStream(StreamsBuilder kStreamBuilder) { KStream stream = kStreamBuilder.stream("test_topic"); stream - .transform(() -> new Transformer>() { - @Override - public void init(ProcessorContext context) { - } + .process(() -> new ContextualProcessor() { @Override - public KeyValue transform(String key, String value) { - return null; + public void process(Record record) { } - @Override - public void close() { - } }, "testStateStore") .to("test_output"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherProcessorTests.java similarity index 95% rename from spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherTests.java rename to spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherProcessorTests.java index 48f7423bbb..7f5c2f92cc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherProcessorTests.java @@ -43,7 +43,7 @@ * @since 2.3 * */ -public class HeaderEnricherTests { +public class HeaderEnricherProcessorTests { private static final String INPUT = "input"; @@ -56,10 +56,9 @@ void testWithDriver() { headers.put("foo", new LiteralExpression("bar")); SpelExpressionParser parser = new SpelExpressionParser(); headers.put("spel", parser.parseExpression("context.timestamp() + new String(key) + new String(value)")); - HeaderEnricher enricher = new HeaderEnricher<>(headers); KStream stream = builder.stream(INPUT); stream - .transform(() -> enricher) + .process(() -> new HeaderEnricherProcessor<>(headers)) .to(OUTPUT); Properties config = new Properties(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index c443738b67..c449037c77 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -222,7 +222,6 @@ public KStream kStream(StreamsBuilder kStreamBuilder) { headers.put("foo", new LiteralExpression("bar")); SpelExpressionParser parser = new SpelExpressionParser(); headers.put("spel", parser.parseExpression("context.timestamp() + key + value")); - HeaderEnricher enricher = new HeaderEnricher<>(headers); stream.mapValues((ValueMapper) String::toUpperCase) .mapValues(Foo::new) .repartition(Repartitioned.with(Serdes.Integer(), new JsonSerde() { })) @@ -233,7 +232,7 @@ public KStream kStream(StreamsBuilder kStreamBuilder) { .toStream() .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value)) .filter((i, s) -> s.length() > 40) - .transform(() -> enricher) + .process(() -> new HeaderEnricherProcessor<>(headers)) .to(streamingTopic2); stream.print(Printed.toSysOut()); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingTransformerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingProcessorTests.java similarity index 93% rename from spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingTransformerTests.java rename to spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingProcessorTests.java index 9a346b9cf4..0ef2fffeb4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingTransformerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/messaging/MessagingProcessorTests.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -45,7 +46,7 @@ * @since 2.3 * */ -public class MessagingTransformerTests { +public class MessagingProcessorTests { private static final String INPUT = "input"; @@ -55,7 +56,8 @@ public class MessagingTransformerTests { void testWithDriver() { MessagingMessageConverter converter = new MessagingMessageConverter(); converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); - MessagingTransformer messagingTransformer = new MessagingTransformer<>(message -> + ProcessorSupplier messagingTransformer = () -> + new MessagingProcessor<>(message -> MessageBuilder.withPayload("bar".getBytes()) .copyHeaders(message.getHeaders()) .setHeader("baz", "qux".getBytes()) @@ -64,7 +66,7 @@ void testWithDriver() { StreamsBuilder builder = new StreamsBuilder(); KStream stream = builder.stream(INPUT); stream - .transform(() -> messagingTransformer) + .process(messagingTransformer) .to(OUTPUT); Properties config = new Properties();