diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 36215718ce..1c98cd91a1 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -56,6 +56,7 @@ import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; +import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; @@ -252,6 +253,7 @@ public Subscription subscribe(final Map callbacks) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { @@ -259,6 +261,7 @@ public void onCompleted() { } } + @Override public void onError(Exception e) { handleError(e); Object onError = callbacks.get("onError"); @@ -267,6 +270,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNext.call(args); } @@ -298,15 +302,18 @@ public Subscription subscribe(final Object o) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(Object args) { onNext.call(args); } @@ -327,15 +334,18 @@ public Subscription subscribe(final Action1 onNext) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -365,10 +375,12 @@ public Subscription subscribe(final Object onNext, final Object onError) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -376,6 +388,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -396,10 +409,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -407,6 +422,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -436,12 +452,14 @@ public Subscription subscribe(final Object onNext, final Object onError, final O */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { if (onComplete != null) { Functions.from(onComplete).call(); } } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -449,6 +467,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -469,10 +488,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { onComplete.call(); } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -480,6 +501,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -516,10 +538,12 @@ public void forEach(final Action1 onNext) { * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { latch.countDown(); } + @Override public void onError(Exception e) { /* * If we receive an onError event we set the reference on the outer thread @@ -531,6 +555,7 @@ public void onError(Exception e) { latch.countDown(); } + @Override public void onNext(T args) { onNext.call(args); } @@ -582,6 +607,7 @@ public void forEach(final Object o) { forEach(new Action1() { + @Override public void call(Object args) { onNext.call(args); } @@ -2743,6 +2769,7 @@ public Observable filter(final Object callback) { final FuncN _f = Functions.from(callback); return filter(this, new Func1() { + @Override public Boolean call(T t1) { return (Boolean) _f.call(t1); } @@ -2913,6 +2940,7 @@ public Observable map(final Object callback) { final FuncN _f = Functions.from(callback); return map(this, new Func1() { + @Override @SuppressWarnings("unchecked") public R call(T t1) { return (R) _f.call(t1); @@ -2963,6 +2991,7 @@ public Observable mapMany(final Object callback) { final FuncN _f = Functions.from(callback); return mapMany(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(T t1) { return (Observable) _f.call(t1); @@ -3071,6 +3100,7 @@ public Observable onErrorResumeNext(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(Exception e) { return (Observable) _f.call(e); @@ -3152,6 +3182,7 @@ public Observable onErrorReturn(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, new Func1() { + @Override @SuppressWarnings("unchecked") public T call(Exception e) { return (T) _f.call(e); @@ -3288,6 +3319,34 @@ public Observable scan(Func2 accumulator) { return scan(this, accumulator); } + /** + * Samples the observable sequence at each interval. + * + * @param period + * The period of time that defines the sampling rate. + * @param unit + * The time unit for the sampling rate time period. + * @return An observable sequence whose elements are the results of sampling the current observable sequence. + */ + public Observable sample(long period, TimeUnit unit) { + return create(OperationSample.sample(this, period, unit)); + } + + /** + * Samples the observable sequence at each interval. + * + * @param period + * The period of time that defines the sampling rate. + * @param unit + * The time unit for the sampling rate time period. + * @param scheduler + * The scheduler to use for sampling. + * @return An observable sequence whose elements are the results of sampling the current observable sequence. + */ + public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { + return create(OperationSample.sample(this, period, unit, scheduler)); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java new file mode 100644 index 0000000000..2773cbfe3c --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -0,0 +1,197 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Samples the observable sequence at each interval. + */ +public final class OperationSample { + + /** + * Samples the observable sequence at each interval. + */ + public static Func1, Subscription> sample(final Observable source, long period, TimeUnit unit) { + return new Sample(source, period, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * Samples the observable sequence at each interval. + */ + public static Func1, Subscription> sample(final Observable source, long period, TimeUnit unit, Scheduler scheduler) { + return new Sample(source, period, unit, scheduler); + } + + private static class Sample implements Func1, Subscription> { + private final Observable source; + private final long period; + private final TimeUnit unit; + private final Scheduler scheduler; + + private final AtomicBoolean hasValue = new AtomicBoolean(); + private final AtomicReference latestValue = new AtomicReference(); + + private Sample(Observable source, long interval, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.period = interval; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + Observable clock = Observable.create(OperationInterval.interval(period, unit, scheduler)); + final Subscription clockSubscription = clock.subscribe(new Observer() { + @Override + public void onCompleted() { /* the clock never completes */ } + + @Override + public void onError(Exception e) { /* the clock has no errors */ } + + @Override + public void onNext(Long tick) { + if (hasValue.get()) { + observer.onNext(latestValue.get()); + } + } + }); + + final Subscription sourceSubscription = source.subscribe(new Observer() { + @Override + public void onCompleted() { + clockSubscription.unsubscribe(); + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + clockSubscription.unsubscribe(); + observer.onError(e); + } + + @Override + public void onNext(T value) { + latestValue.set(value); + hasValue.set(true); + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + clockSubscription.unsubscribe(); + sourceSubscription.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") // due to mocking + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testSample() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observer1) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(1L); + } + }, 1, TimeUnit.SECONDS); + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(2L); + } + }, 2, TimeUnit.SECONDS); + scheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onCompleted(); + } + }, 3, TimeUnit.SECONDS); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSample.sample(source, 400L, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(any(Long.class)); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1600L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(1L); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(3000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(1L); + inOrder.verify(observer, times(2)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + } +}