Skip to content

Commit e26096d

Browse files
committed
1.x: AsyncSubject now supports backpressure
1 parent f7d23ae commit e26096d

File tree

3 files changed

+53
-6
lines changed

3 files changed

+53
-6
lines changed

src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.exceptions.Exceptions;
2323
import rx.functions.Action1;
2424
import rx.internal.operators.NotificationLite;
25+
import rx.internal.producers.SingleProducer;
2526
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2627

2728
/**
@@ -68,9 +69,13 @@ public static <T> AsyncSubject<T> create() {
6869
public void call(SubjectObserver<T> o) {
6970
Object v = state.getLatest();
7071
NotificationLite<T> nl = state.nl;
71-
o.accept(v, nl);
72-
if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) {
72+
if (v == null || nl.isCompleted(v)) {
7373
o.onCompleted();
74+
} else
75+
if (nl.isError(v)) {
76+
o.onError(nl.getError(v));
77+
} else {
78+
o.actual.setProducer(new SingleProducer<T>(o.actual, nl.getValue(v)));
7479
}
7580
}
7681
};
@@ -97,8 +102,7 @@ public void onCompleted() {
97102
if (last == nl.completed()) {
98103
bo.onCompleted();
99104
} else {
100-
bo.onNext(nl.getValue(last));
101-
bo.onCompleted();
105+
bo.actual.setProducer(new SingleProducer<T>(bo.actual, nl.getValue(last)));
102106
}
103107
}
104108
}

src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public State remove(SubjectObserver o) {
203203
*/
204204
protected static final class SubjectObserver<T> implements Observer<T> {
205205
/** The actual Observer. */
206-
final Observer<? super T> actual;
206+
final Subscriber<? super T> actual;
207207
/** Was the emitFirst run? Guarded by this. */
208208
boolean first = true;
209209
/** Guarded by this. */
@@ -215,7 +215,7 @@ protected static final class SubjectObserver<T> implements Observer<T> {
215215
protected volatile boolean caughtUp;
216216
/** Indicate where the observer is at replaying. */
217217
private volatile Object index;
218-
public SubjectObserver(Observer<? super T> actual) {
218+
public SubjectObserver(Subscriber<? super T> actual) {
219219
this.actual = actual;
220220
}
221221
@Override

src/test/java/rx/subjects/AsyncSubjectTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,4 +430,47 @@ public void testAsyncSubjectValueError() {
430430
assertNull(async.getValue());
431431
assertFalse(async.hasValue());
432432
}
433+
434+
@Test
435+
public void backpressureOnline() {
436+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
437+
438+
AsyncSubject<Integer> subject = AsyncSubject.create();
439+
440+
subject.subscribe(ts);
441+
442+
subject.onNext(1);
443+
subject.onCompleted();
444+
445+
ts.assertNoValues();
446+
ts.assertNoErrors();
447+
ts.assertNotCompleted();
448+
449+
ts.requestMore(1);
450+
451+
ts.assertValue(1);
452+
ts.assertCompleted();
453+
ts.assertNoErrors();
454+
}
455+
456+
@Test
457+
public void backpressureOffline() {
458+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
459+
460+
AsyncSubject<Integer> subject = AsyncSubject.create();
461+
subject.onNext(1);
462+
subject.onCompleted();
463+
464+
subject.subscribe(ts);
465+
466+
ts.assertNoValues();
467+
ts.assertNoErrors();
468+
ts.assertNotCompleted();
469+
470+
ts.requestMore(1);
471+
472+
ts.assertValue(1);
473+
ts.assertCompleted();
474+
ts.assertNoErrors();
475+
}
433476
}

0 commit comments

Comments
 (0)