Skip to content

Commit 5bf070a

Browse files
committed
More flexible RSocket metadata support
The responding side now relies on a new MetadataExtractor which decodes metadata entries of interest, and adds them to an output map whose values are then added as Message headers, and are hence accessible to controller methods. Decoded metadata entry values can be added to the output map one for one, or translated to any number of values (e.g. JSON properties), as long as one of the resulting pairs has a key called "route". On the requesting side, now any metadata can be sent, and a String route for example is not required to be provided explicitly. Instead an application could create any metadata (e.g. JSON properties) as long as the server can work out the route from it. The commit contains further refinements on the requesting side so that any mime type can be used, not only composite or routing metadata, e.g. a route in an "text/plain" entry. Closes gh-23157
1 parent 3e41f5e commit 5bf070a

File tree

10 files changed

+761
-226
lines changed

10 files changed

+761
-226
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

+32-22
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package org.springframework.messaging.rsocket;
1818

19-
import java.util.Arrays;
2019
import java.util.Collections;
2120
import java.util.LinkedHashMap;
22-
import java.util.List;
2321
import java.util.Map;
2422

2523
import io.netty.buffer.ByteBuf;
@@ -59,8 +57,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
5957

6058
static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
6159

62-
static final List<MimeType> METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING);
63-
6460

6561
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
6662

@@ -85,9 +81,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
8581
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
8682
Assert.notNull(strategies, "RSocketStrategies is required");
8783

88-
Assert.isTrue(METADATA_MIME_TYPES.contains(metadataMimeType),
89-
() -> "Unexpected metadatata mime type: '" + metadataMimeType + "'");
90-
9184
this.rsocket = rsocket;
9285
this.dataMimeType = dataMimeType;
9386
this.metadataMimeType = metadataMimeType;
@@ -113,7 +106,13 @@ public MimeType metadataMimeType() {
113106

114107
@Override
115108
public RequestSpec route(String route) {
116-
return new DefaultRequestSpec(route);
109+
Assert.notNull(route, "'route' is required");
110+
return new DefaultRequestSpec(route, metadataMimeType().equals(COMPOSITE_METADATA) ? ROUTING : null);
111+
}
112+
113+
@Override
114+
public RequestSpec metadata(Object metadata, @Nullable MimeType mimeType) {
115+
return new DefaultRequestSpec(metadata, mimeType);
117116
}
118117

119118

@@ -131,16 +130,22 @@ private class DefaultRequestSpec implements RequestSpec {
131130
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
132131

133132

134-
public DefaultRequestSpec(String route) {
135-
Assert.notNull(route, "'route' is required");
136-
metadata(route, ROUTING);
133+
public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
134+
mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType);
135+
Assert.notNull(mimeType, "MimeType is required for composite metadata");
136+
metadata(metadata, mimeType);
137137
}
138138

139+
private boolean isCompositeMetadata() {
140+
return metadataMimeType().equals(COMPOSITE_METADATA);
141+
}
139142

140143
@Override
141144
public RequestSpec metadata(Object metadata, MimeType mimeType) {
142-
Assert.isTrue(this.metadata.isEmpty() || metadataMimeType().equals(COMPOSITE_METADATA),
143-
"Additional metadata entries supported only with composite metadata");
145+
Assert.notNull(metadata, "Metadata content is required");
146+
Assert.notNull(mimeType, "MimeType is required");
147+
Assert.isTrue(this.metadata.isEmpty() || isCompositeMetadata(),
148+
"Composite metadata required for multiple metadata entries.");
144149
this.metadata.put(metadata, mimeType);
145150
return this;
146151
}
@@ -250,22 +255,27 @@ private Mono<Payload> emptyPayload() {
250255
}
251256

252257
private DataBuffer getMetadata() {
253-
if (metadataMimeType().equals(COMPOSITE_METADATA)) {
258+
if (isCompositeMetadata()) {
254259
CompositeByteBuf metadata = getAllocator().compositeBuffer();
255-
this.metadata.forEach((key, value) -> {
256-
DataBuffer dataBuffer = encodeMetadata(key, value);
257-
CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), value.toString(),
260+
this.metadata.forEach((value, mimeType) -> {
261+
DataBuffer dataBuffer = encodeMetadata(value, mimeType);
262+
CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), mimeType.toString(),
258263
dataBuffer instanceof NettyDataBuffer ?
259264
((NettyDataBuffer) dataBuffer).getNativeBuffer() :
260265
Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()));
261266
});
262267
return asDataBuffer(metadata);
263268
}
264-
Assert.isTrue(this.metadata.size() < 2, "Composite metadata required for multiple entries");
265-
Map.Entry<Object, MimeType> entry = this.metadata.entrySet().iterator().next();
266-
Assert.isTrue(metadataMimeType().equals(entry.getValue()),
267-
() -> "Expected metadata MimeType '" + metadataMimeType() + "', actual " + this.metadata);
268-
return encodeMetadata(entry.getKey(), entry.getValue());
269+
else {
270+
Assert.isTrue(this.metadata.size() == 1, "Composite metadata required for multiple entries");
271+
Map.Entry<Object, MimeType> entry = this.metadata.entrySet().iterator().next();
272+
if (!metadataMimeType().equals(entry.getValue())) {
273+
throw new IllegalArgumentException(
274+
"Connection configured for metadata mime type " +
275+
"'" + metadataMimeType() + "', but actual is `" + this.metadata + "`");
276+
}
277+
return encodeMetadata(entry.getKey(), entry.getValue());
278+
}
269279
}
270280

