Skip to content

Commit 0913412

Browse files
committed
Merge branch 'OperatorMergeFullRewrite' of
https://github.com/akarnokd/RxJava.git into OperatorMergeFullRewrite Conflicts: src/main/java/rx/internal/operators/OperatorMapNotification.java src/main/java/rx/internal/operators/OperatorMerge.java src/main/java/rx/internal/util/ScalarSynchronousObservable.java
2 parents 74a1138 + 8a6d506 commit 0913412

File tree

10 files changed

+2688
-63
lines changed

10 files changed

+2688
-63
lines changed

CHANGES.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,24 @@
11
# RxJava Releases #
22

3+
### Version 1.0.12 – June 9th 2015 ([Maven Central](http://search.maven.org/#artifactdetails%7Cio.reactivex%7Crxjava%7C1.0.12%7C)) ###
4+
5+
* [Pull 2963] (https://github.com/ReactiveX/RxJava/pull/2963) Set of standard producers and updated queue implementations
6+
* [Pull 2961] (https://github.com/ReactiveX/RxJava/pull/2961) fix Amb backpressure bug
7+
* [Pull 2960] (https://github.com/ReactiveX/RxJava/pull/2960) fix OperatorConcat race condition where request lost
8+
* [Pull 2985] (https://github.com/ReactiveX/RxJava/pull/2985) improve OperatorSerializeTest.testMultiThreadedWithNPEinMiddle
9+
* [Pull 2986] (https://github.com/ReactiveX/RxJava/pull/2986) OperatorAll - implement backpressure and include last value in exception cause
10+
* [Pull 2987] (https://github.com/ReactiveX/RxJava/pull/2987) fix skip() race condition and request overflow
11+
* [Pull 2988] (https://github.com/ReactiveX/RxJava/pull/2988) Operator exists() - implement backpressure & include last value in exception cause
12+
* [Pull 2989] (https://github.com/ReactiveX/RxJava/pull/2989) prevent take() from requesting more than needed
13+
* [Pull 2991] (https://github.com/ReactiveX/RxJava/pull/2991) takeUntil(predicate) - include last value in error cause
14+
* [Pull 2992] (https://github.com/ReactiveX/RxJava/pull/2992) Fix value rendering in error last cause for primitive types
15+
* [Pull 2993] (https://github.com/ReactiveX/RxJava/pull/2993) takeWhile(predicate) - include last value in error cause
16+
* [Pull 2996] (https://github.com/ReactiveX/RxJava/pull/2996) switchIfEmpty - fix backpressure bug and lost requests
17+
* [Pull 2999] (https://github.com/ReactiveX/RxJava/pull/2999) Fix a wrong assertion in assertError
18+
* [Pull 3000] (https://github.com/ReactiveX/RxJava/pull/3000) Replace the Java 7 AssertionError(message, cause) with initCause
19+
* [Pull 3001] (https://github.com/ReactiveX/RxJava/pull/3001) use Subscribers.from()
20+
* [Pull 3009] (https://github.com/ReactiveX/RxJava/pull/3009) Observable.from(iterable) - ensure it.hasNext() is not called unnecessarily
21+
322
### Version 1.0.11 – May 19th 2015 ([Maven Central](http://search.maven.org/#artifactdetails%7Cio.reactivex%7Crxjava%7C1.0.11%7C)) ###
423

524
* [Pull 2948] (https://github.com/ReactiveX/RxJava/pull/2948) More assertions for TestSubscriber

src/main/java/rx/Single.java

Lines changed: 1858 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import rx.annotations.Experimental;
19+
import rx.internal.util.SubscriptionList;
20+
21+
/**
22+
* Provides a mechanism for receiving push-based notifications.
23+
* <p>
24+
* After an SingleSubscriber calls an {@link Single}'s {@link Single#subscribe subscribe} method, the
25+
* {@code Single} calls the SingleSubscriber's {@link #onSuccess} and {@link #onError} methods to provide notifications.
26+
* A well-behaved {@code Single} will call an SingleSubscriber's {@link #onSuccess} method exactly once or
27+
* the SingleSubscriber's {@link #onError} method exactly once.
28+
*
29+
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
30+
* @param <T>
31+
* the type of item the SingleSubscriber expects to observe
32+
*/
33+
@Experimental
34+
public abstract class SingleSubscriber<T> implements Subscription {
35+
36+
private final SubscriptionList cs = new SubscriptionList();
37+
38+
/**
39+
* Notifies the SingleSubscriber with a single item and that the {@link Single} has finished sending push-based notifications.
40+
* <p>
41+
* The {@link Single} will not call this method if it calls {@link #onError}.
42+
*/
43+
public abstract void onSuccess(T value);
44+
45+
/**
46+
* Notifies the SingleSubscriber that the {@link Single} has experienced an error condition.
47+
* <p>
48+
* If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
49+
*
50+
* @param e
51+
* the exception encountered by the Single
52+
*/
53+
public abstract void onError(Throwable error);
54+
55+
/**
56+
* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
57+
* unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
58+
* explicitly unsubscribing the new {@code Subscription} as well.
59+
*
60+
* @param s
61+
* the {@code Subscription} to add
62+
*/
63+
public final void add(Subscription s) {
64+
cs.add(s);
65+
}
66+
67+
@Override
68+
public final void unsubscribe() {
69+
cs.unsubscribe();
70+
}
71+
72+
/**
73+
* Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
74+
*
75+
* @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise
76+
*/
77+
@Override
78+
public final boolean isUnsubscribed() {
79+
return cs.isUnsubscribed();
80+
}
81+
}

src/main/java/rx/Subscriber.java

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,23 @@
3131
* the type of items the Subscriber expects to observe
3232
*/
3333
public abstract class Subscriber<T> implements Observer<T>, Subscription {
34+
35+
// represents requested not set yet
36+
private static final Long NOT_SET = Long.MIN_VALUE;
3437

35-
private final SubscriptionList cs;
36-
private final Subscriber<?> op;
38+
private final SubscriptionList subscriptions;
39+
private final Subscriber<?> subscriber;
3740
/* protected by `this` */
38-
private Producer p;
41+
private Producer producer;
3942
/* protected by `this` */
40-
private long requested = Long.MIN_VALUE; // default to not set
43+
private long requested = NOT_SET; // default to not set
4144

4245
protected Subscriber() {
4346
this(null, false);
4447
}
4548

46-
protected Subscriber(Subscriber<?> op) {
47-
this(op, true);
49+
protected Subscriber(Subscriber<?> subscriber) {
50+
this(subscriber, true);
4851
}
4952

5053
/**
@@ -53,15 +56,15 @@ protected Subscriber(Subscriber<?> op) {
5356
* <p>
5457
* To retain the chaining of subscribers, add the created instance to {@code op} via {@link #add}.
5558
*
56-
* @param op
59+
* @param subscriber
5760
* the other Subscriber
5861
* @param shareSubscriptions
5962
* {@code true} to share the subscription list in {@code op} with this instance
6063
* @since 1.0.6
6164
*/
62-
protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
63-
this.op = op;
64-
this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList();
65+
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
66+
this.subscriber = subscriber;
67+
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
6568
}
6669

6770
/**
@@ -73,12 +76,12 @@ protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
7376
* the {@code Subscription} to add
7477
*/
7578
public final void add(Subscription s) {
76-
cs.add(s);
79+
subscriptions.add(s);
7780
}
7881

7982
@Override
8083
public final void unsubscribe() {
81-
cs.unsubscribe();
84+
subscriptions.unsubscribe();
8285
}
8386

8487
/**
@@ -88,7 +91,7 @@ public final void unsubscribe() {
8891
*/
8992
@Override
9093
public final boolean isUnsubscribed() {
91-
return cs.isUnsubscribed();
94+
return subscriptions.isUnsubscribed();
9295
}
9396

9497
/**
@@ -124,57 +127,64 @@ protected final void request(long n) {
124127
if (n < 0) {
125128
throw new IllegalArgumentException("number requested cannot be negative: " + n);
126129
}
127-
Producer shouldRequest = null;
130+
131+
// if producer is set then we will request from it
132+
// otherwise we increase the requested count by n
133+
Producer producerToRequestFrom = null;
128134
synchronized (this) {
129-
if (p != null) {
130-
shouldRequest = p;
131-
} else if (requested == Long.MIN_VALUE) {
132-
requested = n;
133-
} else {
134-
final long total = requested + n;
135-
// check if overflow occurred
136-
if (total < 0) {
137-
requested = Long.MAX_VALUE;
138-
} else {
139-
requested = total;
140-
}
135+
if (producer != null) {
136+
producerToRequestFrom = producer;
137+
} else {
138+
addToRequested(n);
139+
return;
141140
}
142141
}
143-
// after releasing lock
144-
if (shouldRequest != null) {
145-
shouldRequest.request(n);
146-
}
142+
// after releasing lock (we should not make requests holding a lock)
143+
producerToRequestFrom.request(n);
147144
}
148145

146+
private void addToRequested(long n) {
147+
if (requested == NOT_SET) {
148+
requested = n;
149+
} else {
150+
final long total = requested + n;
151+
// check if overflow occurred
152+
if (total < 0) {
153+
requested = Long.MAX_VALUE;
154+
} else {
155+
requested = total;
156+
}
157+
}
158+
}
159+
149160
/**
150161
* @warn javadoc description missing
151162
* @warn param producer not described
152-
* @param producer
163+
* @param p
153164
*/
154-
public void setProducer(Producer producer) {
165+
public void setProducer(Producer p) {
155166
long toRequest;
156-
boolean setProducer = false;
167+
boolean passToSubscriber = false;
157168
synchronized (this) {
158169
toRequest = requested;
159-
p = producer;
160-
if (op != null) {
170+
producer = p;
171+
if (subscriber != null) {
161172
// middle operator ... we pass thru unless a request has been made
162-
if (toRequest == Long.MIN_VALUE) {
173+
if (toRequest == NOT_SET) {
163174
// we pass-thru to the next producer as nothing has been requested
164-
setProducer = true;
175+
passToSubscriber = true;
165176
}
166-
167177
}
168178
}
169179
// do after releasing lock
170-
if (setProducer) {
171-
op.setProducer(p);
180+
if (passToSubscriber) {
181+
subscriber.setProducer(producer);
172182
} else {
173183
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
174-
if (toRequest == Long.MIN_VALUE) {
175-
p.request(Long.MAX_VALUE);
184+
if (toRequest == NOT_SET) {
185+
producer.request(Long.MAX_VALUE);
176186
} else {
177-
p.request(toRequest);
187+
producer.request(toRequest);
178188
}
179189
}
180190
}

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,42 +71,52 @@ public void request(long n) {
7171
}
7272
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
7373
// fast-path without backpressure
74-
while (it.hasNext()) {
74+
75+
while (true) {
7576
if (o.isUnsubscribed()) {
7677
return;
78+
} else if (it.hasNext()) {
79+
o.onNext(it.next());
80+
} else if (!o.isUnsubscribed()) {
81+
o.onCompleted();
82+
return;
83+
} else {
84+
// is unsubscribed
85+
return;
7786
}
78-
o.onNext(it.next());
79-
}
80-
if (!o.isUnsubscribed()) {
81-
o.onCompleted();
8287
}
8388
} else if (n > 0) {
8489
// backpressure is requested
8590
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
8691
if (_c == 0) {
8792
while (true) {
8893
/*
89-
* This complicated logic is done to avoid touching the volatile `requested` value
90-
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
94+
* This complicated logic is done to avoid touching the
95+
* volatile `requested` value during the loop itself. If
96+
* it is touched during the loop the performance is
97+
* impacted significantly.
9198
*/
9299
long r = requested;
93100
long numToEmit = r;
94-
while (it.hasNext() && --numToEmit >= 0) {
101+
while (true) {
95102
if (o.isUnsubscribed()) {
96103
return;
97-
}
98-
o.onNext(it.next());
99-
100-
}
101-
102-
if (!it.hasNext()) {
103-
if (!o.isUnsubscribed()) {
104+
} else if (it.hasNext()) {
105+
if (--numToEmit >= 0) {
106+
o.onNext(it.next());
107+
} else
108+
break;
109+
} else if (!o.isUnsubscribed()) {
104110
o.onCompleted();
111+
return;
112+
} else {
113+
// is unsubscribed
114+
return;
105115
}
106-
return;
107116
}
108117
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
109-
// we're done emitting the number requested so return
118+
// we're done emitting the number requested so
119+
// return
110120
return;
111121
}
112122

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,6 @@ public FastInnerSubscriber(MergeSubscriber<T> parent) {
932932
this.parent = parent;
933933
}
934934

935-
936935
@Override
937936
public void onNext(T t) {
938937
try {
@@ -958,4 +957,4 @@ public void onCompleted() {
958957
}
959958
}
960959
}
961-
}
960+
}

src/perf/java/rx/PerfBaseline.java renamed to src/perf/java/rx/ObservablePerfBaseline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
@BenchmarkMode(Mode.Throughput)
3232
@OutputTimeUnit(TimeUnit.SECONDS)
33-
public class PerfBaseline {
33+
public class ObservablePerfBaseline {
3434

3535
@State(Scope.Thread)
3636
public static class Input extends InputWithIncrementingInteger {

0 commit comments

Comments
 (0)