11/*
2- * Copyright 2002-2025 the original author or authors.
2+ * Copyright 2002-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2121import java .util .LinkedHashSet ;
2222import java .util .List ;
2323import java .util .Set ;
24- import java .util .concurrent .atomic .AtomicReference ;
24+ import java .util .concurrent .atomic .AtomicBoolean ;
2525import java .util .function .Consumer ;
2626
2727import org .springframework .http .MediaType ;
@@ -73,20 +73,21 @@ public class ResponseBodyEmitter {
7373 @ Nullable
7474 private Handler handler ;
7575
76- private final AtomicReference <State > state = new AtomicReference <>(State .START );
77-
7876 /** Store send data before handler is initialized. */
7977 private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
8078
79+ /** Store successful completion before the handler is initialized. */
80+ private final AtomicBoolean complete = new AtomicBoolean ();
81+
8182 /** Store an error before the handler is initialized. */
8283 @ Nullable
8384 private Throwable failure ;
8485
85- private final TimeoutCallback timeoutCallback = new TimeoutCallback ();
86+ private final DefaultCallback timeoutCallback = new DefaultCallback ();
8687
8788 private final ErrorCallback errorCallback = new ErrorCallback ();
8889
89- private final CompletionCallback completionCallback = new CompletionCallback ();
90+ private final DefaultCallback completionCallback = new DefaultCallback ();
9091
9192
9293 /**
@@ -127,7 +128,7 @@ synchronized void initialize(Handler handler) throws IOException {
127128 this .earlySendAttempts .clear ();
128129 }
129130
130- if (this .state .get () == State . COMPLETE ) {
131+ if (this .complete .get ()) {
131132 if (this .failure != null ) {
132133 this .handler .completeWithError (this .failure );
133134 }
@@ -143,7 +144,7 @@ synchronized void initialize(Handler handler) throws IOException {
143144 }
144145
145146 void initializeWithError (Throwable ex ) {
146- if (this .state .compareAndSet (State . START , State . COMPLETE )) {
147+ if (this .complete .compareAndSet (false , true )) {
147148 this .failure = ex ;
148149 this .earlySendAttempts .clear ();
149150 this .errorCallback .accept (ex );
@@ -185,7 +186,8 @@ public void send(Object object) throws IOException {
185186 * @throws java.lang.IllegalStateException wraps any other errors
186187 */
187188 public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
188- assertNotComplete ();
189+ Assert .state (!this .complete .get (), () -> "ResponseBodyEmitter has already completed" +
190+ (this .failure != null ? " with error: " + this .failure : "" ));
189191 if (this .handler != null ) {
190192 try {
191193 this .handler .send (object , mediaType );
@@ -212,13 +214,9 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
212214 * @since 6.0.12
213215 */
214216 public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
215- assertNotComplete ();
216- sendInternal (items );
217- }
218-
219- private void assertNotComplete () {
220- Assert .state (this .state .get () == State .START , () -> "ResponseBodyEmitter has already completed" +
217+ Assert .state (!this .complete .get (), () -> "ResponseBodyEmitter has already completed" +
221218 (this .failure != null ? " with error: " + this .failure : "" ));
219+ sendInternal (items );
222220 }
223221
224222 private void sendInternal (Set <DataWithMediaType > items ) throws IOException {
@@ -250,7 +248,7 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
250248 * related events such as an error while {@link #send(Object) sending}.
251249 */
252250 public void complete () {
253- if (trySetComplete ( ) && this .handler != null ) {
251+ if (this . complete . compareAndSet ( false , true ) && this .handler != null ) {
254252 this .handler .complete ();
255253 }
256254 }
@@ -267,19 +265,14 @@ public void complete() {
267265 * {@link #send(Object) sending}.
268266 */
269267 public void completeWithError (Throwable ex ) {
270- if (trySetComplete ( )) {
268+ if (this . complete . compareAndSet ( false , true )) {
271269 this .failure = ex ;
272270 if (this .handler != null ) {
273271 this .handler .completeWithError (ex );
274272 }
275273 }
276274 }
277275
278- private boolean trySetComplete () {
279- return (this .state .compareAndSet (State .START , State .COMPLETE ) ||
280- (this .state .compareAndSet (State .TIMEOUT , State .COMPLETE )));
281- }
282-
283276 /**
284277 * Register code to invoke when the async request times out. This method is
285278 * called from a container thread when an async request times out.
@@ -376,7 +369,7 @@ public MediaType getMediaType() {
376369 }
377370
378371
379- private class TimeoutCallback implements Runnable {
372+ private class DefaultCallback implements Runnable {
380373
381374 private final List <Runnable > delegates = new ArrayList <>(1 );
382375
@@ -386,10 +379,9 @@ public synchronized void addDelegate(Runnable delegate) {
386379
387380 @ Override
388381 public void run () {
389- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .TIMEOUT )) {
390- for (Runnable delegate : this .delegates ) {
391- delegate .run ();
392- }
382+ ResponseBodyEmitter .this .complete .compareAndSet (false , true );
383+ for (Runnable delegate : this .delegates ) {
384+ delegate .run ();
393385 }
394386 }
395387 }
@@ -405,51 +397,11 @@ public synchronized void addDelegate(Consumer<Throwable> callback) {
405397
406398 @ Override
407399 public void accept (Throwable t ) {
408- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
409- for (Consumer <Throwable > delegate : this .delegates ) {
410- delegate .accept (t );
411- }
412- }
413- }
414- }
415-
416-
417- private class CompletionCallback implements Runnable {
418-
419- private final List <Runnable > delegates = new ArrayList <>(1 );
420-
421- public synchronized void addDelegate (Runnable delegate ) {
422- this .delegates .add (delegate );
423- }
424-
425- @ Override
426- public void run () {
427- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
428- for (Runnable delegate : this .delegates ) {
429- delegate .run ();
430- }
400+ ResponseBodyEmitter .this .complete .compareAndSet (false , true );
401+ for (Consumer <Throwable > delegate : this .delegates ) {
402+ delegate .accept (t );
431403 }
432404 }
433405 }
434406
435-
436- /**
437- * Represents a state for {@link ResponseBodyEmitter}.
438- * <p><pre>
439- * START ----+
440- * | |
441- * v |
442- * TIMEOUT |
443- * | |
444- * v |
445- * COMPLETE <--+
446- * </pre>
447- * @since 6.2.4
448- */
449- private enum State {
450- START ,
451- TIMEOUT , // handling a timeout
452- COMPLETE
453- }
454-
455407}
0 commit comments