271281
@SuppressWarnings("unchecked")

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

+25-14
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,25 @@ public interface RSocketRequester {
6767

6868

6969
/**
70-
* Begin to specify a new request with the given route to a handler on the
71-
* remote side. The route will be encoded in the metadata of the first
72-
* payload.
70+
* Begin to specify a new request with the given route to a remote handler.
71+
* <p>If the connection is set to use composite metadata, the route is
72+
* encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route
73+
* is encoded according to the mime type for the connection.
7374
* @param route the route to a handler
7475
* @return a spec for further defining and executing the request
7576
*/
7677
RequestSpec route(String route);
7778

79+
/**
80+
* Begin to specify a new request with the given metadata.
81+
* <p>If using composite metadata then the mime type argument is required.
82+
* Otherwise the mime type should be {@code null}, or it must match the
83+
* mime type for the connection.
84+
* @param metadata the metadata value to encode
85+
* @param mimeType the mime type that describes the metadata;
86+
*/
87+
RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
88+
7889

7990
/**
8091
* Obtain a builder for an {@link RSocketRequester} by connecting to an
@@ -110,24 +121,24 @@ static RSocketRequester wrap(
110121
interface Builder {
111122

112123
/**
113-
* Configure the MimeType to use for payload data. This is then
114-
* specified on the {@code SETUP} frame for the whole connection.
115-
* <p>By default this is set to the first concrete MimeType supported
124+
* Configure the MimeType for payload data which is then specified
125+
* on the {@code SETUP} frame and applies to the whole connection.
126+
* <p>By default this is set to the first concrete mime type supported
116127
* by the configured encoders and decoders.
117128
* @param mimeType the data MimeType to use
118129
*/
119130
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
120131

121132
/**
122-
* Configure the MimeType to use for payload metadata. This is then
123-
* specified on the {@code SETUP} frame for the whole connection.
124-
* <p>At present the metadata MimeType must be
125-
* {@code "message/x.rsocket.routing.v0"} to allow the request
126-
* {@link RSocketRequester#route(String) route} to be encoded, or it
127-
* could also be {@code "message/x.rsocket.composite-metadata.v0"} in
128-
* which case the route can be encoded along with other metadata entries.
133+
* Configure the MimeType for payload metadata which is then specified
134+
* on the {@code SETUP} frame and applies to the whole connection.
129135
* <p>By default this is set to
130-
* {@code "message/x.rsocket.composite-metadata.v0"}.
136+
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
137+
* route, if provided, is encoded as a
138+
* {@code "message/x.rsocket.routing.v0"} metadata entry, potentially
139+
* with other metadata entries added too. If this is set to any other
140+
* mime type, and a route is provided, it is assumed the mime type is
141+
* for the route.
131142
* @param mimeType the data MimeType to use
132143
*/
133144
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.rsocket.annotation.support;
17+
18+
import java.util.Collections;
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.function.BiConsumer;
22+
23+
import io.netty.buffer.ByteBuf;
24+
import io.rsocket.Payload;
25+
import io.rsocket.metadata.CompositeMetadata;
26+
27+
import org.springframework.core.ParameterizedTypeReference;
28+
import org.springframework.core.ResolvableType;
29+
import org.springframework.core.codec.Decoder;
30+
import org.springframework.core.io.buffer.DataBuffer;
31+
import org.springframework.core.io.buffer.DataBufferFactory;
32+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
33+
import org.springframework.lang.Nullable;
34+
import org.springframework.messaging.rsocket.RSocketStrategies;
35+
import org.springframework.util.Assert;
36+
import org.springframework.util.MimeType;
37+
38+
/**
39+
* Default {@link MetadataExtractor} implementation that relies on {@link Decoder}s
40+
* to deserialize the content of metadata entries.
41+
*
42+
* <p>By default only {@code "message/x.rsocket.routing.v0""} is extracted and
43+
* saved under {@link MetadataExtractor#ROUTE_KEY}. Use the
44+
* {@code metadataToExtract} methods to specify other metadata mime types of
45+
* interest to extract.
46+
*
47+
* @author Rossen Stoyanchev
48+
* @since 5.2
49+
*/
50+
public class DefaultMetadataExtractor implements MetadataExtractor {
51+
52+
private final RSocketStrategies rsocketStrategies;
53+
54+
private final Map<String, EntryProcessor<?>> entryProcessors = new HashMap<>();
55+
56+
57+
/**
58+
* Default constructor with {@link RSocketStrategies}.
59+
*/
60+
public DefaultMetadataExtractor(RSocketStrategies strategies) {
61+
Assert.notNull(strategies, "RSocketStrategies is required");
62+
this.rsocketStrategies = strategies;
63+
// TODO: remove when rsocket-core API available
64+
metadataToExtract(MessagingRSocket.ROUTING, String.class, ROUTE_KEY);
65+
}
66+
67+
68+
/**
69+
* Decode metadata entries with the given {@link MimeType} to the specified
70+
* target class, and store the decoded value in the output map under the
71+
* given name.
72+
* @param mimeType the mime type of metadata entries to extract
73+
* @param targetType the target value type to decode to
74+
* @param name assign a name for the decoded value; if not provided, then
75+
* the mime type is used as the key
76+
*/
77+
public void metadataToExtract(
78+
MimeType mimeType, Class<?> targetType, @Nullable String name) {
79+
80+
String key = name != null ? name : mimeType.toString();
81+
metadataToExtract(mimeType, targetType, (value, map) -> map.put(key, value));
82+
}
83+
84+
/**
85+
* Variant of {@link #metadataToExtract(MimeType, Class, String)} that accepts
86+
* {@link ParameterizedTypeReference} instead of {@link Class} for
87+
* specifying a target type with generic parameters.
88+
*/
89+
public void metadataToExtract(
90+
MimeType mimeType, ParameterizedTypeReference<?> targetType, @Nullable String name) {
91+
92+
String key = name != null ? name : mimeType.toString();
93+
metadataToExtract(mimeType, targetType, (value, map) -> map.put(key, value));
94+
}
95+
96+
/**
97+
* Variant of {@link #metadataToExtract(MimeType, Class, String)} that allows
98+
* custom logic to be used to map the decoded value to any number of values
99+
* in the output map.
100+
* @param mimeType the mime type of metadata entries to extract
101+
* @param targetType the target value type to decode to
102+
* @param mapper custom logic to add the decoded value to the output map
103+
* @param <T> the target value type
104+
*/
105+
public <T> void metadataToExtract(
106+
MimeType mimeType, Class<T> targetType,
107+
BiConsumer<T, Map<String, Object>> mapper) {
108+
109+
EntryProcessor<T> spec = new EntryProcessor<>(mimeType, targetType, mapper);
110+
this.entryProcessors.put(mimeType.toString(), spec);
111+
}
112+
113+
/**
114+
* Variant of {@link #metadataToExtract(MimeType, Class, BiConsumer)} that
115+
* accepts {@link ParameterizedTypeReference} instead of {@link Class} for
116+
* specifying a target type with generic parameters.
117+
* @param mimeType the mime type of metadata entries to extract
118+
* @param targetType the target value type to decode to
119+
* @param mapper custom logic to add the decoded value to the output map
120+
* @param <T> the target value type
121+
*/
122+
public <T> void metadataToExtract(
123+
MimeType mimeType, ParameterizedTypeReference<T> targetType,
124+
BiConsumer<T, Map<String, Object>> mapper) {
125+
126+
EntryProcessor<T> spec = new EntryProcessor<>(mimeType, targetType, mapper);
127+
this.entryProcessors.put(mimeType.toString(), spec);
128+
}
129+
130+
131+
@Override
132+
public Map<String, Object> extract(Payload payload, MimeType metadataMimeType) {
133+
Map<String, Object> result = new HashMap<>();
134+
if (metadataMimeType.equals(MessagingRSocket.COMPOSITE_METADATA)) {
135+
for (CompositeMetadata.Entry entry : new CompositeMetadata(payload.metadata(), false)) {
136+
processEntry(entry.getContent(), entry.getMimeType(), result);
137+
}
138+
}
139+
else {
140+
processEntry(payload.metadata(), metadataMimeType.toString(), result);
141+
}
142+
return result;
143+
}
144+
145+
private void processEntry(ByteBuf content, @Nullable String mimeType, Map<String, Object> result) {
146+
EntryProcessor<?> entryProcessor = this.entryProcessors.get(mimeType);
147+
if (entryProcessor != null) {
148+
content.retain();
149+
entryProcessor.process(content, result);
150+
return;
151+
}
152+
if (MessagingRSocket.ROUTING.toString().equals(mimeType)) {
153+
// TODO: use rsocket-core API when available
154+
}
155+
}
156+
157+
158+
/**
159+
* Helps to decode a metadata entry and add the resulting value to the
160+
* output map.
161+
*/
162+
private class EntryProcessor<T> {
163+
164+
private final MimeType mimeType;
165+
166+
private final ResolvableType targetType;
167+
168+
private final BiConsumer<T, Map<String, Object>> accumulator;
169+
170+
private final Decoder<T> decoder;
171+
172+
173+
public EntryProcessor(
174+
MimeType mimeType, Class<T> targetType,
175+
BiConsumer<T, Map<String, Object>> accumulator) {
176+
177+
this(mimeType, ResolvableType.forClass(targetType), accumulator);
178+
}
179+
180+
public EntryProcessor(
181+
MimeType mimeType, ParameterizedTypeReference<T> targetType,
182+
BiConsumer<T, Map<String, Object>> accumulator) {
183+
184+
this(mimeType, ResolvableType.forType(targetType), accumulator);
185+
}
186+
187+
private EntryProcessor(
188+
MimeType mimeType, ResolvableType targetType,
189+
BiConsumer<T, Map<String, Object>> accumulator) {
190+
191+
this.mimeType = mimeType;
192+
this.targetType = targetType;
193+
this.accumulator = accumulator;
194+
this.decoder = rsocketStrategies.decoder(targetType, mimeType);
195+
}
196+
197+
198+
public void process(ByteBuf byteBuf, Map<String, Object> result) {
199+
DataBufferFactory factory = rsocketStrategies.dataBufferFactory();
200+
DataBuffer buffer = factory instanceof NettyDataBufferFactory ?
201+
((NettyDataBufferFactory) factory).wrap(byteBuf) :
202+
factory.wrap(byteBuf.nioBuffer());
203+
204+
T value = this.decoder.decode(buffer, this.targetType, this.mimeType, Collections.emptyMap());
205+
this.accumulator.accept(value, result);
206+
}
207+
}
208+
209+
}

0 commit comments

Comments
 (0)