Skip to content

Commit 3bcf4e0

Browse files
Merge pull request #600 from benjchristensen/replay-subject
BugFix: Replay Subject
2 parents 61f0580 + fcb7657 commit 3bcf4e0

File tree

2 files changed

+94
-4
lines changed

2 files changed

+94
-4
lines changed

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ public void onError(Throwable e)
164164
public void onNext(T args)
165165
{
166166
synchronized (subscriptions) {
167+
if (isDone) {
168+
return;
169+
}
167170
history.add(args);
168171
for (Observer<? super T> observer : new ArrayList<Observer<? super T>>(subscriptions.values())) {
169172
observer.onNext(args);

rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,6 +56,93 @@ public void testCompleted() {
5656
assertCompletedObserver(o2);
5757
}
5858

59+
@Test
60+
public void testCompletedStopsEmittingData() {
61+
ReplaySubject<Integer> channel = ReplaySubject.create();
62+
@SuppressWarnings("unchecked")
63+
Observer<Object> observerA = mock(Observer.class);
64+
@SuppressWarnings("unchecked")
65+
Observer<Object> observerB = mock(Observer.class);
66+
@SuppressWarnings("unchecked")
67+
Observer<Object> observerC = mock(Observer.class);
68+
@SuppressWarnings("unchecked")
69+
Observer<Object> observerD = mock(Observer.class);
70+
71+
Subscription a = channel.subscribe(observerA);
72+
Subscription b = channel.subscribe(observerB);
73+
74+
InOrder inOrderA = inOrder(observerA);
75+
InOrder inOrderB = inOrder(observerB);
76+
InOrder inOrderC = inOrder(observerC);
77+
InOrder inOrderD = inOrder(observerD);
78+
79+
channel.onNext(42);
80+
81+
// both A and B should have received 42 from before subscription
82+
inOrderA.verify(observerA).onNext(42);
83+
inOrderB.verify(observerB).onNext(42);
84+
85+
a.unsubscribe();
86+
87+
// a should receive no more
88+
inOrderA.verifyNoMoreInteractions();
89+
90+
channel.onNext(4711);
91+
92+
// only be should receive 4711 at this point
93+
inOrderB.verify(observerB).onNext(4711);
94+
95+
channel.onCompleted();
96+
97+
// B is subscribed so should receive onCompleted
98+
inOrderB.verify(observerB).onCompleted();
99+
100+
Subscription c = channel.subscribe(observerC);
101+
102+
// when C subscribes it should receive 42, 4711, onCompleted
103+
inOrderC.verify(observerC).onNext(42);
104+
inOrderC.verify(observerC).onNext(4711);
105+
inOrderC.verify(observerC).onCompleted();
106+
107+
// if further events are propagated they should be ignored
108+
channel.onNext(13);
109+
channel.onNext(14);
110+
channel.onNext(15);
111+
channel.onError(new RuntimeException());
112+
113+
// a new subscription should only receive what was emitted prior to terminal state onCompleted
114+
Subscription d = channel.subscribe(observerD);
115+
116+
inOrderD.verify(observerD).onNext(42);
117+
inOrderD.verify(observerD).onNext(4711);
118+
inOrderD.verify(observerD).onCompleted();
119+
120+
Mockito.verifyNoMoreInteractions(observerA);
121+
Mockito.verifyNoMoreInteractions(observerB);
122+
Mockito.verifyNoMoreInteractions(observerC);
123+
Mockito.verifyNoMoreInteractions(observerD);
124+
125+
}
126+
127+
@Test
128+
public void testCompletedAfterError() {
129+
ReplaySubject<String> subject = ReplaySubject.create();
130+
131+
@SuppressWarnings("unchecked")
132+
Observer<String> aObserver = mock(Observer.class);
133+
134+
subject.onNext("one");
135+
subject.onError(testException);
136+
subject.onNext("two");
137+
subject.onCompleted();
138+
subject.onError(new RuntimeException());
139+
140+
subject.subscribe(aObserver);
141+
verify(aObserver, times(1)).onNext("one");
142+
verify(aObserver, times(1)).onError(testException);
143+
verifyNoMoreInteractions(aObserver);
144+
}
145+
59146
private void assertCompletedObserver(Observer<String> aObserver) {
60147
InOrder inOrder = inOrder(aObserver);
61148

0 commit comments

Comments
 (0)