Skip to content

Commit eecb41d

Browse files
committed
Merge pull request #3335 from akarnokd/ScalarFastPathEnable2x
2.x: scalar flatMap optimization enabled
2 parents dd51e5d + 99359a7 commit eecb41d

18 files changed

+210
-241
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ public static Observable<Long> intervalRange(long start, long count, long initia
596596
@SchedulerSupport(SchedulerKind.NONE)
597597
public static <T> Observable<T> just(T value) {
598598
Objects.requireNonNull(value);
599-
return create(new PublisherScalarSource<>(value));
599+
return new ObservableScalarSource<>(value);
600600
}
601601

602602
@BackpressureSupport(BackpressureKind.FULL)
@@ -1648,9 +1648,9 @@ public final <R> Observable<R> flatMap(Function<? super T, ? extends Publisher<?
16481648
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
16491649
}
16501650
validateBufferSize(bufferSize);
1651-
if (onSubscribe instanceof PublisherScalarSource) {
1652-
PublisherScalarSource<T> scalar = (PublisherScalarSource<T>) onSubscribe;
1653-
return create(scalar.flatMap(mapper));
1651+
if (this instanceof ObservableScalarSource) {
1652+
ObservableScalarSource<T> scalar = (ObservableScalarSource<T>) this;
1653+
return create(scalar.scalarFlatMap(mapper));
16541654
}
16551655
return lift(new OperatorFlatMap<>(mapper, delayErrors, maxConcurrency, bufferSize));
16561656
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import java.util.function.Function;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.Observable;
21+
import io.reactivex.internal.subscriptions.*;
22+
23+
/**
24+
* Represents a constant scalar value.
25+
*/
26+
public final class ObservableScalarSource<T> extends Observable<T> {
27+
private final T value;
28+
public ObservableScalarSource(T value) {
29+
super(new Publisher<T>() {
30+
@Override
31+
public void subscribe(Subscriber<? super T> s) {
32+
s.onSubscribe(new ScalarSubscription<>(s, value));
33+
}
34+
});
35+
this.value = value;
36+
}
37+
38+
public T value() {
39+
return value;
40+
}
41+
42+
public <U> Publisher<U> scalarFlatMap(Function<? super T, ? extends Publisher<? extends U>> mapper) {
43+
return s -> {
44+
Publisher<? extends U> other;
45+
try {
46+
other = mapper.apply(value);
47+
} catch (Throwable e) {
48+
EmptySubscription.error(e, s);
49+
return;
50+
}
51+
if (other == null) {
52+
EmptySubscription.error(new NullPointerException("The publisher returned by the function is null"), s);
53+
return;
54+
}
55+
other.subscribe(s);
56+
};
57+
}
58+
}

src/main/java/io/reactivex/internal/operators/OperatorFlatMap.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

1714
package io.reactivex.internal.operators;
@@ -134,8 +131,8 @@ public void onNext(T t) {
134131
onError(e);
135132
return;
136133
}
137-
if (p instanceof PublisherScalarSource) {
138-
tryEmitScalar(((PublisherScalarSource<? extends U>)p).value());
134+
if (p instanceof ObservableScalarSource) {
135+
tryEmitScalar(((ObservableScalarSource<? extends U>)p).value());
139136
} else {
140137
InnerSubscriber<T, U> inner = new InnerSubscriber<>(this, uniqueId++);
141138
addInner(inner);
@@ -220,7 +217,11 @@ void tryEmitScalar(U value) {
220217
s.request(1);
221218
}
222219
} else {
223-
220+
Queue<U> q = getMainQueue();
221+
if (!q.offer(value)) {
222+
onError(new IllegalStateException("Scalar queue full?!"));
223+
return;
224+
}
224225
}
225226
if (decrementAndGet() == 0) {
226227
return;

src/main/java/io/reactivex/internal/operators/OperatorMap.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

14+
1715
package io.reactivex.internal.operators;
1816

1917
import java.util.function.Function;

src/main/java/io/reactivex/internal/operators/PublisherArraySource.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

1714
package io.reactivex.internal.operators;

src/main/java/io/reactivex/internal/operators/PublisherCompletableFutureSource.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
13+
1614
package io.reactivex.internal.operators;
1715

1816
import java.util.concurrent.CompletableFuture;

src/main/java/io/reactivex/internal/operators/PublisherDefer.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
13+
1614
package io.reactivex.internal.operators;
1715

1816
import java.util.function.Supplier;

src/main/java/io/reactivex/internal/operators/PublisherEmptySource.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

1714
package io.reactivex.internal.operators;

src/main/java/io/reactivex/internal/operators/PublisherErrorSource.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

1714
package io.reactivex.internal.operators;

src/main/java/io/reactivex/internal/operators/PublisherIterableSource.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
13+
1614
package io.reactivex.internal.operators;
1715

1816
import java.util.Iterator;

src/main/java/io/reactivex/internal/operators/PublisherRangeSource.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
1+
/**
2+
* Copyright 2015 Netflix, Inc.
33
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
76
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
7+
* http://www.apache.org/licenses/LICENSE-2.0
98
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
1613

1714
package io.reactivex.internal.operators;

src/main/java/io/reactivex/internal/operators/PublisherScalarAsyncSource.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
/*
2-
* Copyright 2011-2015 David Karnok
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
13+
1614
package io.reactivex.internal.operators;
1715

1816
import java.util.concurrent.Callable;

0 commit comments

Comments
 (0)