Skip to content

Commit 809851c

Browse files
committed
Replace MonoProcessor with AtomicRef for RSocket RESPONSE_HEADER
See gh-25884
1 parent 5b1b20c commit 809851c

File tree

2 files changed

+24
-27
lines changed

2 files changed

+24
-27
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.util.Map;
2020
import java.util.concurrent.atomic.AtomicBoolean;
21-
import java.util.function.Function;
21+
import java.util.concurrent.atomic.AtomicReference;
2222

2323
import io.rsocket.ConnectionSetupPayload;
2424
import io.rsocket.Payload;
@@ -162,8 +162,8 @@ private int refCount(DataBuffer dataBuffer) {
162162

163163
@SuppressWarnings("deprecation")
164164
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
165-
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = reactor.core.publisher.MonoProcessor.create();
166-
MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono);
165+
AtomicReference<Flux<Payload>> responseRef = new AtomicReference<>();
166+
MessageHeaders headers = createHeaders(firstPayload, frameType, responseRef);
167167

168168
AtomicBoolean read = new AtomicBoolean();
169169
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
@@ -176,18 +176,17 @@ private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType,
176176
firstPayload.release();
177177
}
178178
})
179-
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?
180-
replyMono.flatMapMany(Function.identity()) :
179+
.thenMany(Flux.defer(() -> responseRef.get() != null ?
180+
responseRef.get() :
181181
Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
182182
}
183183

184184
private DataBuffer retainDataAndReleasePayload(Payload payload) {
185185
return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory());
186186
}
187187

188-
@SuppressWarnings("deprecation")
189-
private MessageHeaders createHeaders(Payload payload, FrameType frameType,
190-
@Nullable reactor.core.publisher.MonoProcessor<?> replyMono) {
188+
private MessageHeaders createHeaders(
189+
Payload payload, FrameType frameType, @Nullable AtomicReference<Flux<Payload>> responseRef) {
191190

192191
MessageHeaderAccessor headers = new MessageHeaderAccessor();
193192
headers.setLeaveMutable(true);
@@ -208,8 +207,8 @@ private MessageHeaders createHeaders(Payload payload, FrameType frameType,
208207
headers.setContentType(this.dataMimeType);
209208
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
210209
headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
211-
if (replyMono != null) {
212-
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
210+
if (responseRef != null) {
211+
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, responseRef);
213212
}
214213
headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER,
215214
this.strategies.dataBufferFactory());

spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.messaging.rsocket.annotation.support;
1818

1919
import java.util.List;
20+
import java.util.concurrent.atomic.AtomicReference;
2021

2122
import io.rsocket.Payload;
2223
import reactor.core.publisher.Flux;
@@ -35,16 +36,16 @@
3536
/**
3637
* Extension of {@link AbstractEncoderMethodReturnValueHandler} that
3738
* {@link #handleEncodedContent handles} encoded content by wrapping data buffers
38-
* as RSocket payloads and by passing those to the {@link reactor.core.publisher.MonoProcessor}
39-
* from the {@link #RESPONSE_HEADER} header.
39+
* as RSocket payloads and by passing those through the {@link #RESPONSE_HEADER}
40+
* header.
4041
*
4142
* @author Rossen Stoyanchev
4243
* @since 5.2
4344
*/
4445
public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
4546

4647
/**
47-
* Message header name that is expected to have a {@link reactor.core.publisher.MonoProcessor}
48+
* Message header name that is expected to have an {@link java.util.concurrent.atomic.AtomicReference}
4849
* which will receive the {@code Flux<Payload>} that represents the response.
4950
*/
5051
public static final String RESPONSE_HEADER = "rsocketResponse";
@@ -56,33 +57,30 @@ public RSocketPayloadReturnValueHandler(List<Encoder<?>> encoders, ReactiveAdapt
5657

5758

5859
@Override
59-
@SuppressWarnings({"unchecked", "deprecation"})
6060
protected Mono<Void> handleEncodedContent(
6161
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message) {
6262

63-
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
64-
Assert.notNull(replyMono, "Missing '" + RESPONSE_HEADER + "'");
65-
replyMono.onNext(encodedContent.map(PayloadUtils::createPayload));
66-
replyMono.onComplete();
63+
AtomicReference<Flux<Payload>> responseRef = getResponseReference(message);
64+
Assert.notNull(responseRef, "Missing '" + RESPONSE_HEADER + "'");
65+
responseRef.set(encodedContent.map(PayloadUtils::createPayload));
6766
return Mono.empty();
6867
}
6968

7069
@Override
71-
@SuppressWarnings("deprecation")
7270
protected Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message) {
73-
reactor.core.publisher.MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
74-
if (replyMono != null) {
75-
replyMono.onComplete();
71+
AtomicReference<Flux<Payload>> responseRef = getResponseReference(message);
72+
if (responseRef != null) {
73+
responseRef.set(Flux.empty());
7674
}
7775
return Mono.empty();
7876
}
7977

8078
@Nullable
81-
@SuppressWarnings({"unchecked", "deprecation"})
82-
private reactor.core.publisher.MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
79+
@SuppressWarnings("unchecked")
80+
private AtomicReference<Flux<Payload>> getResponseReference(Message<?> message) {
8381
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
84-
Assert.state(headerValue == null || headerValue instanceof reactor.core.publisher.MonoProcessor, "Expected MonoProcessor");
85-
return (reactor.core.publisher.MonoProcessor<Flux<Payload>>) headerValue;
82+
Assert.state(headerValue == null || headerValue instanceof AtomicReference, "Expected AtomicReference");
83+
return (AtomicReference<Flux<Payload>>) headerValue;
8684
}
8785

8886
}

0 commit comments

Comments
 (0)