Skip to content

Commit 36510cf

Browse files
committed
Server adapters release buffers on error/cancel
Review and update Servlet and Undertow adapters to release any data buffers they be holding on to at the time of error or cancellation. Also remove onDiscard hooks from Reactor and Undertow request body. For Reactor we expect it to be handled. For Undertow there isn't any Reactor Core upstream for the callback to be useful. Issue: SPR-17410
1 parent d492d7b commit 36510cf

File tree

9 files changed

+387
-37
lines changed

9 files changed

+387
-37
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ public final void onError(Throwable ex) {
130130
*/
131131
protected abstract void readingPaused();
132132

133+
/**
134+
* Invoked after an I/O read error from the underlying server or after a
135+
* cancellation signal from the downstream consumer to allow sub-classes
136+
* to discard any current cached data they might have.
137+
* @since 5.1.2
138+
*/
139+
protected abstract void discardData();
140+
133141

134142
// Private methods for use in State...
135143

@@ -382,7 +390,10 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
382390
}
383391

384392
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
385-
if (!publisher.changeState(this, COMPLETED)) {
393+
if (publisher.changeState(this, COMPLETED)) {
394+
publisher.discardData();
395+
}
396+
else {
386397
publisher.state.get().cancel(publisher);
387398
}
388399
}
@@ -405,6 +416,7 @@ <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
405416

