Skip to content

Commit f051755

Browse files
committed
Refine behavior on error after response committed
If the response is set and we can't change the status through ServerHttpResponse any more, allow the error signal to propagate and let the individual server adapters handle it. Ultimately that should result in closing the connection. On Servlet containers, we check one last time if the response is committed (we may not have filled the buffer). If not then save the exception as a request attribute, dispatch, and re-throw it on the container thread. On Undertow access the connection and close it. On Netty just let the error through to Reactor Netty. Issue: SPR-16051
1 parent 9d42184 commit f051755

File tree

6 files changed

+127
-54
lines changed

6 files changed

+127
-54
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,8 @@ public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response)
7272
}
7373

7474
return this.httpHandler.handle(adaptedRequest, adaptedResponse)
75-
.onErrorResume(ex -> {
76-
logger.error("Could not complete request", ex);
77-
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
78-
return Mono.empty();
79-
})
80-
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));
75+
.doOnError(ex -> logger.error("Handling completed with error", ex))
76+
.doOnSuccess(aVoid -> logger.debug("Handling completed with success"));
8177
}
8278

8379
}

spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.io.IOException;
2020
import java.util.Collection;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import javax.servlet.AsyncContext;
2223
import javax.servlet.AsyncEvent;
2324
import javax.servlet.AsyncListener;
25+
import javax.servlet.DispatcherType;
2426
import javax.servlet.Servlet;
2527
import javax.servlet.ServletConfig;
28+
import javax.servlet.ServletException;
2629
import javax.servlet.ServletRegistration;
2730
import javax.servlet.ServletRequest;
2831
import javax.servlet.ServletResponse;
@@ -41,6 +44,7 @@
4144
import org.springframework.http.HttpMethod;
4245
import org.springframework.lang.Nullable;
4346
import org.springframework.util.Assert;
47+
import org.springframework.web.util.NestedServletException;
4448

