diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 68f7530dec..82433c73c8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -292,20 +292,20 @@ trait Observable[+T] * Creates an Observable which produces buffers of collected values. * * This Observable produces connected non-overlapping buffers. The current buffer is - * emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then + * emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then * be used to create a new Observable to listen for the end of the next buffer. * * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer + * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer * is emitted and replaced with a new one. * @return * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object. + * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. */ - def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = { + def buffer[Closing](closings: () => Observable[_ <: Closing]) : Observable[Seq[T]] = { val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable - val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(f) + val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Closing](f) Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } @@ -313,24 +313,24 @@ trait Observable[+T] * Creates an Observable which produces buffers of collected values. * * This Observable produces buffers. Buffers are created when the specified `openings` - * Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument - * is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this + * Observable produces an object. Additionally the function argument + * is used to create an Observable which produces objects. When this * Observable produces such an object, the associated buffer is emitted. * * @param openings - * The [[rx.lang.scala.Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause + * The [[rx.lang.scala.Observable]] which, when it produces an object, will cause * another buffer to be created. * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer + * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer * is emitted. * @return * An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects. */ - def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = { + def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = { val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable - val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable - val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(opening, closing) + val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable + val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing) Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } @@ -502,21 +502,21 @@ trait Observable[+T] /** * Creates an Observable which produces windows of collected values. This Observable produces connected * non-overlapping windows. The current window is emitted and replaced with a new window when the - * Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. + * Observable produced by the specified function produces an object. * The function will then be used to create a new Observable to listen for the end of the next * window. * * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window + * When this [[rx.lang.scala.Observable]] produces an object, the associated window * is emitted and replaced with a new one. * @return * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object. + * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. */ - def window(closings: () => Observable[Closing]): Observable[Observable[T]] = { + def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = { val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable - val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func) + val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func) val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { val x2 = x.asInstanceOf[rx.Observable[_ <: T]] Observable[T](x2) @@ -526,23 +526,23 @@ trait Observable[+T] /** * Creates an Observable which produces windows of collected values. This Observable produces windows. - * Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object. - * Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. + * Chunks are created when the specified `openings` Observable produces an object. + * Additionally the `closings` argument is used to create an Observable which produces objects. * When this Observable produces such an object, the associated window is emitted. * * @param openings - * The [[rx.lang.scala.Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause + * The [[rx.lang.scala.Observable]] which when it produces an object, will cause * another window to be created. * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window + * When this [[rx.lang.scala.Observable]] produces an object, the associated window * is emitted. * @return * An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects. */ - def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = { + def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = { Observable.jObsOfJObsToScObsOfScObs( - asJavaObservable.window(openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable)) + asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable)) : Observable[Observable[T]] // SI-7818 } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0cef17b1d3..370b1c1ef5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -108,9 +108,7 @@ import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; -import rx.util.Closing; import rx.util.OnErrorNotImplementedException; -import rx.util.Opening; import rx.util.Range; import rx.util.TimeInterval; import rx.util.Timestamped; @@ -2795,24 +2793,24 @@ public static Observable combineLates *

* This Observable produces connected, non-overlapping buffers. The current * buffer is emitted and replaced with a new buffer when the Observable - * produced by the specified bufferClosingSelector produces a - * {@link rx.util.Closing} object. The bufferClosingSelector + * produced by the specified bufferClosingSelector produces an + * object. The bufferClosingSelector * will then be used to create a new Observable to listen for the end of * the next buffer. * * @param bufferClosingSelector the {@link Func0} which is used to produce * an {@link Observable} for every buffer * created. When this {@link Observable} - * produces a {@link rx.util.Closing} object, + * produces an object, * the associated buffer is emitted and * replaced with a new one. * @return an {@link Observable} which produces connected, non-overlapping * buffers, which are emitted when the current {@link Observable} - * created with the {@link Func0} argument produces a - * {@link rx.util.Closing} object + * created with the {@link Func0} argument produces an + * object * @see RxJava Wiki: buffer() */ - public Observable> buffer(Func0> bufferClosingSelector) { + public Observable> buffer(Func0> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferClosingSelector)); } @@ -2822,26 +2820,26 @@ public Observable> buffer(Func0> * *

* This Observable produces buffers. Buffers are created when the specified - * bufferOpenings Observable produces a {@link rx.util.Opening} + * bufferOpenings Observable produces an * object. Additionally the bufferClosingSelector argument is - * used to create an Observable which produces {@link rx.util.Closing} + * used to create an Observable which produces * objects. When this Observable produces such an object, the associated * buffer is emitted. * - * @param bufferOpenings the {@link Observable} that, when it produces a - * {@link rx.util.Opening} object, will cause another + * @param bufferOpenings the {@link Observable} that, when it produces an + * object, will cause another * buffer to be created * @param bufferClosingSelector the {@link Func1} that is used to produce * an {@link Observable} for every buffer * created. When this {@link Observable} - * produces a {@link rx.util.Closing} object, + * produces an object, * the associated buffer is emitted. * @return an {@link Observable} that produces buffers that are created and * emitted when the specified {@link Observable}s publish certain * objects * @see RxJava Wiki: buffer() */ - public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { + public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector)); } @@ -3038,8 +3036,8 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit, * Creates an Observable that produces windows of collected items. This * Observable produces connected, non-overlapping windows. The current * window is emitted and replaced with a new window when the Observable - * produced by the specified closingSelector produces a - * {@link rx.util.Closing} object. The closingSelector will + * produced by the specified closingSelector produces an + * object. The closingSelector will * then be used to create a new Observable to listen for the end of the next * window. *

@@ -3047,42 +3045,42 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit, * * @param closingSelector the {@link Func0} used to produce an * {@link Observable} for every window created. When this - * {@link Observable} emits a {@link rx.util.Closing} object, the + * {@link Observable} emits an object, the * associated window is emitted and replaced with a new one. * @return an {@link Observable} that produces connected, non-overlapping * windows, which are emitted when the current {@link Observable} - * created with the closingSelector argument emits a - * {@link rx.util.Closing} object. + * created with the closingSelector argument emits an + * object. * @see RxJava Wiki: window() */ - public Observable> window(Func0> closingSelector) { + public Observable> window(Func0> closingSelector) { return create(OperationWindow.window(this, closingSelector)); } /** * Creates an Observable that produces windows of collected items. This * Observable produces windows. Chunks are created when the - * windowOpenings Observable produces a {@link rx.util.Opening} + * windowOpenings Observable produces an * object. Additionally the closingSelector argument creates an - * Observable that produces {@link rx.util.Closing} objects. When this + * Observable that produces objects. When this * Observable produces such an object, the associated window is emitted. *

* * - * @param windowOpenings the {@link Observable} that, when it produces a - * {@link rx.util.Opening} object, causes another + * @param windowOpenings the {@link Observable} that, when it produces an + * object, causes another * window to be created * @param closingSelector the {@link Func1} that produces an * {@link Observable} for every window created. When - * this {@link Observable} produces a - * {@link rx.util.Closing} object, the associated + * this {@link Observable} produces an + * object, the associated * window is emitted. * @return an {@link Observable} that produces windows that are created and * emitted when the specified {@link Observable}s publish certain * objects * @see RxJava Wiki: window() */ - public Observable> window(Observable windowOpenings, Func1> closingSelector) { + public Observable> window(Observable windowOpenings, Func1> closingSelector) { return create(OperationWindow.window(this, windowOpenings, closingSelector)); } diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java index 873d3d1a7f..70bdaad3e8 100644 --- a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java @@ -15,6 +15,7 @@ */ package rx.joins; +import rx.operators.ObserverBase; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; diff --git a/rxjava-core/src/main/java/rx/joins/ObserverBase.java b/rxjava-core/src/main/java/rx/joins/ObserverBase.java deleted file mode 100644 index f1144a8ad2..0000000000 --- a/rxjava-core/src/main/java/rx/joins/ObserverBase.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.joins; - -import java.util.concurrent.atomic.AtomicBoolean; -import rx.Observer; - -/** - * Implements an observer that ensures proper event delivery - * semantics to its abstract onXxxxCore methods. - */ -public abstract class ObserverBase implements Observer { - private final AtomicBoolean completed = new AtomicBoolean(); - - @Override - public void onNext(T args) { - if (!completed.get()) { - onNextCore(args); - } - } - - @Override - public void onError(Throwable e) { - if (completed.compareAndSet(false, true)) { - onErrorCore(e); - } - } - - @Override - public void onCompleted() { - if (completed.compareAndSet(false, true)) { - onCompletedCore(); - } - } - /** - * Implement this method to react to the receival of a new element in the sequence. - */ - protected abstract void onNextCore(T args); - /** - * Implement this method to react to the occurrence of an exception. - */ - protected abstract void onErrorCore(Throwable e); - /** - * Implement this method to react to the end of the sequence. - */ - protected abstract void onCompletedCore(); - /** - * Try to trigger the error state. - * @param t - * @return false if already completed - */ - protected boolean fail(Throwable t) { - if (completed.compareAndSet(false, true)) { - onErrorCore(t); - return true; - } - return false; - } -} diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index cabc1309e6..1d3b762891 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -28,8 +28,6 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -449,13 +447,13 @@ public void stop() { * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. * The type of object being tracked by the {@link Chunk} */ - protected static class ObservableBasedSingleChunkCreator implements ChunkCreator { + protected static class ObservableBasedSingleChunkCreator implements ChunkCreator { private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - private final Func0> chunkClosingSelector; + private final Func0> chunkClosingSelector; private final NonOverlappingChunks chunks; - public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func0> chunkClosingSelector) { + public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func0> chunkClosingSelector) { this.chunks = chunks; this.chunkClosingSelector = chunkClosingSelector; @@ -464,10 +462,10 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func } private void listenForChunkEnd() { - Observable closingObservable = chunkClosingSelector.call(); - closingObservable.subscribe(new Action1() { + Observable closingObservable = chunkClosingSelector.call(); + closingObservable.subscribe(new Action1() { @Override - public void call(Closing closing) { + public void call(TClosing closing) { chunks.emitAndReplaceChunk(); listenForChunkEnd(); } @@ -495,20 +493,20 @@ public void stop() { * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. * The type of object being tracked by the {@link Chunk} */ - protected static class ObservableBasedMultiChunkCreator implements ChunkCreator { + protected static class ObservableBasedMultiChunkCreator implements ChunkCreator { private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - public ObservableBasedMultiChunkCreator(final OverlappingChunks chunks, Observable openings, final Func1> chunkClosingSelector) { - subscription.wrap(openings.subscribe(new Action1() { + public ObservableBasedMultiChunkCreator(final OverlappingChunks chunks, Observable openings, final Func1> chunkClosingSelector) { + subscription.wrap(openings.subscribe(new Action1() { @Override - public void call(Opening opening) { + public void call(TOpening opening) { final Chunk chunk = chunks.createChunk(); - Observable closingObservable = chunkClosingSelector.call(opening); + Observable closingObservable = chunkClosingSelector.call(opening); - closingObservable.subscribe(new Action1() { + closingObservable.subscribe(new Action1() { @Override - public void call(Closing closing) { + public void call(TClosing closing) { chunks.emitChunk(chunk); } }); diff --git a/rxjava-core/src/main/java/rx/operators/ObserverBase.java b/rxjava-core/src/main/java/rx/operators/ObserverBase.java new file mode 100644 index 0000000000..7e4dc87952 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/ObserverBase.java @@ -0,0 +1,203 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Implements an observer that ensures proper event delivery + * semantics to its abstract onXxxxCore methods. + */ +public abstract class ObserverBase implements Observer { + private final AtomicBoolean completed = new AtomicBoolean(); + + @Override + public void onNext(T args) { + if (!completed.get()) { + onNextCore(args); + } + } + + @Override + public void onError(Throwable e) { + if (completed.compareAndSet(false, true)) { + onErrorCore(e); + } + } + + @Override + public void onCompleted() { + if (completed.compareAndSet(false, true)) { + onCompletedCore(); + } + } + /** + * Implement this method to react to the receival of a new element in the sequence. + */ + protected abstract void onNextCore(T args); + /** + * Implement this method to react to the occurrence of an exception. + */ + protected abstract void onErrorCore(Throwable e); + /** + * Implement this method to react to the end of the sequence. + */ + protected abstract void onCompletedCore(); + /** + * Try to trigger the error state. + * @param t + * @return false if already completed + */ + protected boolean fail(Throwable t) { + if (completed.compareAndSet(false, true)) { + onErrorCore(t); + return true; + } + return false; + } + /** + * Wrap an existing observer into an ObserverBase instance. + * + * @param observer the observer to wrap + * @return the wrapped observer base which forwards its onNext + * onError and onCompleted events to the wrapped observer's + * methods of the same name + */ + public static ObserverBase wrap(Observer observer) { + return new WrappingObserverBase(observer); + } + /** + * Create an ObserverBase instance that forwards its onNext to the + * given action and ignores the onError and onCompleted events. + * @param onNextAction the onNext action + * @return an ObserverBase instance that forwards its events to the given + * action. + */ + public static ObserverBase create(Action1 onNextAction) { + return create(onNextAction, ObserverBase.empty1(), empty0()); + } + /** + * Create an ObserverBase instance that forwards its onNext to the + * given action and ignores the onError and onCompleted events. + * @param onNextAction the onNext action + * @param onErrorAction the onError action + * @return an ObserverBase instance that forwards its events to the given + * actions. + */ + public static ObserverBase create(Action1 onNextAction, Action1 onErrorAction) { + return create(onNextAction, onErrorAction, empty0()); + } + /** + * Create an ObserverBase instance that forwards its onNext, onError + * and onCompleted events to the given actions. + * @param onNextAction the onNext action + * @param onErrorAction the onError action + * @param onCompletedAction the onCompleted action + * @return an ObserverBase instance that forwards its events to the given + * actions. + */ + public static ObserverBase create(Action1 onNextAction, Action1 onErrorAction, Action0 onCompletedAction) { + return new FunctionalObserverBase(onNextAction, onErrorAction, onCompletedAction); + } + /** + * Creates a no-op Action1 instance. + */ + private static Action1 empty1() { + return new Action1() { + @Override + public void call(T t1) { + } + }; + } + /** + * Creates a no-op Action0 instance. + */ + private static Action0 empty0() { + return new Action0() { + @Override + public void call() { + } + }; + } + /** Require the object to be non-null. */ + private static T requireNonNull(T obj, String message) { + if (obj == null) { + throw new NullPointerException(message); + } + return obj; + } + /** + * Default implementation which calls actions for the onNext, onError + * and onCompleted events. + */ + private static class FunctionalObserverBase extends ObserverBase { + private final Action1 onNextAction; + private final Action1 onErrorAction; + private final Action0 onCompletedAction; + public FunctionalObserverBase( + Action1 onNextAction, + Action1 onErrorAction, + Action0 onCompletedAction + ) { + this.onNextAction = requireNonNull(onNextAction, "onNextAction"); + this.onErrorAction = requireNonNull(onErrorAction, "onErrorAction"); + this.onCompletedAction = requireNonNull(onCompletedAction, "onCompletedAction"); + } + + @Override + protected void onNextCore(T args) { + onNextAction.call(args); + } + + @Override + protected void onErrorCore(Throwable e) { + onErrorAction.call(e); + } + + @Override + protected void onCompletedCore() { + onCompletedAction.call(); + } + + } + /** + * Default implementation which wraps another observer. + */ + private static class WrappingObserverBase extends ObserverBase { + private final Observer observer; + public WrappingObserverBase(Observer observer) { + this.observer = requireNonNull(observer, "observer"); + } + + @Override + protected void onNextCore(T args) { + observer.onNext(args); + } + + @Override + protected void onErrorCore(Throwable e) { + observer.onError(e); + } + + @Override + protected void onCompletedCore() { + observer.onCompleted(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index c9aba14411..d9532267d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -24,8 +24,6 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -42,7 +40,7 @@ public Buffer call() { /** *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} constructed using the {@link Func0} argument, produces a {@link rx.util.Closing} + * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} constructed using the {@link Func0} argument, produces a * value. The buffer is then * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using the * provided {@link Func0} object, which will determine when this new buffer is emitted. When the source {@link Observable} completes or produces an error, the current buffer is emitted, and the @@ -56,17 +54,17 @@ public Buffer call() { * The {@link Observable} which produces values. * @param bufferClosingSelector * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link Func1} object representing the specified buffer operation. */ - public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { + public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(Observer> observer) { NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator>(buffers, bufferClosingSelector); + ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(buffers, bufferClosingSelector); return source.subscribe(new ChunkObserver>(buffers, observer, creator)); } }; @@ -77,9 +75,9 @@ public Subscription onSubscribe(Observer> observer) { * values from the specified {@link Observable} source and stores them in the currently active chunks. Initially * there are no chunks active.

* - *

Chunks can be created by pushing a {@link rx.util.Opening} value to the "bufferOpenings" {@link Observable}. + *

Chunks can be created by pushing a value to the "bufferOpenings" {@link Observable}. * This creates a new buffer which will then start recording values which are produced by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an - * {@link Observable} which can produce {@link rx.util.Closing} values. When it does so it will close this (and only this) newly created + * {@link Observable} which can produce values. When it does so it will close this (and only this) newly created * buffer. When the source {@link Observable} completes or produces an error, all chunks are emitted, and the * event is propagated to all subscribed {@link Observer}s.

* @@ -89,20 +87,20 @@ public Subscription onSubscribe(Observer> observer) { * @param source * The {@link Observable} which produces values. * @param bufferOpenings - * An {@link Observable} which when it produces a {@link rx.util.Opening} value will + * An {@link Observable} which when it produces a value will * create a new buffer which instantly starts recording the "source" {@link Observable}. * @param bufferClosingSelector * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link Func1} object representing the specified buffer operation. */ - public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { + public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { OverlappingChunks> buffers = new OverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator>(buffers, bufferOpenings, bufferClosingSelector); + ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector); return source.subscribe(new ChunkObserver>(buffers, observer, creator)); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index d7ff20e061..d6e961ae12 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -23,8 +23,6 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -42,7 +40,7 @@ public Window call() { /** *

This method creates a {@link rx.util.functions.Func1} object which represents the window operation. This operation takes * values from the specified {@link rx.Observable} source and stores them in a window until the {@link rx.Observable} constructed using the {@link rx.util.functions.Func0} argument, produces a - * {@link rx.util.Closing} value. The window is then + * value. The window is then * emitted, and a new window is created to replace it. A new {@link rx.Observable} will be constructed using the * provided {@link rx.util.functions.Func0} object, which will determine when this new window is emitted. When the source {@link rx.Observable} completes or produces an error, the current window * is emitted, and the event is propagated @@ -55,16 +53,16 @@ public Window call() { * The {@link rx.Observable} which produces values. * @param windowClosingSelector * A {@link rx.util.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link rx.util.functions.Func1} object representing the specified window operation. */ - public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { + public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { NonOverlappingChunks> windows = new NonOverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator>(windows, windowClosingSelector); + ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(windows, windowClosingSelector); return source.subscribe(new ChunkObserver>(windows, observer, creator)); } @@ -76,9 +74,9 @@ public Subscription onSubscribe(final Observer> observer) * values from the specified {@link rx.Observable} source and stores them in the currently active window. Initially * there are no windows active.

* - *

Windows can be created by pushing a {@link rx.util.Opening} value to the "windowOpenings" {@link rx.Observable}. + *

Windows can be created by pushing a value to the "windowOpenings" {@link rx.Observable}. * This creates a new window which will then start recording values which are produced by the "source" {@link rx.Observable}. Additionally the "windowClosingSelector" will be used to construct an - * {@link rx.Observable} which can produce {@link rx.util.Closing} values. When it does so it will close this (and only this) newly created + * {@link rx.Observable} which can produce values. When it does so it will close this (and only this) newly created * window. When the source {@link rx.Observable} completes or produces an error, all windows are emitted, and the * event is propagated to all subscribed {@link rx.Observer}s.

* @@ -88,20 +86,20 @@ public Subscription onSubscribe(final Observer> observer) * @param source * The {@link rx.Observable} which produces values. * @param windowOpenings - * An {@link rx.Observable} which when it produces a {@link rx.util.Opening} value will + * An {@link rx.Observable} which when it produces a value will * create a new window which instantly starts recording the "source" {@link rx.Observable}. * @param windowClosingSelector * A {@link rx.util.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link rx.util.functions.Func1} object representing the specified window operation. */ - public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { + public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { OverlappingChunks> windows = new OverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator>(windows, windowOpenings, windowClosingSelector); + ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(windows, windowOpenings, windowClosingSelector); return source.subscribe(new ChunkObserver>(windows, observer, creator)); } }; diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index 9318af8fd6..c759867679 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -33,10 +33,6 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; -import rx.util.Closing; -import rx.util.Closings; -import rx.util.Opening; -import rx.util.Openings; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -226,23 +222,23 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable openings = Observable.create(new Observable.OnSubscribeFunc() { + Observable openings = Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Openings.create(), 50); - push(observer, Openings.create(), 200); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 50); + push(observer, new Object(), 200); complete(observer, 250); return Subscriptions.empty(); } }); - Func1> closer = new Func1>() { + Func1> closer = new Func1>() { @Override - public Observable call(Opening opening) { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call(Object opening) { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } @@ -277,13 +273,13 @@ public Subscription onSubscribe(Observer observer) { } }); - Func0> closer = new Func0>() { + Func0> closer = new Func0>() { @Override - public Observable call() { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call() { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } diff --git a/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java b/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java index f715e69c61..ff687b912e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java @@ -1,7 +1,17 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package rx.operators; diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index b26cf42cae..c435c235f8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -30,10 +30,6 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; -import rx.util.Closing; -import rx.util.Closings; -import rx.util.Opening; -import rx.util.Openings; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -202,23 +198,23 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable openings = Observable.create(new Observable.OnSubscribeFunc() { + Observable openings = Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Openings.create(), 50); - push(observer, Openings.create(), 200); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 50); + push(observer, new Object(), 200); complete(observer, 250); return Subscriptions.empty(); } }); - Func1> closer = new Func1>() { + Func1> closer = new Func1>() { @Override - public Observable call(Opening opening) { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call(Object opening) { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } @@ -253,13 +249,13 @@ public Subscription onSubscribe(Observer observer) { } }); - Func0> closer = new Func0>() { + Func0> closer = new Func0>() { @Override - public Observable call() { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call() { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); }