From 8ed552b0103e3ac62ecb64750e3e544eb685b833 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 21:15:58 +0200 Subject: [PATCH 1/6] First attempt at "sample" operator. Should probably use "interval" operator instead of an internal clock. Also still needs tests! --- .../java/rx/operators/OperationSample.java | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSample.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java new file mode 100644 index 0000000000..c4a3b676ae --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -0,0 +1,147 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.Tester.UnitTest.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Samples the observable sequence at each interval. + */ +public final class OperationSample { + + /** + * Samples the observable sequence at each interval. + */ + public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit) { + return new Sample(source, interval, unit); + } + + private static class Sample implements Func1, Subscription> { + private final Observable source; + private final long interval; + private final TimeUnit unit; + + private final AtomicBoolean hasValue = new AtomicBoolean(); + private final AtomicReference latestValue = new AtomicReference(); + private final AtomicBoolean sourceCompleted = new AtomicBoolean(); + + private Sample(Observable source, long interval, TimeUnit unit) { + this.source = source; + this.interval = interval; + this.unit = unit; + } + + @Override + public Subscription call(final Observer observer) { + Clock clock = new Clock(Schedulers.currentThread(), interval, unit); + final Subscription clockSubscription = Observable.create(clock).subscribe(new Observer() { + @Override + public void onCompleted() { /* the clock never completes */ } + + @Override + public void onError(Exception e) { /* the clock has no errors */ } + + @Override + public void onNext(Long totalTimePassed) { + if (hasValue.get()) { + observer.onNext(latestValue.get()); + } + } + }); + + Subscription sourceSubscription = source.subscribe(new Observer() { + @Override + public void onCompleted() { + clockSubscription.unsubscribe(); + sourceCompleted.set(true); + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + clockSubscription.unsubscribe(); + sourceCompleted.set(true); + observer.onError(e); + } + + @Override + public void onNext(T value) { + latestValue.set(value); + hasValue.set(true); + } + }); + + return clockSubscription; + } + + private class Clock implements Func1, Subscription> { + private final Scheduler scheduler; + private final long interval; + private final TimeUnit unit; + + private long timePassed; + + private Clock(Scheduler scheduler, long interval, TimeUnit unit) { + this.scheduler = scheduler; + this.interval = interval; + this.unit = unit; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + if (! sourceCompleted.get()) { + timePassed += interval; + observer.onNext(timePassed); + return Clock.this.call(observer); + } + return Subscriptions.create(new Action0() { + @Override + public void call() { + // TODO Auto-generated method stub + } + }); + } + }, interval, unit); + } + } + } + + public static class UnitTest { + // TODO + } +} From 74ba4ebca1188e593526829a506113f2e930fa5e Mon Sep 17 00:00:00 2001 From: jmhofer Date: Thu, 11 Apr 2013 14:01:54 +0200 Subject: [PATCH 2/6] Switched sample from internal clock to using the interval operator --- .../java/rx/operators/OperationSample.java | 67 ++++++------------- 1 file changed, 22 insertions(+), 45 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index c4a3b676ae..90195d1a27 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.*; import static rx.operators.Tester.UnitTest.*; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -33,7 +34,6 @@ import rx.concurrency.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; -import rx.util.functions.Func0; import rx.util.functions.Func1; /** @@ -45,28 +45,35 @@ public final class OperationSample { * Samples the observable sequence at each interval. */ public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit) { - return new Sample(source, interval, unit); + return new Sample(source, interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } + /** + * Samples the observable sequence at each interval. + */ + public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit, Scheduler scheduler) { + return new Sample(source, interval, unit, scheduler); + } private static class Sample implements Func1, Subscription> { private final Observable source; private final long interval; private final TimeUnit unit; + private final Scheduler scheduler; private final AtomicBoolean hasValue = new AtomicBoolean(); private final AtomicReference latestValue = new AtomicReference(); - private final AtomicBoolean sourceCompleted = new AtomicBoolean(); - private Sample(Observable source, long interval, TimeUnit unit) { + private Sample(Observable source, long interval, TimeUnit unit, Scheduler scheduler) { this.source = source; this.interval = interval; this.unit = unit; + this.scheduler = scheduler; } @Override public Subscription call(final Observer observer) { - Clock clock = new Clock(Schedulers.currentThread(), interval, unit); - final Subscription clockSubscription = Observable.create(clock).subscribe(new Observer() { + Observable clock = Observable.create(OperationInterval.interval(interval, unit, scheduler)); + final Subscription clockSubscription = clock.subscribe(new Observer() { @Override public void onCompleted() { /* the clock never completes */ } @@ -74,25 +81,23 @@ public void onCompleted() { /* the clock never completes */ } public void onError(Exception e) { /* the clock has no errors */ } @Override - public void onNext(Long totalTimePassed) { + public void onNext(@SuppressWarnings("unused") Long tick) { if (hasValue.get()) { observer.onNext(latestValue.get()); } } }); - Subscription sourceSubscription = source.subscribe(new Observer() { + final Subscription sourceSubscription = source.subscribe(new Observer() { @Override public void onCompleted() { clockSubscription.unsubscribe(); - sourceCompleted.set(true); observer.onCompleted(); } @Override public void onError(Exception e) { clockSubscription.unsubscribe(); - sourceCompleted.set(true); observer.onError(e); } @@ -103,41 +108,13 @@ public void onNext(T value) { } }); - return clockSubscription; - } - - private class Clock implements Func1, Subscription> { - private final Scheduler scheduler; - private final long interval; - private final TimeUnit unit; - - private long timePassed; - - private Clock(Scheduler scheduler, long interval, TimeUnit unit) { - this.scheduler = scheduler; - this.interval = interval; - this.unit = unit; - } - - @Override - public Subscription call(final Observer observer) { - return scheduler.schedule(new Func0() { - @Override - public Subscription call() { - if (! sourceCompleted.get()) { - timePassed += interval; - observer.onNext(timePassed); - return Clock.this.call(observer); - } - return Subscriptions.create(new Action0() { - @Override - public void call() { - // TODO Auto-generated method stub - } - }); - } - }, interval, unit); - } + return Subscriptions.create(new Action0() { + @Override + public void call() { + clockSubscription.unsubscribe(); + sourceSubscription.unsubscribe(); + } + }); } } From cd7a56e7e8181b2ebfb7925f7dee7b74fa438fba Mon Sep 17 00:00:00 2001 From: jmhofer Date: Thu, 11 Apr 2013 14:22:58 +0200 Subject: [PATCH 3/6] Added a unit test against the sample operator --- .../java/rx/operators/OperationSample.java | 80 ++++++++++++++++++- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index 90195d1a27..96e61170bf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -15,23 +15,24 @@ */ package rx.operators; -import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.Tester.UnitTest.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func1; @@ -78,7 +79,7 @@ public Subscription call(final Observer observer) { public void onCompleted() { /* the clock never completes */ } @Override - public void onError(Exception e) { /* the clock has no errors */ } + public void onError(@SuppressWarnings("unused") Exception e) { /* the clock has no errors */ } @Override public void onNext(@SuppressWarnings("unused") Long tick) { @@ -119,6 +120,77 @@ public void call() { } public static class UnitTest { - // TODO + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") // due to mocking + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testSample() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observer1) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(1L); + } + }, 1, TimeUnit.SECONDS); + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(2L); + } + }, 2, TimeUnit.SECONDS); + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onCompleted(); + } + }, 3, TimeUnit.SECONDS); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSample.sample(source, 400L, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(any(Long.class)); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1600L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(1L); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(3000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(1L); + inOrder.verify(observer, times(2)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } } } From 5924a761d1a3351c4ae90e0325fe83f8f9f07cc6 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 22 Apr 2013 10:45:06 +0200 Subject: [PATCH 4/6] added missing linebreak --- rxjava-core/src/main/java/rx/operators/OperationSample.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index 96e61170bf..7b8ade471f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -55,6 +55,7 @@ public static Func1, Subscription> sample(final Observable so public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit, Scheduler scheduler) { return new Sample(source, interval, unit, scheduler); } + private static class Sample implements Func1, Subscription> { private final Observable source; private final long interval; From 36e460a3e6ea40fd9453b06350da1cdc5292d6cd Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 27 Apr 2013 22:00:31 +0200 Subject: [PATCH 5/6] minor renaming and removing unnecessary warning suppression again --- .../java/rx/operators/OperationSample.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index 7b8ade471f..2773cbfe3c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -45,20 +45,20 @@ public final class OperationSample { /** * Samples the observable sequence at each interval. */ - public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit) { - return new Sample(source, interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + public static Func1, Subscription> sample(final Observable source, long period, TimeUnit unit) { + return new Sample(source, period, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } /** * Samples the observable sequence at each interval. */ - public static Func1, Subscription> sample(final Observable source, long interval, TimeUnit unit, Scheduler scheduler) { - return new Sample(source, interval, unit, scheduler); + public static Func1, Subscription> sample(final Observable source, long period, TimeUnit unit, Scheduler scheduler) { + return new Sample(source, period, unit, scheduler); } private static class Sample implements Func1, Subscription> { private final Observable source; - private final long interval; + private final long period; private final TimeUnit unit; private final Scheduler scheduler; @@ -67,23 +67,23 @@ private static class Sample implements Func1, Subscription> { private Sample(Observable source, long interval, TimeUnit unit, Scheduler scheduler) { this.source = source; - this.interval = interval; + this.period = interval; this.unit = unit; this.scheduler = scheduler; } @Override public Subscription call(final Observer observer) { - Observable clock = Observable.create(OperationInterval.interval(interval, unit, scheduler)); + Observable clock = Observable.create(OperationInterval.interval(period, unit, scheduler)); final Subscription clockSubscription = clock.subscribe(new Observer() { @Override public void onCompleted() { /* the clock never completes */ } @Override - public void onError(@SuppressWarnings("unused") Exception e) { /* the clock has no errors */ } + public void onError(Exception e) { /* the clock has no errors */ } @Override - public void onNext(@SuppressWarnings("unused") Long tick) { + public void onNext(Long tick) { if (hasValue.get()) { observer.onNext(latestValue.get()); } From ae2183c71dd798a6a45b538ec673b4a50d175589 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Wed, 1 May 2013 17:19:47 +0200 Subject: [PATCH 6/6] Added sample methods to observable. --- rxjava-core/src/main/java/rx/Observable.java | 59 ++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 36215718ce..1c98cd91a1 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -56,6 +56,7 @@ import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; +import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; @@ -252,6 +253,7 @@ public Subscription subscribe(final Map callbacks) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { @@ -259,6 +261,7 @@ public void onCompleted() { } } + @Override public void onError(Exception e) { handleError(e); Object onError = callbacks.get("onError"); @@ -267,6 +270,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNext.call(args); } @@ -298,15 +302,18 @@ public Subscription subscribe(final Object o) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(Object args) { onNext.call(args); } @@ -327,15 +334,18 @@ public Subscription subscribe(final Action1 onNext) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -365,10 +375,12 @@ public Subscription subscribe(final Object onNext, final Object onError) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -376,6 +388,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -396,10 +409,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -407,6 +422,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -436,12 +452,14 @@ public Subscription subscribe(final Object onNext, final Object onError, final O */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { if (onComplete != null) { Functions.from(onComplete).call(); } } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -449,6 +467,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -469,10 +488,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { onComplete.call(); } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -480,6 +501,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -516,10 +538,12 @@ public void forEach(final Action1 onNext) { * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { latch.countDown(); } + @Override public void onError(Exception e) { /* * If we receive an onError event we set the reference on the outer thread @@ -531,6 +555,7 @@ public void onError(Exception e) { latch.countDown(); } + @Override public void onNext(T args) { onNext.call(args); } @@ -582,6 +607,7 @@ public void forEach(final Object o) { forEach(new Action1() { + @Override public void call(Object args) { onNext.call(args); } @@ -2743,6 +2769,7 @@ public Observable filter(final Object callback) { final FuncN _f = Functions.from(callback); return filter(this, new Func1() { + @Override public Boolean call(T t1) { return (Boolean) _f.call(t1); } @@ -2913,6 +2940,7 @@ public Observable map(final Object callback) { final FuncN _f = Functions.from(callback); return map(this, new Func1() { + @Override @SuppressWarnings("unchecked") public R call(T t1) { return (R) _f.call(t1); @@ -2963,6 +2991,7 @@ public Observable mapMany(final Object callback) { final FuncN _f = Functions.from(callback); return mapMany(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(T t1) { return (Observable) _f.call(t1); @@ -3071,6 +3100,7 @@ public Observable onErrorResumeNext(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(Exception e) { return (Observable) _f.call(e); @@ -3152,6 +3182,7 @@ public Observable onErrorReturn(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, new Func1() { + @Override @SuppressWarnings("unchecked") public T call(Exception e) { return (T) _f.call(e); @@ -3288,6 +3319,34 @@ public Observable scan(Func2 accumulator) { return scan(this, accumulator); } + /** + * Samples the observable sequence at each interval. + * + * @param period + * The period of time that defines the sampling rate. + * @param unit + * The time unit for the sampling rate time period. + * @return An observable sequence whose elements are the results of sampling the current observable sequence. + */ + public Observable sample(long period, TimeUnit unit) { + return create(OperationSample.sample(this, period, unit)); + } + + /** + * Samples the observable sequence at each interval. + * + * @param period + * The period of time that defines the sampling rate. + * @param unit + * The time unit for the sampling rate time period. + * @param scheduler + * The scheduler to use for sampling. + * @return An observable sequence whose elements are the results of sampling the current observable sequence. + */ + public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { + return create(OperationSample.sample(this, period, unit, scheduler)); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted