Skip to content

Commit 78c5708

Browse files
committed
Fixes based on @zsxwing's suggestions.
1 parent 04a9e05 commit 78c5708

File tree

3 files changed

+27
-100
lines changed

3 files changed

+27
-100
lines changed

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,7 @@ public Iterable<T> next() {
274274
* If the underlying observable produces items faster than the Iterator.next() takes them
275275
* onNext events might be skipped, but onError or onCompleted events are not.
276276
* <p>
277-
* The difference between BlockingObservable.next() and BlockingObservable.latest() is that
278-
* the former does not overwrite untaken values whereas the latter does.
279-
* <p>
280-
* Note also that an onNext() directly followed by onCompleted() might hide the given onNext() event.
277+
* Note also that an onNext() directly followed by onCompleted() might hide the onNext() event.
281278
*
282279
* @return the Iterable sequence
283280
*/

rxjava-core/src/main/java/rx/operators/OperationLatest.java

Lines changed: 25 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import java.util.Iterator;
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.Semaphore;
21-
import java.util.concurrent.locks.Lock;
22-
import java.util.concurrent.locks.ReentrantLock;
21+
import java.util.concurrent.atomic.AtomicReference;
2322
import rx.Notification;
2423
import rx.Observable;
2524
import rx.Observer;
@@ -39,136 +38,67 @@ public static <T> Iterable<T> latest(final Observable<? extends T> source) {
3938
@Override
4039
public Iterator<T> iterator() {
4140
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
42-
source.subscribe(lio);
41+
source.materialize().subscribe(lio);
4342
return lio;
4443
}
4544
};
4645
}
4746

4847
/** Observer of source, iterator for output. */
49-
static final class LatestObserverIterator<T> implements Observer<T>, Iterator<T> {
50-
final Lock lock = new ReentrantLock();
48+
static final class LatestObserverIterator<T> implements Observer<Notification<? extends T>>, Iterator<T> {
5149
final Semaphore notify = new Semaphore(0);
52-
// observer's values
53-
boolean oHasValue;
54-
Notification.Kind oKind;
55-
T oValue;
56-
Throwable oError;
50+
// observer's notification
51+
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
5752
@Override
58-
public void onNext(T args) {
59-
boolean wasntAvailable;
60-
lock.lock();
61-
try {
62-
wasntAvailable = !oHasValue;
63-
oHasValue = true;
64-
oValue = args;
65-
oKind = Notification.Kind.OnNext;
66-
} finally {
67-
lock.unlock();
68-
}
53+
public void onNext(Notification<? extends T> args) {
54+
boolean wasntAvailable = reference.getAndSet(args) == null;
6955
if (wasntAvailable) {
7056
notify.release();
7157
}
7258
}
7359

7460
@Override
7561
public void onError(Throwable e) {
76-
boolean wasntAvailable;
77-
lock.lock();
78-
try {
79-
wasntAvailable = !oHasValue;
80-
oHasValue = true;
81-
oValue = null;
82-
oError = e;
83-
oKind = Notification.Kind.OnError;
84-
} finally {
85-
lock.unlock();
86-
}
87-
if (wasntAvailable) {
88-
notify.release();
89-
}
62+
// not expected
9063
}
9164

9265
@Override
9366
public void onCompleted() {
94-
boolean wasntAvailable;
95-
lock.lock();
96-
try {
97-
wasntAvailable = !oHasValue;
98-
oHasValue = true;
99-
oValue = null;
100-
oKind = Notification.Kind.OnCompleted;
101-
} finally {
102-
lock.unlock();
103-
}
104-
if (wasntAvailable) {
105-
notify.release();
106-
}
67+
// not expected
10768
}
10869

109-
// iterator's values
110-
111-
boolean iDone;
112-
boolean iHasValue;
113-
T iValue;
114-
Throwable iError;
115-
Notification.Kind iKind;
116-
70+
// iterator's notification
71+
Notification<? extends T> iNotif;
11772
@Override
11873
public boolean hasNext() {
119-
if (iError != null) {
120-
Exceptions.propagate(iError);
74+
if (iNotif != null && iNotif.isOnError()) {
75+
throw Exceptions.propagate(iNotif.getThrowable());
12176
}
122-
if (!iDone) {
123-
if (!iHasValue) {
77+
if (iNotif == null || !iNotif.isOnCompleted()) {
78+
if (iNotif == null) {
12479
try {
12580
notify.acquire();
12681
} catch (InterruptedException ex) {
127-
iError = ex;
128-
iHasValue = true;
129-
iKind = Notification.Kind.OnError;
130-
return true;
82+
Thread.currentThread().interrupt();
83+
iNotif = new Notification<T>(ex);
84+
throw Exceptions.propagate(ex);
13185
}
13286

133-
lock.lock();
134-
try {
135-
iKind = oKind;
136-
switch (oKind) {
137-
case OnNext:
138-
iValue = oValue;
139-
oValue = null; // handover
140-
break;
141-
case OnError:
142-
iError = oError;
143-
oError = null; // handover
144-
if (iError != null) {
145-
Exceptions.propagate(iError);
146-
}
147-
break;
148-
case OnCompleted:
149-
iDone = true;
150-
break;
151-
}
152-
oHasValue = false;
153-
} finally {
154-
lock.unlock();
87+
iNotif = reference.getAndSet(null);
88+
if (iNotif.isOnError()) {
89+
throw Exceptions.propagate(iNotif.getThrowable());
15590
}
156-
iHasValue = true;
15791
}
15892
}
159-
return !iDone;
93+
return !iNotif.isOnCompleted();
16094
}
16195

16296
@Override
16397
public T next() {
164-
if (iKind == Notification.Kind.OnError) {
165-
Exceptions.propagate(iError);
166-
}
16798
if (hasNext()) {
168-
if (iKind == Notification.Kind.OnNext) {
169-
T v = iValue;
170-
iValue = null; // handover
171-
iHasValue = false;
99+
if (iNotif.isOnNext()) {
100+
T v = iNotif.getValue();
101+
iNotif = null;
172102
return v;
173103
}
174104
}

rxjava-core/src/test/java/rx/operators/OperationLatestTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void testSimpleJustNext() {
104104
Assert.assertEquals(Long.valueOf(i), it.next());
105105
}
106106
}
107-
@Test(timeout = 1000, expected = RuntimeException.class)
107+
@Test(/*timeout = 1000, */expected = RuntimeException.class)
108108
public void testHasNextThrows() {
109109
TestScheduler scheduler = new TestScheduler();
110110

0 commit comments

Comments
 (0)