diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java index 4b2cbb7dd8..6d7324bf9f 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java @@ -22,6 +22,7 @@ import rx.Observable; import rx.Observer; import rx.observers.SerializedSubscriber; +import rx.subjects.UnicastSubject; /** * Creates non-overlapping windows of items where each window is terminated by diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java index 5ac748e6f1..9dbfed2bb7 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java @@ -23,6 +23,7 @@ import rx.Observer; import rx.functions.Func0; import rx.observers.SerializedSubscriber; +import rx.subjects.UnicastSubject; import rx.subscriptions.SerialSubscription; /** diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index 3538991526..4873a3109b 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -23,7 +23,7 @@ import rx.Observable.Operator; import rx.functions.Action0; import rx.internal.util.atomic.SpscLinkedArrayQueue; -import rx.subjects.Subject; +import rx.subjects.*; import rx.subscriptions.Subscriptions; /** diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java index 07dea16e76..28a6fffc21 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java @@ -23,6 +23,7 @@ import rx.Observer; import rx.functions.Func1; import rx.observers.*; +import rx.subjects.UnicastSubject; import rx.subscriptions.CompositeSubscription; /** diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java index d55f0db31c..8ea74a3e1a 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java @@ -25,6 +25,7 @@ import rx.Observer; import rx.functions.Action0; import rx.observers.*; +import rx.subjects.UnicastSubject; import rx.subscriptions.Subscriptions; /** diff --git a/src/main/java/rx/internal/operators/UnicastSubject.java b/src/main/java/rx/subjects/UnicastSubject.java similarity index 97% rename from src/main/java/rx/internal/operators/UnicastSubject.java rename to src/main/java/rx/subjects/UnicastSubject.java index 569745358e..f401c364f1 100644 --- a/src/main/java/rx/internal/operators/UnicastSubject.java +++ b/src/main/java/rx/subjects/UnicastSubject.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.internal.operators; +package rx.subjects; import java.util.Queue; import java.util.concurrent.atomic.*; import rx.*; +import rx.annotations.Experimental; import rx.exceptions.*; -import rx.functions.*; +import rx.functions.Action0; +import rx.internal.operators.*; import rx.internal.util.atomic.*; import rx.internal.util.unsafe.*; -import rx.subjects.Subject; -import rx.subscriptions.Subscriptions; /** * A Subject variant which buffers events until a single Subscriber arrives and replays them to it @@ -35,6 +35,7 @@ * * @param the input and output value type */ +@Experimental public final class UnicastSubject extends Subject { /** @@ -111,7 +112,7 @@ public boolean hasObservers() { * * @param the value type */ - static final class State extends AtomicLong implements Producer, Observer, Action0, OnSubscribe { + static final class State extends AtomicLong implements Producer, Observer, OnSubscribe, Subscription { /** */ private static final long serialVersionUID = -9044104859202255786L; /** The single subscriber. */ @@ -250,7 +251,7 @@ public void request(long n) { @Override public void call(Subscriber subscriber) { if (this.subscriber.compareAndSet(null, subscriber)) { - subscriber.add(Subscriptions.create(this)); + subscriber.add(this); subscriber.setProducer(this); } else { subscriber.onError(new IllegalStateException("Only a single subscriber is allowed")); @@ -326,7 +327,7 @@ void replay() { * Should be called only when the child unsubscribes */ @Override - public void call() { + public void unsubscribe() { doTerminate(); @@ -339,6 +340,12 @@ public void call() { } queue.clear(); } + + @Override + public boolean isUnsubscribed() { + return done; + } + /** * Checks if one of the terminal conditions have been met: child unsubscribed, * an error happened or the source terminated and the queue is empty diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index a824374659..71d13f8d3c 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -42,7 +42,7 @@ import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; -import rx.subjects.Subject; +import rx.subjects.*; import rx.subscriptions.BooleanSubscription; public class OperatorConcatTest { diff --git a/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java b/src/test/java/rx/subjects/BufferUntilSubscriberTest.java similarity index 98% rename from src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java rename to src/test/java/rx/subjects/BufferUntilSubscriberTest.java index 801138d4ab..556254ae9f 100644 --- a/src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java +++ b/src/test/java/rx/subjects/BufferUntilSubscriberTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.internal.operators; +package rx.subjects; import java.util.List; import java.util.concurrent.*; @@ -26,7 +26,6 @@ import rx.functions.*; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; public class BufferUntilSubscriberTest {