From 8e8ce8eb3d27f9849828163af7b91d616c2b0177 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 9 Nov 2018 10:01:00 +0100 Subject: [PATCH 1/2] 2.x: Fix refCount eager disconnect not resetting the connection --- .../operators/flowable/FlowableRefCount.java | 13 ++++++++++++- .../operators/observable/ObservableRefCount.java | 14 +++++++++++++- .../operators/flowable/FlowableRefCountTest.java | 16 ++++++++++++++++ .../observable/ObservableRefCountTest.java | 15 +++++++++++++++ 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index f966f01365..bc11aa5425 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -141,7 +141,11 @@ void timeout(RefConnection rc) { if (source instanceof Disposable) { ((Disposable)source).dispose(); } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(connectionObject); + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + ((ResettableConnectable)source).resetIf(connectionObject); + } } } } @@ -160,6 +164,8 @@ static final class RefConnection extends AtomicReference boolean connected; + boolean disconnectedEarly; + RefConnection(FlowableRefCount parent) { this.parent = parent; } @@ -172,6 +178,11 @@ public void run() { @Override public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); + synchronized (parent) { + if (disconnectedEarly) { + ((ResettableConnectable)parent.source).resetIf(t); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 3dced24de6..5abc174350 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -135,10 +135,15 @@ void timeout(RefConnection rc) { connection = null; Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); + if (source instanceof Disposable) { ((Disposable)source).dispose(); } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(connectionObject); + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + ((ResettableConnectable)source).resetIf(connectionObject); + } } } } @@ -157,6 +162,8 @@ static final class RefConnection extends AtomicReference boolean connected; + boolean disconnectedEarly; + RefConnection(ObservableRefCount parent) { this.parent = parent; } @@ -169,6 +176,11 @@ public void run() { @Override public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); + synchronized (parent) { + if (disconnectedEarly) { + ((ResettableConnectable)parent.source).resetIf(t); + } + } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 289170b254..a91b1179cc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -1394,4 +1394,20 @@ public void timeoutDisposesSource() { assertTrue(((Disposable)o.source).isDisposed()); } + + @Test + public void disconnectBeforeConnect() { + BehaviorProcessor processor = BehaviorProcessor.create(); + + Flowable flowable = processor + .replay(1) + .refCount(); + + // This line causes the test to fail. + flowable.takeUntil(Flowable.just(1)).test(); + + processor.onNext(2); + + flowable.take(1).test().assertResult(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index ea69a1d500..0f0d930d8d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -1345,4 +1345,19 @@ public void timeoutDisposesSource() { assertTrue(((Disposable)o.source).isDisposed()); } + + @Test + public void disconnectBeforeConnect() { + BehaviorSubject subject = BehaviorSubject.create(); + + Observable observable = subject + .replay(1) + .refCount(); + + observable.takeUntil(Observable.just(1)).test(); + + subject.onNext(2); + + observable.take(1).test().assertResult(2); + } } From f60c09fb0a186e2b64294eac0f3d456df5e3fe3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sat, 10 Nov 2018 10:07:02 +0100 Subject: [PATCH 2/2] Remove unnecessary comment. --- .../internal/operators/flowable/FlowableRefCountTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index a91b1179cc..673a0f4add 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -1403,7 +1403,6 @@ public void disconnectBeforeConnect() { .replay(1) .refCount(); - // This line causes the test to fail. flowable.takeUntil(Flowable.just(1)).test(); processor.onNext(2);