File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change @@ -240,6 +240,8 @@ boolean checkTerminated(Object term, boolean empty) {
240240 if (term != null ) {
241241 if (nl .isCompleted (term )) {
242242 if (empty ) {
243+ // this will prevent OnSubscribe spinning on a terminated but not yet unsubscribed PublishSubscriber
244+ current .compareAndSet (this , null );
243245 try {
244246 for (InnerProducer <?> ip : producers .getAndSet (TERMINATED )) {
245247 ip .child .onCompleted ();
@@ -251,6 +253,8 @@ boolean checkTerminated(Object term, boolean empty) {
251253 }
252254 } else {
253255 Throwable t = nl .getError (term );
256+ // this will prevent OnSubscribe spinning on a terminated but not yet unsubscribed PublishSubscriber
257+ current .compareAndSet (this , null );
254258 try {
255259 for (InnerProducer <?> ip : producers .getAndSet (TERMINATED )) {
256260 ip .child .onError (t );
@@ -265,7 +269,6 @@ boolean checkTerminated(Object term, boolean empty) {
265269 }
266270
267271 void dispatch () {
268- // TODO
269272 synchronized (this ) {
270273 if (emitting ) {
271274 missed = true ;
You can’t perform that action at this time.
0 commit comments