Skip to content

1.x: reduce stack depth with switchIfEmpty #5125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5481,7 +5481,7 @@ public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
if (alternate == null) {
throw new NullPointerException("alternate is null");
}
return lift(new OperatorSwitchIfEmpty<T>(alternate));
return unsafeCreate(new OnSubscribeSwitchIfEmpty<T>(this, alternate));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.internal.operators;


import java.util.concurrent.atomic.AtomicInteger;

import rx.*;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;
Expand All @@ -26,22 +28,28 @@
* empty, the results of the given Observable will be emitted.
* @param <T> the value type
*/
public final class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> {
private final Observable<? extends T> alternate;
public final class OnSubscribeSwitchIfEmpty<T> implements Observable.OnSubscribe<T> {

final Observable<? extends T> source;

public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
final Observable<? extends T> alternate;

public OnSubscribeSwitchIfEmpty(Observable<? extends T> source, Observable<? extends T> alternate) {
this.source = source;
this.alternate = alternate;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
public void call(Subscriber<? super T> child) {
final SerialSubscription serial = new SerialSubscription();
ProducerArbiter arbiter = new ProducerArbiter();
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, serial, arbiter, alternate);

serial.set(parent);
child.add(serial);
child.setProducer(arbiter);
return parent;

parent.subscribe(source);
}

static final class ParentSubscriber<T> extends Subscriber<T> {
Expand All @@ -52,11 +60,15 @@ static final class ParentSubscriber<T> extends Subscriber<T> {
private final ProducerArbiter arbiter;
private final Observable<? extends T> alternate;

final AtomicInteger wip;
volatile boolean active;

ParentSubscriber(Subscriber<? super T> child, final SerialSubscription serial, ProducerArbiter arbiter, Observable<? extends T> alternate) {
this.child = child;
this.serial = serial;
this.arbiter = arbiter;
this.alternate = alternate;
this.wip = new AtomicInteger();
}

@Override
Expand All @@ -69,14 +81,33 @@ public void onCompleted() {
if (!empty) {
child.onCompleted();
} else if (!child.isUnsubscribed()) {
subscribeToAlternate();
active = false;
subscribe(null);
}
}

private void subscribeToAlternate() {
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
serial.set(as);
alternate.unsafeSubscribe(as);
void subscribe(Observable<? extends T> source) {
if (wip.getAndIncrement() == 0) {
do {
if (child.isUnsubscribed()) {
break;
}

if (!active) {
if (source == null) {
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
serial.set(as);
active = true;
alternate.unsafeSubscribe(as);
} else {
active = true;
source.unsafeSubscribe(this);
source = null;
}
}

} while (wip.decrementAndGet() != 0);
}
}

@Override
Expand Down
30 changes: 27 additions & 3 deletions src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

import static org.junit.Assert.*;

import java.util.*;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Action0;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -212,4 +211,29 @@ public void call() {
public void testAlternateNull() {
Observable.just(1).switchIfEmpty(null);
}

Observable<StackTraceElement[]> recursiveSwitch(final int level) {
if (level == 100) {
return Observable.just(Thread.currentThread().getStackTrace());
}
return Observable.<StackTraceElement[]>empty().switchIfEmpty(Observable.defer(new Func0<Observable<StackTraceElement[]>>() {
@Override
public Observable<StackTraceElement[]> call() {
return recursiveSwitch(level + 1);
}
}));
}

@Test
public void stackDepth() {
StackTraceElement[] trace = recursiveSwitch(0)
.toBlocking().last();

if (trace.length > 1000 || trace.length < 100) {
for (StackTraceElement ste : trace) {
System.out.println(ste);
}
fail("Stack too deep: " + trace.length);
}
}
}