21
21
import java .util .function .BiFunction ;
22
22
23
23
import io .netty .buffer .ByteBufAllocator ;
24
+ import org .apache .commons .logging .Log ;
25
+ import org .apache .commons .logging .LogFactory ;
24
26
import reactor .core .publisher .Flux ;
25
27
import reactor .netty .Connection ;
26
28
import reactor .netty .NettyInbound ;
29
31
import org .springframework .core .io .buffer .DataBuffer ;
30
32
import org .springframework .core .io .buffer .NettyDataBufferFactory ;
31
33
import org .springframework .http .HttpHeaders ;
34
+ import org .springframework .http .HttpMethod ;
32
35
import org .springframework .http .HttpStatus ;
33
36
import org .springframework .http .ResponseCookie ;
34
37
import org .springframework .lang .Nullable ;
35
- import org .springframework .util .Assert ;
36
38
import org .springframework .util .CollectionUtils ;
37
39
import org .springframework .util .LinkedMultiValueMap ;
38
40
import org .springframework .util .MultiValueMap ;
46
48
*/
47
49
class ReactorClientHttpResponse implements ClientHttpResponse {
48
50
51
+ private static final Log logger = LogFactory .getLog (ReactorClientHttpResponse .class );
52
+
49
53
private final NettyDataBufferFactory bufferFactory ;
50
54
51
55
private final HttpClientResponse response ;
52
56
53
57
private final NettyInbound inbound ;
54
58
55
59
@ Nullable
56
- private final Connection connection ;
60
+ private final String logPrefix ;
57
61
58
- // 0 - not subscribed, 1 - subscribed, 2 - cancelled
62
+ // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe)
59
63
private final AtomicInteger state = new AtomicInteger (0 );
60
64
61
65
@@ -68,7 +72,7 @@ public ReactorClientHttpResponse(HttpClientResponse response, Connection connect
68
72
this .response = response ;
69
73
this .inbound = connection .inbound ();
70
74
this .bufferFactory = new NettyDataBufferFactory (connection .outbound ().alloc ());
71
- this .connection = connection ;
75
+ this .logPrefix = ( logger . isDebugEnabled () ? "[" + connection . channel (). id (). asShortText () + "] " : "" ) ;
72
76
}
73
77
74
78
/**
@@ -80,22 +84,28 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
80
84
this .response = response ;
81
85
this .inbound = inbound ;
82
86
this .bufferFactory = new NettyDataBufferFactory (alloc );
83
- this .connection = null ;
87
+ this .logPrefix = "" ;
84
88
}
85
89
86
90
87
91
@ Override
88
92
public Flux <DataBuffer > getBody () {
89
93
return this .inbound .receive ()
90
94
.doOnSubscribe (s -> {
91
- if (!this .state .compareAndSet (0 , 1 )) {
92
- // https://github.com/reactor/reactor-netty/issues/503
93
- // FluxReceive rejects multiple subscribers, but not after a cancel().
94
- // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
95
- // So we need to reject once in cancelled state.
96
- if (this .state .get () == 2 ) {
97
- throw new IllegalStateException ("The client response body can only be consumed once." );
98
- }
95
+ if (this .state .compareAndSet (0 , 1 )) {
96
+ return ;
97
+ }
98
+ // https://github.com/reactor/reactor-netty/issues/503
99
+ // FluxReceive rejects multiple subscribers, but not after a cancel().
100
+ // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
101
+ // So we need to reject once in cancelled state.
102
+ if (this .state .get () == 2 ) {
103
+ throw new IllegalStateException (
104
+ "The client response body can only be consumed once." );
105
+ }
106
+ else if (this .state .get () == 3 ) {
107
+ throw new IllegalStateException (
108
+ "The client response body has been released already due to cancellation." );
99
109
}
100
110
})
101
111
.doOnCancel (() -> this .state .compareAndSet (1 , 2 ))
@@ -127,6 +137,7 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
127
137
MultiValueMap <String , ResponseCookie > result = new LinkedMultiValueMap <>();
128
138
this .response .cookies ().values ().stream ().flatMap (Collection ::stream )
129
139
.forEach (c ->
140
+
130
141
result .add (c .name (), ResponseCookie .fromClientResponse (c .name (), c .value ())
131
142
.domain (c .domain ())
132
143
.path (c .path ())
@@ -138,18 +149,25 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
138
149
}
139
150
140
151
/**
141
- * For use by {@link ReactorClientHttpConnector}.
152
+ * Called by {@link ReactorClientHttpConnector} when a cancellation is detected
153
+ * but the content has not been subscribed to. If the subscription never
154
+ * materializes then the content will remain not drained. Or it could still
155
+ * materialize if the cancellation happened very early, or the response
156
+ * reading was delayed for some reason.
142
157
*/
143
- boolean bodyNotSubscribed () {
144
- return this .state .get () == 0 ;
158
+ void releaseAfterCancel (HttpMethod method ) {
159
+ if (mayHaveBody (method ) && this .state .compareAndSet (0 , 3 )) {
160
+ if (logger .isDebugEnabled ()) {
161
+ logger .debug (this .logPrefix + "Releasing body, not yet subscribed." );
162
+ }
163
+ this .inbound .receive ().doOnNext (byteBuf -> {}).subscribe (byteBuf -> {}, ex -> {});
164
+ }
145
165
}
146
166
147
- /**
148
- * For use by {@link ReactorClientHttpConnector}.
149
- */
150
- Connection getConnection () {
151
- Assert .notNull (this .connection , "Constructor with connection wasn't used" );
152
- return this .connection ;
167
+ private boolean mayHaveBody (HttpMethod method ) {
168
+ int code = this .getRawStatusCode ();
169
+ return !((code >= 100 && code < 200 ) || code == 204 || code == 205 ||
170
+ method .equals (HttpMethod .HEAD ) || getHeaders ().getContentLength () == 0 );
153
171
}
154
172
155
173
@ Override
0 commit comments