4549
/**
4650
* Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async
@@ -56,9 +60,10 @@ public class ServletHttpHandlerAdapter implements Servlet {
5660

5761
private static final Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class);
5862

59-
6063
private static final int DEFAULT_BUFFER_SIZE = 8192;
6164

65+
private static final String WRITE_ERROR_ATTRIBUTE_NAME = ServletHttpHandlerAdapter.class.getName() + ".ERROR";
66+
6267

6368
private final HttpHandler httpHandler;
6469

@@ -151,7 +156,14 @@ private String getServletPath(ServletConfig config) {
151156

152157

153158
@Override
154-
public void service(ServletRequest request, ServletResponse response) throws IOException {
159+
public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException {
160+
161+
if (DispatcherType.ASYNC.equals(request.getDispatcherType())) {
162+
Throwable ex = (Throwable) request.getAttribute(WRITE_ERROR_ATTRIBUTE_NAME);
163+
Assert.notNull(ex, "Unexpected async dispatch");
164+
throw new NestedServletException("Write publisher error", ex);
165+
}
166+
155167
// Start async before Read/WriteListener registration
156168
AsyncContext asyncContext = request.startAsync();
157169
asyncContext.setTimeout(-1);
@@ -163,9 +175,11 @@ public void service(ServletRequest request, ServletResponse response) throws IOE
163175
httpResponse = new HttpHeadResponseDecorator(httpResponse);
164176
}
165177

166-
asyncContext.addListener(ERROR_LISTENER);
178+
AtomicBoolean isCompleted = new AtomicBoolean();
179+
HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted);
180+
asyncContext.addListener(listener);
167181

168-
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext);
182+
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted);
169183
this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
170184
}
171185

@@ -199,9 +213,9 @@ public void destroy() {
199213
* We cannot combine ERROR_LISTENER and HandlerResultSubscriber due to:
200214
* https://issues.jboss.org/browse/WFLY-8515
201215
*/
202-
private static void runIfAsyncNotComplete(AsyncContext asyncContext, Runnable task) {
216+
private static void runIfAsyncNotComplete(AsyncContext asyncContext, AtomicBoolean isCompleted, Runnable task) {
203217
try {
204-
if (asyncContext.getRequest().isAsyncStarted()) {
218+
if (asyncContext.getRequest().isAsyncStarted() && isCompleted.compareAndSet(false, true)) {
205219
task.run();
206220
}
207221
}
@@ -212,18 +226,27 @@ private static void runIfAsyncNotComplete(AsyncContext asyncContext, Runnable ta
212226
}
213227

214228

215-
private final static AsyncListener ERROR_LISTENER = new AsyncListener() {
229+
private static class HandlerResultAsyncListener implements AsyncListener {
230+
231+
private final AtomicBoolean isCompleted;
232+
233+
234+
public HandlerResultAsyncListener(AtomicBoolean isCompleted) {
235+
this.isCompleted = isCompleted;
236+
}
216237

217238
@Override
218239
public void onTimeout(AsyncEvent event) {
240+
logger.debug("Timeout notification from Servlet container");
219241
AsyncContext context = event.getAsyncContext();
220-
runIfAsyncNotComplete(context, context::complete);
242+
runIfAsyncNotComplete(context, this.isCompleted, context::complete);
221243
}
222244

223245
@Override
224246
public void onError(AsyncEvent event) {
247+
logger.debug("Error notification from Servlet container");
225248
AsyncContext context = event.getAsyncContext();
226-
runIfAsyncNotComplete(context, context::complete);
249+
runIfAsyncNotComplete(context, this.isCompleted, context::complete);
227250
}
228251

229252
@Override
@@ -242,8 +265,12 @@ private class HandlerResultSubscriber implements Subscriber<Void> {
242265

243266
private final AsyncContext asyncContext;
244267

245-
public HandlerResultSubscriber(AsyncContext asyncContext) {
268+
private final AtomicBoolean isCompleted;
269+
270+
271+
public HandlerResultSubscriber(AsyncContext asyncContext, AtomicBoolean isCompleted) {
246272
this.asyncContext = asyncContext;
273+
this.isCompleted = isCompleted;
247274
}
248275

249276
@Override
@@ -258,20 +285,30 @@ public void onNext(Void aVoid) {
258285

259286
@Override
260287
public void onError(Throwable ex) {
261-
runIfAsyncNotComplete(this.asyncContext, () -> {
262-
logger.error("Could not complete request", ex);
263-
HttpServletResponse response = (HttpServletResponse) this.asyncContext.getResponse();
264-
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
265-
this.asyncContext.complete();
288+
logger.error("Handling completed with error", ex);
289+
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, () -> {
290+
if (this.asyncContext.getResponse().isCommitted()) {
291+
logger.debug("Dispatching into container to raise error");
292+
this.asyncContext.getRequest().setAttribute(WRITE_ERROR_ATTRIBUTE_NAME, ex);
293+
this.asyncContext.dispatch();
294+
}
295+
else {
296+
try {
297+
logger.debug("Setting response status code to 500");
298+
this.asyncContext.getResponse().resetBuffer();
299+
((HttpServletResponse) this.asyncContext.getResponse()).setStatus(500);
300+
}
301+
finally {
302+
this.asyncContext.complete();
303+
}
304+
}
266305
});
267306
}
268307

269308
@Override
270309
public void onComplete() {
271-
runIfAsyncNotComplete(this.asyncContext, () -> {
272-
logger.debug("Successfully completed request");
273-
this.asyncContext.complete();
274-
});
310+
logger.debug("Handling completed with success");
311+
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, this.asyncContext::complete);
275312
}
276313
}
277314

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.http.server.reactive;
1818

19+
import java.io.IOException;
20+
1921
import io.undertow.server.HttpServerExchange;
2022

2123
import org.apache.commons.logging.Log;
@@ -98,16 +100,26 @@ public void onNext(Void aVoid) {
98100

99101
@Override
100102
public void onError(Throwable ex) {
101-
logger.error("Could not complete request", ex);
102-
if (!this.exchange.isResponseStarted() && this.exchange.getStatusCode() < 500) {
103+
logger.error("Handling completed with error", ex);
104+
if (this.exchange.isResponseStarted()) {
105+
try {
106+
logger.debug("Closing connection");
107+
this.exchange.getConnection().close();
108+
}
109+
catch (IOException e) {
110+
// Ignore
111+
}
112+
}
113+
else {
114+
logger.debug("Setting response status code to 500");
103115
this.exchange.setStatusCode(500);
116+
this.exchange.endExchange();
104117
}
105-
this.exchange.endExchange();
106118
}
107119

108120
@Override
109121
public void onComplete() {
110-
logger.debug("Successfully completed request");
122+
logger.debug("Handling completed with success");
111123
this.exchange.endExchange();
112124
}
113125
}

spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,7 @@ public LocaleContextResolver getLocaleContextResolver() {
157157
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
158158
ServerWebExchange exchange = createExchange(request, response);
159159
return getDelegate().handle(exchange)
160-
.onErrorResume(ex -> {
161-
handleFailure(response, ex);
162-
return Mono.empty();
163-
})
160+
.onErrorResume(ex -> handleFailure(response, ex))
164161
.then(Mono.defer(response::setComplete));
165162
}
166163

@@ -169,8 +166,7 @@ protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttp
169166
getCodecConfigurer(), getLocaleContextResolver());
170167
}
171168

172-
private void handleFailure(ServerHttpResponse response, Throwable ex) {
173-
boolean statusCodeChanged = response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
169+
private Mono<Void> handleFailure(ServerHttpResponse response, Throwable ex) {
174170
if (isDisconnectedClientError(ex)) {
175171
if (disconnectedClientLogger.isTraceEnabled()) {
176172
disconnectedClientLogger.trace("Looks like the client has gone away", ex);
@@ -180,14 +176,16 @@ else if (disconnectedClientLogger.isDebugEnabled()) {
180176
" (For a full stack trace, set the log category '" + DISCONNECTED_CLIENT_LOG_CATEGORY +
181177
"' to TRACE level.)");
182178
}
179+
return Mono.empty();
183180
}
184-
else if (!statusCodeChanged) {
185-
logger.error("Unhandled failure: " + ex.getMessage() + ", " +
186-
"response already committed with status=" + response.getStatusCode());
187-
}
188-
else {
181+
if (response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR)) {
189182
logger.error("Failed to handle request", ex);
183+
return Mono.empty();
190184
}
185+
// After the response is committed, propagate errors to the server..
186+
HttpStatus status = response.getStatusCode();
187+
logger.error("Unhandled failure: " + ex.getMessage() + ", response already set (status=" + status + ")");
188+
return Mono.error(ex);
191189
}
192190

193191
private boolean isDisconnectedClientError(Throwable ex) {

spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingExceptionHandlingIntegrationTests.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.io.IOException;
2020

21+
import org.junit.Assume;
2122
import org.junit.Test;
2223
import org.reactivestreams.Publisher;
24+
import reactor.core.publisher.Flux;
2325
import reactor.core.publisher.Mono;
2426

2527
import org.springframework.context.ApplicationContext;
@@ -28,6 +30,7 @@
2830
import org.springframework.context.annotation.Configuration;
2931
import org.springframework.http.HttpHeaders;
3032
import org.springframework.http.ResponseEntity;
33+
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
3134
import org.springframework.web.bind.annotation.ExceptionHandler;
3235
import org.springframework.web.bind.annotation.GetMapping;
3336
import org.springframework.web.bind.annotation.RestController;
@@ -53,28 +56,44 @@ protected ApplicationContext initApplicationContext() {
5356

5457

5558
@Test
56-
public void controllerThrowingException() throws Exception {
57-
String expected = "Recovered from error: State";
58-
assertEquals(expected, performGet("/thrown-exception", new HttpHeaders(), String.class).getBody());
59+
public void thrownException() throws Exception {
60+
doTest("/thrown-exception", "Recovered from error: State");
5961
}
6062

6163
@Test
62-
public void controllerThrowingExceptionWithCause() throws Exception {
63-
String expected = "Recovered from error: State";
64-
assertEquals(expected, performGet("/thrown-exception-with-cause", new HttpHeaders(), String.class).getBody());
64+
public void thrownExceptionWithCause() throws Exception {
65+
doTest("/thrown-exception-with-cause", "Recovered from error: State");
6566
}
6667

6768
@Test
68-
public void controllerThrowingExceptionWithCauseToHandle() throws Exception {
69-
String expected = "Recovered from error: IO";
70-
String url = "/thrown-exception-with-cause-to-handle";
71-
assertEquals(expected, performGet(url, new HttpHeaders(), String.class).getBody());
69+
public void thrownExceptionWithCauseToHandle() throws Exception {
70+
doTest("/thrown-exception-with-cause-to-handle", "Recovered from error: IO");
7271
}
7372

7473
@Test
75-
public void controllerReturnsMonoError() throws Exception {
76-
String expected = "Recovered from error: Argument";
77-
assertEquals(expected, performGet("/mono-error", new HttpHeaders(), String.class).getBody());
74+
public void errorBeforeFirstItem() throws Exception {
75+
doTest("/mono-error", "Recovered from error: Argument");
76+
}
77+
78+
@Test // SPR-16051
79+
public void exceptionAfterSeveralItems() throws Exception {
80+
81+
// TODO: uncomment and try after https://github.com/reactor/reactor-netty/issues/231
82+
Assume.assumeFalse(server instanceof ReactorHttpServer);
83+
84+
try {
85+
performGet("/SPR-16051", new HttpHeaders(), String.class).getBody();
86+
fail();
87+
}
88+
catch (Throwable ex) {
89+
String message = ex.getMessage();
90+
assertNotNull(message);
91+
assertTrue("Actual: " + message, message.startsWith("Error while extracting response"));
92+
}
93+
}
94+
95+
private void doTest(String url, String expected) throws Exception {
96+
assertEquals(expected, performGet(url, new HttpHeaders(), String.class).getBody());
7897
}
7998

8099

@@ -110,6 +129,18 @@ public Publisher<String> handleWithError() {
110129
return Mono.error(new IllegalArgumentException("Argument"));
111130
}
112131

132+
@GetMapping("/SPR-16051")
133+
public Flux<String> errors() {
134+
return Flux.range(1, 10000)
135+
.map(i -> {
136+
if (i == 1000) {
137+
throw new RuntimeException("Random error");
138+
}
139+
return i + ". foo bar";
140+
});
141+
}
142+
143+
113144
@ExceptionHandler
114145
public Publisher<String> handleArgumentException(IOException ex) {
115146
return Mono.just("Recovered from error: " + ex.getMessage());

spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.LinkedHashMap;
2424
import java.util.Map;
2525

26-
import org.apache.tomcat.websocket.WsWebSocketContainer;
2726
import org.apache.tomcat.websocket.server.WsContextListener;
2827
import org.junit.After;
2928
import org.junit.Before;

0 commit comments

Comments
 (0)