Skip to content

Commit 6ed1b58

Browse files
committed
Separate step for retrieve in RSocketRequester
Closes gh-24073
1 parent b234c77 commit 6ed1b58

File tree

3 files changed

+42
-32
lines changed

3 files changed

+42
-32
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,9 @@ interface Builder {
238238
}
239239

240240
/**
241-
* Spec for providing input data for an RSocket request and triggering the exchange.
241+
* Spec to declare the input for an RSocket request.
242242
*/
243-
interface RequestSpec extends MetadataSpec<RequestSpec> {
243+
interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec {
244244

245245
/**
246246
* Append additional metadata entries through a {@code Consumer}.
@@ -262,7 +262,7 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
262262
* @param data the Object value for the payload data
263263
* @return spec to declare the expected response
264264
*/
265-
RequestSpec data(Object data);
265+
RetrieveSpec data(Object data);
266266

267267
/**
268268
* Variant of {@link #data(Object)} that also accepts a hint for the
@@ -274,7 +274,7 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
274274
* @param elementClass the type of values to be produced
275275
* @return spec to declare the expected response
276276
*/
277-
RequestSpec data(Object producer, Class<?> elementClass);
277+
RetrieveSpec data(Object producer, Class<?> elementClass);
278278

279279
/**
280280
* Variant of {@link #data(Object, Class)} for when the type hint has
@@ -285,7 +285,38 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
285285
* @param elementTypeRef the type of values to be produced
286286
* @return spec to declare the expected response
287287
*/
288-
RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
288+
RetrieveSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
289+
}
290+
291+
292+
/**
293+
* Spec for providing additional composite metadata entries.
294+
*
295+
* @param <S> a self reference to the spec type
296+
*/
297+
interface MetadataSpec<S extends MetadataSpec<S>> {
298+
299+
/**
300+
* Use this to append additional metadata entries when using composite
301+
* metadata. An {@link IllegalArgumentException} is raised if this
302+
* method is used when not using composite metadata.
303+
* The metadata value be a concrete value or any producer of a single
304+
* value that can be adapted to a {@link Publisher} via
305+
* {@link ReactiveAdapterRegistry}.
306+
* @param metadata an Object to be encoded with a suitable
307+
* {@link org.springframework.core.codec.Encoder Encoder}, or a
308+
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
309+
* @param mimeType the mime type that describes the metadata
310+
*/
311+
S metadata(Object metadata, MimeType mimeType);
312+
}
313+
314+
315+
/**
316+
* Spec to declare the expected output for an RSocket request.
317+
* @since 5.2.2
318+
*/
319+
interface RetrieveSpec {
289320

290321
/**
291322
* Perform a {@link RSocket#fireAndForget fireAndForget}.
@@ -330,26 +361,4 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
330361
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
331362
}
332363

333-
/**
334-
* Spec for specifying the metadata.
335-
*
336-
* @param <S> a self reference to the spec type
337-
*/
338-
interface MetadataSpec<S extends MetadataSpec<S>> {
339-
340-
/**
341-
* Use this to append additional metadata entries when using composite
342-
* metadata. An {@link IllegalArgumentException} is raised if this
343-
* method is used when not using composite metadata.
344-
* The metadata value be a concrete value or any producer of a single
345-
* value that can be adapted to a {@link Publisher} via
346-
* {@link ReactiveAdapterRegistry}.
347-
* @param metadata an Object to be encoded with a suitable
348-
* {@link org.springframework.core.codec.Encoder Encoder}, or a
349-
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
350-
* @param mimeType the mime type that describes the metadata
351-
*/
352-
S metadata(Object metadata, MimeType mimeType);
353-
}
354-
355364
}

spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
6565
* @author Sebastien Deleuze
6666
* @since 5.2
6767
*/
68-
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RequestSpec =
68+
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RetrieveSpec =
6969
data(producer, object : ParameterizedTypeReference<T>() {})
7070

7171
/**
@@ -77,7 +77,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer:
7777
* @author Sebastien Deleuze
7878
* @since 5.2
7979
*/
80-
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RequestSpec =
80+
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RetrieveSpec =
8181
data(publisher, object : ParameterizedTypeReference<T>() {})
8282

8383
/**
@@ -89,7 +89,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher
8989
* @author Sebastien Deleuze
9090
* @since 5.2
9191
*/
92-
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RequestSpec =
92+
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RetrieveSpec =
9393
data(flow, object : ParameterizedTypeReference<T>() {})
9494

9595

spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
4141
import org.springframework.lang.Nullable;
4242
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
43+
import org.springframework.messaging.rsocket.RSocketRequester.RetrieveSpec;
4344
import org.springframework.util.MimeType;
4445
import org.springframework.util.MimeTypeUtils;
4546

@@ -90,7 +91,7 @@ public void sendMono() {
9091
testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), "");
9192
}
9293

93-
private void testSendMono(Function<RequestSpec, RequestSpec> mapper, String expectedValue) {
94+
private void testSendMono(Function<RequestSpec, RetrieveSpec> mapper, String expectedValue) {
9495
mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5));
9596

9697
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget");
@@ -114,7 +115,7 @@ public void sendFlux() {
114115
testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values);
115116
}
116117

117-
private void testSendFlux(Function<RequestSpec, RequestSpec> mapper, String... expectedValues) {
118+
private void testSendFlux(Function<RequestSpec, RetrieveSpec> mapper, String... expectedValues) {
118119
this.rsocket.reset();
119120
mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));
120121

0 commit comments

Comments
 (0)