406417
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
407418
if (publisher.changeState(this, COMPLETED)) {
419+
publisher.discardData();
408420
Subscriber<? super T> s = publisher.subscriber;
409421
if (s != null) {
410422
s.onError(t);

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ public void cancel() {
118118

119119
@Override
120120
public final void subscribe(Subscriber<? super Void> subscriber) {
121+
// Technically, cancellation from the result subscriber should be propagated
122+
// to the upstream subscription. In practice, HttpHandler server adapters
123+
// don't have a reason to cancel the result subscription.
121124
this.resultPublisher.subscribe(subscriber);
122125
}
123126

@@ -136,8 +139,14 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
136139
* data item for writing once that is possible.
137140
*/
138141
protected void dataReceived(T data) {
139-
if (this.currentData != null) {
140-
throw new IllegalStateException("Current data not processed yet: " + this.currentData);
142+
T prev = this.currentData;
143+
if (prev != null) {
144+
// This shouldn't happen:
145+
// 1. dataReceived can only be called from REQUESTED state
146+
// 2. currentData is cleared before requesting
147+
discardData(data);
148+
cancel();
149+
onError(new IllegalStateException("Received new data while current not processed yet."));
141150
}
142151
this.currentData = data;
143152
}
@@ -186,6 +195,16 @@ protected void writingComplete() {
186195
protected void writingFailed(Throwable ex) {
187196
}
188197

198+
/**
199+
* Invoked after any error (either from the upstream write Publisher, or
200+
* from I/O operations to the underlying server) and cancellation
201+
* to discard in-flight data that was in
202+
* the process of being written when the error took place.
203+
* @param data the data to be released
204+
* @since 5.1.2
205+
*/
206+
protected abstract void discardData(T data);
207+
189208

190209
// Private methods for use from State's...
191210

@@ -205,6 +224,7 @@ private void changeStateToReceived(State oldState) {
205224

206225
private void changeStateToComplete(State oldState) {
207226
if (changeState(oldState, State.COMPLETED)) {
227+
discardCurrentData();
208228
writingComplete();
209229
this.resultPublisher.publishComplete();
210230
}
@@ -223,6 +243,14 @@ private void writeIfPossible() {
223243
}
224244
}
225245

246+
private void discardCurrentData() {
247+
T data = this.currentData;
248+
this.currentData = null;
249+
if (data != null) {
250+
discardData(data);
251+
}
252+
}
253+
226254

227255
/**
228256
* Represents a state for the {@link Processor} to be in.
@@ -338,11 +366,14 @@ public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscri
338366
}
339367

340368
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
341-
throw new IllegalStateException(toString());
369+
processor.discardData(data);
370+
processor.cancel();
371+
processor.onError(new IllegalStateException("Illegal onNext without demand"));
342372
}
343373

344374
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
345375
if (processor.changeState(this, COMPLETED)) {
376+
processor.discardCurrentData();
346377
processor.writingComplete();
347378
processor.resultPublisher.publishError(ex);
348379
}

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ protected void readingPaused() {
290290
// no-op
291291
}
292292

293+
@Override
294+
protected void discardData() {
295+
// Nothing to discard since we pass data buffers on immediately..
296+
}
297+
293298

294299
private class RequestBodyPublisherReadListener implements ReadListener {
295300

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
318318
}
319319
int remaining = dataBuffer.readableByteCount();
320320
if (ready && remaining > 0) {
321+
// In case of IOException, onError handling should call discardData(DataBuffer)..
321322
int written = writeToOutputStream(dataBuffer);
322323
if (this.logger.isTraceEnabled()) {
323324
this.logger.trace("written: " + written + " total: " + remaining);
@@ -337,6 +338,11 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
337338
protected void writingComplete() {
338339
bodyProcessor = null;
339340
}
341+
342+
@Override
343+
protected void discardData(DataBuffer dataBuffer) {
344+
DataBufferUtils.release(dataBuffer);
345+
}
340346
}
341347

342348
}

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ else if (read == -1) {
194194
}
195195
}
196196

197+
@Override
198+
protected void discardData() {
199+
// Nothing to discard since we pass data buffers on immediately..
200+
}
197201
}
198202

199203
private static class UndertowDataBuffer implements PooledDataBuffer {

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
174174
// Track write listener calls from here on..
175175
this.writePossible = false;
176176

177+
// In case of IOException, onError handling should call discardData(DataBuffer)..
177178
int total = buffer.remaining();
178179
int written = writeByteBuffer(buffer);
179180

@@ -228,6 +229,11 @@ protected void writingFailed(Throwable ex) {
228229
cancel();
229230
onError(ex);
230231
}
232+
233+
@Override
234+
protected void discardData(DataBuffer dataBuffer) {
235+
DataBufferUtils.release(dataBuffer);
236+
}
231237
}
232238

233239

spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

Lines changed: 93 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,90 @@
1616

1717
package org.springframework.http.server.reactive;
1818

19-
import java.io.IOException;
20-
19+
import org.junit.Before;
2120
import org.junit.Test;
22-
import org.mockito.invocation.InvocationOnMock;
23-
import org.mockito.stubbing.Answer;
2421
import org.reactivestreams.Subscriber;
2522
import org.reactivestreams.Subscription;
2623

2724
import org.springframework.core.io.buffer.DataBuffer;
2825

29-
import static org.mockito.Mockito.doAnswer;
30-
import static org.mockito.Mockito.isA;
31-
import static org.mockito.Mockito.mock;
32-
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.*;
27+
import static org.mockito.Mockito.*;
3328

3429
/**
35-
* Unit tests for {@link AbstractListenerReadPublisher}
36-
*
30+
* Unit tests for {@link AbstractListenerReadPublisher}.
31+
*
3732
* @author Violeta Georgieva
38-
* @since 5.0
33+
* @author Rossen Stoyanchev
3934
*/
4035
public class ListenerReadPublisherTests {
4136

37+
private final TestListenerReadPublisher publisher = new TestListenerReadPublisher();
38+
39+
private final TestSubscriber subscriber = new TestSubscriber();
40+
41+
42+
@Before
43+
public void setup() {
44+
this.publisher.subscribe(this.subscriber);
45+
}
46+
47+
4248
@Test
43-
@SuppressWarnings("unchecked")
44-
public void testReceiveTwoRequestCallsWhenOnSubscribe() {
45-
Subscriber<DataBuffer> subscriber = mock(Subscriber.class);
46-
doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class));
49+
public void twoReads() {
50+
51+
this.subscriber.getSubscription().request(2);
52+
this.publisher.onDataAvailable();
53+
54+
assertEquals(2, this.publisher.getReadCalls());
55+
}
56+
57+
@Test // SPR-17410
58+
public void discardDataOnError() {
4759

48-
TestListenerReadPublisher publisher = new TestListenerReadPublisher();
49-
publisher.subscribe(subscriber);
50-
publisher.onDataAvailable();
60+
this.subscriber.getSubscription().request(2);
61+
this.publisher.onDataAvailable();
62+
this.publisher.onError(new IllegalStateException());
5163

52-
assertTrue(publisher.getReadCalls() == 2);
64+
assertEquals(2, this.publisher.getReadCalls());
65+
assertEquals(1, this.publisher.getDiscardCalls());
5366
}
5467

55-
private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher {
68+
@Test // SPR-17410
69+
public void discardDataOnCancel() {
70+
71+
this.subscriber.getSubscription().request(2);
72+
this.subscriber.setCancelOnNext(true);
73+
this.publisher.onDataAvailable();
74+
75+
assertEquals(1, this.publisher.getReadCalls());
76+
assertEquals(1, this.publisher.getDiscardCalls());
77+
}
78+
79+
80+
private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher<DataBuffer> {
5681

5782
private int readCalls = 0;
5883

84+
private int discardCalls = 0;
85+
86+
87+
public int getReadCalls() {
88+
return this.readCalls;
89+
}
90+
91+
public int getDiscardCalls() {
92+
return this.discardCalls;
93+
}
94+
5995
@Override
6096
protected void checkOnDataAvailable() {
6197
// no-op
6298
}
6399

64100
@Override
65-
protected DataBuffer read() throws IOException {
66-
readCalls++;
101+
protected DataBuffer read() {
102+
this.readCalls++;
67103
return mock(DataBuffer.class);
68104
}
69105

@@ -72,22 +108,48 @@ protected void readingPaused() {
72108
// No-op
73109
}
74110

75-
public int getReadCalls() {
76-
return this.readCalls;
111+
@Override
112+
protected void discardData() {
113+
this.discardCalls++;
77114
}
78-
79115
}
80116

81-
private static final class SubscriptionAnswer implements Answer<Subscription> {
117+
118+
private static final class TestSubscriber implements Subscriber<DataBuffer> {
119+
120+
private Subscription subscription;
121+
122+
private boolean cancelOnNext;
123+
124+
125+
public Subscription getSubscription() {
126+
return this.subscription;
127+
}
128+
129+
public void setCancelOnNext(boolean cancelOnNext) {
130+
this.cancelOnNext = cancelOnNext;
131+
}
132+
133+
134+
@Override
135+
public void onSubscribe(Subscription subscription) {
136+
this.subscription = subscription;
137+
}
82138

83139
@Override
84-
public Subscription answer(InvocationOnMock invocation) throws Throwable {
85-
Subscription arg = (Subscription) invocation.getArguments()[0];
86-
arg.request(1);
87-
arg.request(1);
88-
return arg;
140+
public void onNext(DataBuffer dataBuffer) {
141+
if (this.cancelOnNext) {
142+
this.subscription.cancel();
143+
}
89144
}
90145

146+
@Override
147+
public void onError(Throwable t) {
148+
}
149+
150+
@Override
151+
public void onComplete() {
152+
}
91153
}
92154

93155
}

0 commit comments

Comments
 (0)