Skip to content

Commit 74ce009

Browse files
Synchronized SubscriptionList
As per discussion in ReactiveX#1383 ... despite the performance hit.
1 parent 1a92220 commit 74ce009

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

rxjava-core/src/main/java/rx/internal/util/SubscriptionList.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public synchronized boolean isUnsubscribed() {
5454
* @param s
5555
* the {@link Subscription} to add
5656
*/
57-
public void add(final Subscription s) {
57+
public synchronized void add(final Subscription s) {
5858
if (unsubscribed) {
5959
s.unsubscribe();
6060
} else {
@@ -71,13 +71,19 @@ public void add(final Subscription s) {
7171
*/
7272
@Override
7373
public void unsubscribe() {
74-
if (unsubscribed) {
75-
return;
74+
Collection<Subscription> toUnsubscribe = null;
75+
synchronized (this) {
76+
if (unsubscribed) {
77+
return;
78+
}
79+
unsubscribed = true;
80+
toUnsubscribe = subscriptions;
81+
subscriptions = null;
82+
}
83+
84+
if (toUnsubscribe != null) {
85+
unsubscribeFromAll(toUnsubscribe);
7686
}
77-
unsubscribed = true;
78-
// we will only get here once
79-
unsubscribeFromAll(subscriptions);
80-
subscriptions = null;
8187
}
8288

8389
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {

rxjava-core/src/perf/java/rx/PerfBaseline.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ public void observableConsumption(Input input) throws InterruptedException {
3939
public void observableViaRange(Input input) throws InterruptedException {
4040
input.observable.subscribe(input.observer);
4141
}
42+
43+
@Benchmark
44+
public void observableConsumptionUnsafe(Input input) throws InterruptedException {
45+
input.firehose.unsafeSubscribe(input.newSubscriber());
46+
}
47+
48+
@Benchmark
49+
public void observableViaRangeUnsafe(Input input) throws InterruptedException {
50+
input.observable.unsafeSubscribe(input.newSubscriber());
51+
}
4252

4353
@Benchmark
4454
public void iterableViaForLoopConsumption(Input input) throws InterruptedException {

rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public abstract class InputWithIncrementingInteger {
3636
public Observer<Integer> observer;
3737

3838
public abstract int getSize();
39-
39+
4040
@Setup
4141
public void setup(final Blackhole bh) {
4242
this.bh = bh;
@@ -106,4 +106,25 @@ public LatchedObserver<Integer> newLatchedObserver() {
106106
return new LatchedObserver<Integer>(bh);
107107
}
108108

109+
public Subscriber<Integer> newSubscriber() {
110+
return new Subscriber<Integer>() {
111+
112+
@Override
113+
public void onCompleted() {
114+
115+
}
116+
117+
@Override
118+
public void onError(Throwable e) {
119+
120+
}
121+
122+
@Override
123+
public void onNext(Integer t) {
124+
bh.consume(t);
125+
}
126+
127+
};
128+
}
129+
109130
}

0 commit comments

Comments
 (0)