From 38770dba359682970eac54d79d2980d404d94940 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 29 Oct 2015 00:33:08 +0100 Subject: [PATCH 1/2] 1.x: reduce Subscriber's creation overhead --- src/main/java/rx/Subscriber.java | 92 ++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 29 deletions(-) diff --git a/src/main/java/rx/Subscriber.java b/src/main/java/rx/Subscriber.java index 67ac611e4c..feb9ac1ba0 100644 --- a/src/main/java/rx/Subscriber.java +++ b/src/main/java/rx/Subscriber.java @@ -33,15 +33,25 @@ public abstract class Subscriber implements Observer, Subscription { // represents requested not set yet - private static final Long NOT_SET = Long.MIN_VALUE; + private static final long NOT_SET = Long.MIN_VALUE; - private final SubscriptionList subscriptions; +// private final SubscriptionList subscriptions; private final Subscriber subscriber; /* protected by `this` */ private Producer producer; /* protected by `this` */ private long requested = NOT_SET; // default to not set + SubscriptionList subscriptions; + static final SubscriptionList UNSUBSCRIBED; + + final boolean sharedSubscriptions; + + static { + UNSUBSCRIBED = new SubscriptionList(); + UNSUBSCRIBED.unsubscribe(); + } + protected Subscriber() { this(null, false); } @@ -78,7 +88,7 @@ protected Subscriber(Subscriber subscriber) { */ protected Subscriber(Subscriber subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; - this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); + this.sharedSubscriptions = shareSubscriptions && subscriber != null; } /** @@ -90,12 +100,37 @@ protected Subscriber(Subscriber subscriber, boolean shareSubscriptions) { * the {@code Subscription} to add */ public final void add(Subscription s) { - subscriptions.add(s); + if (sharedSubscriptions) { + subscriber.add(s); + return; + } + SubscriptionList sl; + synchronized (this) { + sl = subscriptions; + if (sl == null) { + sl = new SubscriptionList(); + subscriptions = sl; + } + } + sl.add(s); } @Override public final void unsubscribe() { - subscriptions.unsubscribe(); + if (sharedSubscriptions) { + subscriber.unsubscribe(); + return; + } + SubscriptionList sl; + synchronized (this) { + sl = subscriptions; + if (sl == null) { + subscriptions = UNSUBSCRIBED; + return; + } + } + + sl.unsubscribe(); } /** @@ -105,7 +140,11 @@ public final void unsubscribe() { */ @Override public final boolean isUnsubscribed() { - return subscriptions.isUnsubscribed(); + if (sharedSubscriptions) { + return subscriber.isUnsubscribed(); + } + SubscriptionList sl = subscriptions; + return sl != null && sl.isUnsubscribed(); } /** @@ -144,12 +183,20 @@ protected final void request(long n) { // if producer is set then we will request from it // otherwise we increase the requested count by n - Producer producerToRequestFrom = null; + Producer producerToRequestFrom; synchronized (this) { - if (producer != null) { - producerToRequestFrom = producer; - } else { - addToRequested(n); + producerToRequestFrom = producer; + if (producerToRequestFrom == null) { + long r = requested; + if (r == NOT_SET) { + requested = n; + } else { + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + requested = u; + } return; } } @@ -157,20 +204,6 @@ protected final void request(long n) { producerToRequestFrom.request(n); } - private void addToRequested(long n) { - if (requested == NOT_SET) { - requested = n; - } else { - final long total = requested + n; - // check if overflow occurred - if (total < 0) { - requested = Long.MAX_VALUE; - } else { - requested = total; - } - } - } - /** * If other subscriber is set (by calling constructor * {@link #Subscriber(Subscriber)} or @@ -189,10 +222,11 @@ private void addToRequested(long n) { public void setProducer(Producer p) { long toRequest; boolean passToSubscriber = false; + final Subscriber child = subscriber; synchronized (this) { toRequest = requested; producer = p; - if (subscriber != null) { + if (child != null) { // middle operator ... we pass thru unless a request has been made if (toRequest == NOT_SET) { // we pass-thru to the next producer as nothing has been requested @@ -202,13 +236,13 @@ public void setProducer(Producer p) { } // do after releasing lock if (passToSubscriber) { - subscriber.setProducer(producer); + child.setProducer(p); } else { // we execute the request with whatever has been requested (or Long.MAX_VALUE) if (toRequest == NOT_SET) { - producer.request(Long.MAX_VALUE); + p.request(Long.MAX_VALUE); } else { - producer.request(toRequest); + p.request(toRequest); } } } From 1c0d45b949bcd0abec30b3ed6eab88bda348be67 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 29 Oct 2015 09:31:17 +0100 Subject: [PATCH 2/2] Updated perf. --- src/perf/java/rx/SubscribingPerf.java | 85 +++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 13 deletions(-) diff --git a/src/perf/java/rx/SubscribingPerf.java b/src/perf/java/rx/SubscribingPerf.java index cdc229c8b9..9f40c5bc97 100644 --- a/src/perf/java/rx/SubscribingPerf.java +++ b/src/perf/java/rx/SubscribingPerf.java @@ -21,6 +21,8 @@ import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; +import rx.functions.Func1; + /** * Benchmark the cost of subscription and initial request management. *

