Skip to content

Commit 862dd23

Browse files
committed
Server adapters release buffers on error/cancel
Review and update Servlet and Undertow adapters to release any data buffers they be holding on to at the time of error or cancellation. Also remove onDiscard hooks from Reactor and Undertow request body. For Reactor we expect it to be handled. For Undertow there isn't any Reactor Core upstream for the callback to be useful. Issue: SPR-17410
1 parent 149d416 commit 862dd23

File tree

10 files changed

+387
-40
lines changed

10 files changed

+387
-40
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ public final void onError(Throwable ex) {
163163
*/
164164
protected abstract void readingPaused();
165165

166+
/**
167+
* Invoked after an I/O read error from the underlying server or after a
168+
* cancellation signal from the downstream consumer to allow sub-classes
169+
* to discard any current cached data they might have.
170+
* @since 5.1.2
171+
*/
172+
protected abstract void discardData();
173+
166174

167175
// Private methods for use in State...
168176

@@ -416,7 +424,10 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
416424
}
417425

418426
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
419-
if (!publisher.changeState(this, COMPLETED)) {
427+
if (publisher.changeState(this, COMPLETED)) {
428+
publisher.discardData();
429+
}
430+
else {
420431
publisher.state.get().cancel(publisher);
421432
}
422433
}
@@ -439,6 +450,7 @@ <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
439450

