Skip to content

Commit dd22b8f

Browse files
committed
Fix race condition with onCompletion/onError
Closes gh-23096
1 parent 8189c90 commit dd22b8f

File tree

3 files changed

+50
-27
lines changed

3 files changed

+50
-27
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -283,19 +283,7 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
283283
publisher.subscriber = subscriber;
284284
subscriber.onSubscribe(subscription);
285285
publisher.changeState(SUBSCRIBING, NO_DEMAND);
286-
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
287-
String logPrefix = publisher.getLogPrefix();
288-
if (publisher.completionBeforeDemand) {
289-
rsReadLogger.trace(logPrefix + "Completed before demand");
290-
publisher.state.get().onAllDataRead(publisher);
291-
}
292-
Throwable ex = publisher.errorBeforeDemand;
293-
if (ex != null) {
294-
if (rsReadLogger.isTraceEnabled()) {
295-
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
296-
}
297-
publisher.state.get().onError(publisher, ex);
298-
}
286+
handleCompletionOrErrorBeforeDemand(publisher);
299287
}
300288
else {
301289
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
@@ -306,11 +294,30 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
306294
@Override
307295
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
308296
publisher.completionBeforeDemand = true;
297+
handleCompletionOrErrorBeforeDemand(publisher);
309298
}
310299

311300
@Override
312301
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
313302
publisher.errorBeforeDemand = ex;
303+
handleCompletionOrErrorBeforeDemand(publisher);
304+
}
305+
306+
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
307+
if (publisher.state.get().equals(NO_DEMAND)) {
308+
if (publisher.completionBeforeDemand) {
309+
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
310+
publisher.state.get().onAllDataRead(publisher);
311+
}
312+
Throwable ex = publisher.errorBeforeDemand;
313+
if (ex != null) {
314+
if (rsReadLogger.isTraceEnabled()) {
315+
String prefix = publisher.getLogPrefix();
316+
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
317+
}
318+
publisher.state.get().onError(publisher, ex);
319+
}
320+
}
314321
}
315322
},
316323

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -282,17 +282,7 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
282282
}
283283
if (processor.changeState(this, REQUESTED)) {
284284
if (processor.subscriberCompleted) {
285-
if (processor.isFlushPending()) {
286-
// Ensure the final flush
287-
processor.changeState(REQUESTED, FLUSHING);
288-
processor.flushIfPossible();
289-
}
290-
else if (processor.changeState(REQUESTED, COMPLETED)) {
291-
processor.resultPublisher.publishComplete();
292-
}
293-
else {
294-
processor.state.get().onComplete(processor);
295-
}
285+
handleSubscriberCompleted(processor);
296286
}
297287
else {
298288
Assert.state(processor.subscription != null, "No subscription");
@@ -303,6 +293,24 @@ else if (processor.changeState(REQUESTED, COMPLETED)) {
303293
@Override
304294
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
305295
processor.subscriberCompleted = true;
296+
// A competing write might have completed very quickly
297+
if (processor.state.get().equals(State.REQUESTED)) {
298+
handleSubscriberCompleted(processor);
299+
}
300+
}
301+
302+
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
303+
if (processor.isFlushPending()) {
304+
// Ensure the final flush
305+
processor.changeState(State.REQUESTED, State.FLUSHING);
306+
processor.flushIfPossible();
307+
}
308+
else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
309+
processor.resultPublisher.publishComplete();
310+
}
311+
else {
312+
processor.state.get().onComplete(processor);
313+
}
306314
}
307315
},
308316

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -376,13 +376,21 @@ public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
376376
@Override
377377
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
378378
processor.subscriberCompleted = true;
379+
// A competing write might have completed very quickly
380+
if (processor.state.get().equals(State.REQUESTED)) {
381+
processor.changeStateToComplete(State.REQUESTED);
382+
}
379383
}
380384
},
381385

382386
WRITING {
383387
@Override
384388
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
385389
processor.subscriberCompleted = true;
390+
// A competing write might have completed very quickly
391+
if (processor.state.get().equals(State.REQUESTED)) {
392+
processor.changeStateToComplete(State.REQUESTED);
393+
}
386394
}
387395
},
388396

0 commit comments

Comments
 (0)