@@ -38,64 +40,121 @@ public class SubscribingPerf { @Benchmark public void justDirect(Blackhole bh) { - just.subscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.subscribe(subscriber); } @Benchmark public void justStarted(Blackhole bh) { - just.subscribe(new StartedSubscriber(Long.MAX_VALUE, bh)); + StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.subscribe(subscriber); } @Benchmark public void justUsual(Blackhole bh) { - just.subscribe(new UsualSubscriber(Long.MAX_VALUE, bh)); + UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.subscribe(subscriber); } @Benchmark public void rangeDirect(Blackhole bh) { - range.subscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.subscribe(subscriber); } @Benchmark public void rangeStarted(Blackhole bh) { - range.subscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.subscribe(subscriber); } @Benchmark public void rangeUsual(Blackhole bh) { - range.subscribe(new UsualSubscriber(Long.MAX_VALUE, bh)); + UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.subscribe(subscriber); } @Benchmark public void justDirectUnsafe(Blackhole bh) { - just.unsafeSubscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.unsafeSubscribe(subscriber); } @Benchmark public void justStartedUnsafe(Blackhole bh) { - just.unsafeSubscribe(new StartedSubscriber(Long.MAX_VALUE, bh)); + StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.unsafeSubscribe(subscriber); } @Benchmark public void justUsualUnsafe(Blackhole bh) { - just.unsafeSubscribe(new UsualSubscriber(Long.MAX_VALUE, bh)); + UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + just.unsafeSubscribe(subscriber); } @Benchmark public void rangeDirectUnsafe(Blackhole bh) { - range.unsafeSubscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.unsafeSubscribe(subscriber); } @Benchmark public void rangeStartedUnsafe(Blackhole bh) { - range.unsafeSubscribe(new DirectSubscriber(Long.MAX_VALUE, bh)); + StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.unsafeSubscribe(subscriber); } @Benchmark public void rangeUsualUnsafe(Blackhole bh) { - range.unsafeSubscribe(new UsualSubscriber(Long.MAX_VALUE, bh)); + UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + range.unsafeSubscribe(subscriber); } + @State(Scope.Thread) + public static class Chain { + @Param({"10", "1000", "1000000"}) + public int times; + + @Param({"1", "2", "3", "4", "5"}) + public int maps; + + Observable source; + + @Setup + public void setup() { + Observable o = Observable.range(1, times); + + for (int i = 0; i < maps; i++) { + o = o.map(new Func1() { + @Override + public Integer call(Integer v) { + return v + 1; + } + }); + } + + source = o; + } + + @Benchmark + public void mapped(Chain c, Blackhole bh) { + DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh); + bh.consume(subscriber); + c.source.subscribe(subscriber); + } + } static final class DirectSubscriber extends Subscriber { final long r; @@ -179,4 +238,4 @@ public void onCompleted() { } } -} +} \ No newline at end of file