440451
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
441452
if (publisher.changeState(this, COMPLETED)) {
453+
publisher.discardData();
442454
Subscriber<? super T> s = publisher.subscriber;
443455
if (s != null) {
444456
s.onError(t);

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ public void cancel() {
158158

159159
@Override
160160
public final void subscribe(Subscriber<? super Void> subscriber) {
161+
// Technically, cancellation from the result subscriber should be propagated
162+
// to the upstream subscription. In practice, HttpHandler server adapters
163+
// don't have a reason to cancel the result subscription.
161164
this.resultPublisher.subscribe(subscriber);
162165
}
163166

@@ -176,8 +179,14 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
176179
* data item for writing once that is possible.
177180
*/
178181
protected void dataReceived(T data) {
179-
if (this.currentData != null) {
180-
throw new IllegalStateException("Current data not processed yet: " + this.currentData);
182+
T prev = this.currentData;
183+
if (prev != null) {
184+
// This shouldn't happen:
185+
// 1. dataReceived can only be called from REQUESTED state
186+
// 2. currentData is cleared before requesting
187+
discardData(data);
188+
cancel();
189+
onError(new IllegalStateException("Received new data while current not processed yet."));
181190
}
182191
this.currentData = data;
183192
}
@@ -226,6 +235,16 @@ protected void writingComplete() {
226235
protected void writingFailed(Throwable ex) {
227236
}
228237

238+
/**
239+
* Invoked after any error (either from the upstream write Publisher, or
240+
* from I/O operations to the underlying server) and cancellation
241+
* to discard in-flight data that was in
242+
* the process of being written when the error took place.
243+
* @param data the data to be released
244+
* @since 5.1.2
245+
*/
246+
protected abstract void discardData(T data);
247+
229248

230249
// Private methods for use from State's...
231250

@@ -245,6 +264,7 @@ private void changeStateToReceived(State oldState) {
245264

246265
private void changeStateToComplete(State oldState) {
247266
if (changeState(oldState, State.COMPLETED)) {
267+
discardCurrentData();
248268
writingComplete();
249269
this.resultPublisher.publishComplete();
250270
}
@@ -263,6 +283,14 @@ private void writeIfPossible() {
263283
}
264284
}
265285

286+
private void discardCurrentData() {
287+
T data = this.currentData;
288+
this.currentData = null;
289+
if (data != null) {
290+
discardData(data);
291+
}
292+
}
293+
266294

267295
/**
268296
* Represents a state for the {@link Processor} to be in.
@@ -378,11 +406,14 @@ public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscri
378406
}
379407

380408
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
381-
throw new IllegalStateException(toString());
409+
processor.discardData(data);
410+
processor.cancel();
411+
processor.onError(new IllegalStateException("Illegal onNext without demand"));
382412
}
383413

384414
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
385415
if (processor.changeState(this, COMPLETED)) {
416+
processor.discardCurrentData();
386417
processor.writingComplete();
387418
processor.resultPublisher.publishError(ex);
388419
}

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import reactor.netty.http.server.HttpServerRequest;
3030

3131
import org.springframework.core.io.buffer.DataBuffer;
32-
import org.springframework.core.io.buffer.DataBufferUtils;
3332
import org.springframework.core.io.buffer.NettyDataBufferFactory;
34-
import org.springframework.core.io.buffer.PooledDataBuffer;
3533
import org.springframework.http.HttpCookie;
3634
import org.springframework.http.HttpHeaders;
3735
import org.springframework.lang.Nullable;
@@ -165,8 +163,7 @@ protected SslInfo initSslInfo() {
165163

166164
@Override
167165
public Flux<DataBuffer> getBody() {
168-
Flux<DataBuffer> body = this.request.receive().retain().map(this.bufferFactory::wrap);
169-
return body.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
166+
return this.request.receive().retain().map(this.bufferFactory::wrap);
170167
}
171168

172169
@SuppressWarnings("unchecked")

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,11 @@ protected void readingPaused() {
302302
// no-op
303303
}
304304

305+
@Override
306+
protected void discardData() {
307+
// Nothing to discard since we pass data buffers on immediately..
308+
}
309+
305310

306311
private class RequestBodyPublisherReadListener implements ReadListener {
307312

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
334334
boolean ready = ServletServerHttpResponse.this.isWritePossible();
335335
int remaining = dataBuffer.readableByteCount();
336336
if (ready && remaining > 0) {
337+
// In case of IOException, onError handling should call discardData(DataBuffer)..
337338
int written = writeToOutputStream(dataBuffer);
338339
if (logger.isTraceEnabled()) {
339340
logger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes");
@@ -359,6 +360,11 @@ else if (rsWriteLogger.isTraceEnabled()) {
359360
protected void writingComplete() {
360361
bodyProcessor = null;
361362
}
363+
364+
@Override
365+
protected void discardData(DataBuffer dataBuffer) {
366+
DataBufferUtils.release(dataBuffer);
367+
}
362368
}
363369

364370
}

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,7 @@ protected SslInfo initSslInfo() {
116116

117117
@Override
118118
public Flux<DataBuffer> getBody() {
119-
return Flux.from(this.body)
120-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
119+
return Flux.from(this.body);
121120
}
122121

123122
@SuppressWarnings("unchecked")
@@ -201,6 +200,10 @@ else if (read == -1) {
201200
}
202201
}
203202

203+
@Override
204+
protected void discardData() {
205+
// Nothing to discard since we pass data buffers on immediately..
206+
}
204207
}
205208

206209
private static class UndertowDataBuffer implements PooledDataBuffer {

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
181181
// Track write listener calls from here on..
182182
this.writePossible = false;
183183

184+
// In case of IOException, onError handling should call discardData(DataBuffer)..
184185
int total = buffer.remaining();
185186
int written = writeByteBuffer(buffer);
186187

@@ -235,6 +236,11 @@ protected void writingFailed(Throwable ex) {
235236
cancel();
236237
onError(ex);
237238
}
239+
240+
@Override
241+
protected void discardData(DataBuffer dataBuffer) {
242+
DataBufferUtils.release(dataBuffer);
243+
}
238244
}
239245

240246

spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

Lines changed: 93 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,58 +16,95 @@
1616

1717
package org.springframework.http.server.reactive;
1818

19-
import java.io.IOException;
20-
19+
import org.junit.Before;
2120
import org.junit.Test;
22-
import org.mockito.invocation.InvocationOnMock;
23-
import org.mockito.stubbing.Answer;
2421
import org.reactivestreams.Subscriber;
2522
import org.reactivestreams.Subscription;
2623

2724
import org.springframework.core.io.buffer.DataBuffer;
2825

29-
import static org.mockito.Mockito.doAnswer;
30-
import static org.mockito.Mockito.isA;
31-
import static org.mockito.Mockito.mock;
32-
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.*;
27+
import static org.mockito.Mockito.*;
3328

3429
/**
35-
* Unit tests for {@link AbstractListenerReadPublisher}
30+
* Unit tests for {@link AbstractListenerReadPublisher}.
3631
*
3732
* @author Violeta Georgieva
38-
* @since 5.0
33+
* @author Rossen Stoyanchev
3934
*/
4035
public class ListenerReadPublisherTests {
4136

37+
private final TestListenerReadPublisher publisher = new TestListenerReadPublisher();
38+
39+
private final TestSubscriber subscriber = new TestSubscriber();
40+
41+
42+
@Before
43+
public void setup() {
44+
this.publisher.subscribe(this.subscriber);
45+
}
46+
47+
4248
@Test
43-
@SuppressWarnings("unchecked")
44-
public void testReceiveTwoRequestCallsWhenOnSubscribe() {
45-
Subscriber<DataBuffer> subscriber = mock(Subscriber.class);
46-
doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class));
49+
public void twoReads() {
50+
51+
this.subscriber.getSubscription().request(2);
52+
this.publisher.onDataAvailable();
53+
54+
assertEquals(2, this.publisher.getReadCalls());
55+
}
56+
57+
@Test // SPR-17410
58+
public void discardDataOnError() {
4759

48-
TestListenerReadPublisher publisher = new TestListenerReadPublisher();
49-
publisher.subscribe(subscriber);
50-
publisher.onDataAvailable();
60+
this.subscriber.getSubscription().request(2);
61+
this.publisher.onDataAvailable();
62+
this.publisher.onError(new IllegalStateException());
5163

52-
assertTrue(publisher.getReadCalls() == 2);
64+
assertEquals(2, this.publisher.getReadCalls());
65+
assertEquals(1, this.publisher.getDiscardCalls());
5366
}
5467

55-
private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher {
68+
@Test // SPR-17410
69+
public void discardDataOnCancel() {
70+
71+
this.subscriber.getSubscription().request(2);
72+
this.subscriber.setCancelOnNext(true);
73+
this.publisher.onDataAvailable();
74+
75+
assertEquals(1, this.publisher.getReadCalls());
76+
assertEquals(1, this.publisher.getDiscardCalls());
77+
}
78+
79+
80+
private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher<DataBuffer> {
5681

5782
private int readCalls = 0;
5883

84+
private int discardCalls = 0;
85+
86+
5987
public TestListenerReadPublisher() {
6088
super("");
6189
}
6290

91+
92+
public int getReadCalls() {
93+
return this.readCalls;
94+
}
95+
96+
public int getDiscardCalls() {
97+
return this.discardCalls;
98+
}
99+
63100
@Override
64101
protected void checkOnDataAvailable() {
65102
// no-op
66103
}
67104

68105
@Override
69-
protected DataBuffer read() throws IOException {
70-
readCalls++;
106+
protected DataBuffer read() {
107+
this.readCalls++;
71108
return mock(DataBuffer.class);
72109
}
73110

@@ -76,22 +113,48 @@ protected void readingPaused() {
76113
// No-op
77114
}
78115

79-
public int getReadCalls() {
80-
return this.readCalls;
116+
@Override
117+
protected void discardData() {
118+
this.discardCalls++;
81119
}
82-
83120
}
84121

85-
private static final class SubscriptionAnswer implements Answer<Subscription> {
122+
123+
private static final class TestSubscriber implements Subscriber<DataBuffer> {
124+
125+
private Subscription subscription;
126+
127+
private boolean cancelOnNext;
128+
129+
130+
public Subscription getSubscription() {
131+
return this.subscription;
132+
}
133+
134+
public void setCancelOnNext(boolean cancelOnNext) {
135+
this.cancelOnNext = cancelOnNext;
136+
}
137+
138+
139+
@Override
140+
public void onSubscribe(Subscription subscription) {
141+
this.subscription = subscription;
142+
}
86143

87144
@Override
88-
public Subscription answer(InvocationOnMock invocation) throws Throwable {
89-
Subscription arg = (Subscription) invocation.getArguments()[0];
90-
arg.request(1);
91-
arg.request(1);
92-
return arg;
145+
public void onNext(DataBuffer dataBuffer) {
146+
if (this.cancelOnNext) {
147+
this.subscription.cancel();
148+
}
93149
}
94150

151+
@Override
152+
public void onError(Throwable t) {
153+
}
154+
155+
@Override
156+
public void onComplete() {
157+
}
95158
}
96159

97160
}

0 commit comments

Comments
 (0)