Skip to content

1.x: DelaySubscription with a plain other Observable. #3447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4195,6 +4195,32 @@ public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>>
return create(new OnSubscribeDelaySubscriptionWithSelector<T, U>(this, subscriptionDelay));
}

/**
* Returns an Observable that delays the subscription to this Observable
* until the other Observable emits an element or completes normally.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator forwards the backpressure requests to this Observable once
* the subscription happens and requests Long.MAX_VALUE from the other Observable</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U> the value type of the other Observable, irrelevant
* @param other the other Observable that should trigger the subscription
* to this Observable.
* @return an Observable that delays the subscription to this Observable
* until the other Observable emits an element or completes normally.
*/
@Experimental
public final <U> Observable<T> delaySubscription(Observable<U> other) {
if (other == null) {
throw new NullPointerException();
}
return create(new OnSubscribeDelaySubscriptionOther<T, U>(this, other));
}

/**
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects emitted by the source Observable into the items or notifications they
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package rx.internal.operators;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.observers.Subscribers;
import rx.plugins.*;
import rx.subscriptions.SerialSubscription;

/**
* Delays the subscription to the main source until the other
* observable fires an event or completes.
* @param <T> the main type
* @param <U> the other value type, ignored
*/
public final class OnSubscribeDelaySubscriptionOther<T, U> implements OnSubscribe<T> {
final Observable<? extends T> main;
final Observable<U> other;

public OnSubscribeDelaySubscriptionOther(Observable<? extends T> main, Observable<U> other) {
this.main = main;
this.other = other;
}

@Override
public void call(Subscriber<? super T> t) {
final Subscriber<T> child = Subscribers.wrap(t);

final SerialSubscription serial = new SerialSubscription();

Subscriber<U> otherSubscriber = new Subscriber<U>() {
boolean done;
@Override
public void onNext(U t) {
onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
done = true;
child.onError(e);
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
serial.set(child);

main.unsafeSubscribe(child);
}
};

serial.set(otherSubscriber);

other.unsafeSubscribe(otherSubscriber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;

import rx.Observable;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

public class OnSubscribeDelaySubscriptionOtherTest {
@Test
public void testNoPrematureSubscription() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Observable.just(1)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onNext(1);

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void testNoMultipleSubscriptions() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Observable.just(1)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onNext(1);
other.onNext(2);

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void testCompleteTriggersSubscription() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Observable.just(1)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onCompleted();

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void testNoPrematureSubscriptionToError() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Observable.<Integer>error(new TestException())
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onCompleted();

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertError(TestException.class);
}

@Test
public void testNoSubscriptionIfOtherErrors() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Observable.<Integer>error(new TestException())
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onError(new TestException());

Assert.assertEquals("Premature subscription", 0, subscribed.get());

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertError(TestException.class);
}

@Test
public void testBackpressurePassesThrough() {

PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create(0L);

final AtomicInteger subscribed = new AtomicInteger();

Observable.just(1, 2, 3, 4, 5)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onNext(1);

Assert.assertEquals("No subscription", 1, subscribed.get());

Assert.assertFalse("Not unsubscribed from other", other.hasObservers());

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

ts.requestMore(1);
ts.assertValue(1);
ts.assertNoErrors();
ts.assertNotCompleted();

ts.requestMore(2);
ts.assertValues(1, 2, 3);
ts.assertNoErrors();
ts.assertNotCompleted();

ts.requestMore(10);
ts.assertValues(1, 2, 3, 4, 5);
ts.assertNoErrors();
ts.assertCompleted();
}
}