Skip to content

1.x: promote UnicastSubject to be a standard+experimental Subject #3936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import rx.Observable;
import rx.Observer;
import rx.observers.SerializedSubscriber;
import rx.subjects.UnicastSubject;

/**
* Creates non-overlapping windows of items where each window is terminated by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import rx.Observer;
import rx.functions.Func0;
import rx.observers.SerializedSubscriber;
import rx.subjects.UnicastSubject;
import rx.subscriptions.SerialSubscription;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import rx.Observable.Operator;
import rx.functions.Action0;
import rx.internal.util.atomic.SpscLinkedArrayQueue;
import rx.subjects.Subject;
import rx.subjects.*;
import rx.subscriptions.Subscriptions;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import rx.Observer;
import rx.functions.Func1;
import rx.observers.*;
import rx.subjects.UnicastSubject;
import rx.subscriptions.CompositeSubscription;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rx.Observer;
import rx.functions.Action0;
import rx.observers.*;
import rx.subjects.UnicastSubject;
import rx.subscriptions.Subscriptions;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
package rx.subjects;

import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.annotations.Experimental;
import rx.exceptions.*;
import rx.functions.*;
import rx.functions.Action0;
import rx.internal.operators.*;
import rx.internal.util.atomic.*;
import rx.internal.util.unsafe.*;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* A Subject variant which buffers events until a single Subscriber arrives and replays them to it
Expand All @@ -35,6 +35,7 @@
*
* @param <T> the input and output value type
*/
@Experimental
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't promote it for public usage without tests!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is tested through window operators and there is BufferUntilSubscriberTest.

public final class UnicastSubject<T> extends Subject<T, T> {

/**
Expand Down Expand Up @@ -111,7 +112,7 @@ public boolean hasObservers() {
*
* @param <T> the value type
*/
static final class State<T> extends AtomicLong implements Producer, Observer<T>, Action0, OnSubscribe<T> {
static final class State<T> extends AtomicLong implements Producer, Observer<T>, OnSubscribe<T>, Subscription {
/** */
private static final long serialVersionUID = -9044104859202255786L;
/** The single subscriber. */
Expand Down Expand Up @@ -250,7 +251,7 @@ public void request(long n) {
@Override
public void call(Subscriber<? super T> subscriber) {
if (this.subscriber.compareAndSet(null, subscriber)) {
subscriber.add(Subscriptions.create(this));
subscriber.add(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

subscriber.setProducer(this);
} else {
subscriber.onError(new IllegalStateException("Only a single subscriber is allowed"));
Expand Down Expand Up @@ -326,7 +327,7 @@ void replay() {
* Should be called only when the child unsubscribes
*/
@Override
public void call() {
public void unsubscribe() {

doTerminate();

Expand All @@ -339,6 +340,12 @@ public void call() {
}
queue.clear();
}

@Override
public boolean isUnsubscribed() {
return done;
}

/**
* Checks if one of the terminal conditions have been met: child unsubscribed,
* an error happened or the source terminated and the queue is empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subjects.Subject;
import rx.subjects.*;
import rx.subscriptions.BooleanSubscription;

public class OperatorConcatTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
package rx.subjects;

import java.util.List;
import java.util.concurrent.*;
Expand All @@ -26,7 +26,6 @@
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class BufferUntilSubscriberTest {

Expand Down