Skip to content

Commit 31431fa

Browse files
garyrussellartembilan
authored andcommitted
More Spring Integration Kafka Support
- don't map message channel headers, unless they are String - tolerate `byte[]` in topic header. * Fix NPE - a null topic header is ok, as long as there's a default provided.
1 parent 5944e17 commit 31431fa

File tree

5 files changed

+44
-7
lines changed

5 files changed

+44
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
2626

27+
import org.springframework.messaging.MessageHeaders;
2728
import org.springframework.util.Assert;
2829
import org.springframework.util.PatternMatchUtils;
2930

@@ -64,6 +65,21 @@ public AbstractKafkaHeaderMapper(String... patterns) {
6465
}
6566
}
6667

68+
protected boolean matches(String header, Object value) {
69+
if (matches(header)) {
70+
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
71+
&& !(value instanceof String)) {
72+
if (this.logger.isDebugEnabled()) {
73+
this.logger.debug("Cannot map " + header + " when type is [" + value.getClass()
74+
+ "]; it must be a String");
75+
}
76+
return false;
77+
}
78+
return true;
79+
}
80+
return false;
81+
}
82+
6783
protected boolean matches(String header) {
6884
for (SimplePatternBasedHeaderMatcher matcher : this.matchers) {
6985
if (matcher.matchHeader(header)) {

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void addTrustedPackages(String... trustedPackages) {
168168
public void fromHeaders(MessageHeaders headers, Headers target) {
169169
final Map<String, String> jsonHeaders = new HashMap<>();
170170
headers.forEach((k, v) -> {
171-
if (matches(k)) {
171+
if (matches(k, v)) {
172172
if (v instanceof byte[]) {
173173
target.add(new RecordHeader(k, (byte[]) v));
174174
}

spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public SimpleKafkaHeaderMapper(String... patterns) {
6565
@Override
6666
public void fromHeaders(MessageHeaders headers, Headers target) {
6767
headers.forEach((k, v) -> {
68-
if (v instanceof byte[] && matches(k)) {
68+
if (v instanceof byte[] && matches(k, v)) {
6969
target.add(new RecordHeader(k, (byte[]) v));
7070
}
7171
});

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.converter;
1818

1919
import java.lang.reflect.Type;
20+
import java.nio.charset.StandardCharsets;
2021
import java.util.Map;
2122

2223
import org.apache.commons.logging.Log;
@@ -36,6 +37,7 @@
3637
import org.springframework.messaging.Message;
3738
import org.springframework.messaging.MessageHeaders;
3839
import org.springframework.messaging.support.MessageBuilder;
40+
import org.springframework.util.Assert;
3941

4042
/**
4143
* A Messaging {@link MessageConverter} implementation for a message listener that
@@ -133,7 +135,21 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
133135
@Override
134136
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
135137
MessageHeaders headers = message.getHeaders();
136-
String topic = headers.get(KafkaHeaders.TOPIC, String.class);
138+
Object topicHeader = headers.get(KafkaHeaders.TOPIC);
139+
String topic = null;
140+
if (topicHeader instanceof byte[]) {
141+
topic = new String(((byte[]) topicHeader), StandardCharsets.UTF_8);
142+
}
143+
else if (topicHeader instanceof String) {
144+
topic = (String) topicHeader;
145+
}
146+
else if (topicHeader == null) {
147+
Assert.state(defaultTopic != null, "With no topic header, a defaultTopic is required");
148+
}
149+
else {
150+
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
151+
+ topicHeader.getClass());
152+
}
137153
Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
138154
Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
139155
Object payload = convertPayload(message);

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
2929
import org.springframework.messaging.Message;
3030
import org.springframework.messaging.MessageHeaders;
31+
import org.springframework.messaging.support.ExecutorSubscribableChannel;
3132
import org.springframework.messaging.support.MessageBuilder;
3233
import org.springframework.util.MimeType;
3334
import org.springframework.util.MimeTypeUtils;
@@ -47,12 +48,14 @@ public void test() {
4748
.setHeader("foo", "bar".getBytes())
4849
.setHeader("baz", "qux")
4950
.setHeader("fix", new Foo())
51+
.setHeader(MessageHeaders.REPLY_CHANNEL, new ExecutorSubscribableChannel())
52+
.setHeader(MessageHeaders.ERROR_CHANNEL, "errors")
5053
.setHeader(MessageHeaders.CONTENT_TYPE, utf8Text)
5154
.setHeader("simpleContentType", MimeTypeUtils.TEXT_PLAIN)
5255
.build();
5356
RecordHeaders recordHeaders = new RecordHeaders();
5457
mapper.fromHeaders(message.getHeaders(), recordHeaders);
55-
assertThat(recordHeaders.toArray().length).isEqualTo(6); // 5 + json_types
58+
assertThat(recordHeaders.toArray().length).isEqualTo(7); // 6 + json_types
5659
Map<String, Object> headers = new HashMap<>();
5760
mapper.toHeaders(recordHeaders, headers);
5861
assertThat(headers.get("foo")).isInstanceOf(byte[].class);
@@ -61,18 +64,20 @@ public void test() {
6164
assertThat(headers.get("fix")).isInstanceOf(NonTrustedHeaderType.class);
6265
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo(utf8Text);
6366
assertThat(headers.get("simpleContentType")).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
67+
assertThat(headers.get(MessageHeaders.REPLY_CHANNEL)).isNull();
68+
assertThat(headers.get(MessageHeaders.ERROR_CHANNEL)).isEqualTo("errors");
6469
NonTrustedHeaderType ntht = (NonTrustedHeaderType) headers.get("fix");
6570
assertThat(ntht.getHeaderValue()).isNotNull();
6671
assertThat(ntht.getUntrustedType()).isEqualTo(Foo.class.getName());
67-
assertThat(headers.size()).isEqualTo(5);
72+
assertThat(headers.size()).isEqualTo(6);
6873
mapper.addTrustedPackages(getClass().getPackage().getName());
6974
headers = new HashMap<>();
7075
mapper.toHeaders(recordHeaders, headers);
7176
assertThat(headers.get("foo")).isInstanceOf(byte[].class);
7277
assertThat(new String((byte[]) headers.get("foo"))).isEqualTo("bar");
7378
assertThat(headers.get("baz")).isEqualTo("qux");
7479
assertThat(headers.get("fix")).isEqualTo(new Foo());
75-
assertThat(headers.size()).isEqualTo(5);
80+
assertThat(headers.size()).isEqualTo(6);
7681
}
7782

7883
public static final class Foo {

0 commit comments

Comments
 (0)