Skip to content

Commit 271c83b

Browse files
committed
1.x: promote UnicastSubject to be a standard+experimental Subject (#3936)
1 parent bc83db0 commit 271c83b

8 files changed

+21
-11
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable;
2323
import rx.Observer;
2424
import rx.observers.SerializedSubscriber;
25+
import rx.subjects.UnicastSubject;
2526

2627
/**
2728
* Creates non-overlapping windows of items where each window is terminated by

src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observer;
2424
import rx.functions.Func0;
2525
import rx.observers.SerializedSubscriber;
26+
import rx.subjects.UnicastSubject;
2627
import rx.subscriptions.SerialSubscription;
2728

2829
/**

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import rx.Observable.Operator;
2424
import rx.functions.Action0;
2525
import rx.internal.util.atomic.SpscLinkedArrayQueue;
26-
import rx.subjects.Subject;
26+
import rx.subjects.*;
2727
import rx.subscriptions.Subscriptions;
2828

2929
/**

src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observer;
2424
import rx.functions.Func1;
2525
import rx.observers.*;
26+
import rx.subjects.UnicastSubject;
2627
import rx.subscriptions.CompositeSubscription;
2728

2829
/**

src/main/java/rx/internal/operators/OperatorWindowWithTime.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import rx.Observer;
2626
import rx.functions.Action0;
2727
import rx.observers.*;
28+
import rx.subjects.UnicastSubject;
2829
import rx.subscriptions.Subscriptions;
2930

3031
/**

src/main/java/rx/internal/operators/UnicastSubject.java renamed to src/main/java/rx/subjects/UnicastSubject.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package rx.internal.operators;
16+
package rx.subjects;
1717

1818
import java.util.Queue;
1919
import java.util.concurrent.atomic.*;
2020

2121
import rx.*;
22+
import rx.annotations.Experimental;
2223
import rx.exceptions.*;
23-
import rx.functions.*;
24+
import rx.functions.Action0;
25+
import rx.internal.operators.*;
2426
import rx.internal.util.atomic.*;
2527
import rx.internal.util.unsafe.*;
26-
import rx.subjects.Subject;
27-
import rx.subscriptions.Subscriptions;
2828

2929
/**
3030
* A Subject variant which buffers events until a single Subscriber arrives and replays them to it
@@ -35,6 +35,7 @@
3535
*
3636
* @param <T> the input and output value type
3737
*/
38+
@Experimental
3839
public final class UnicastSubject<T> extends Subject<T, T> {
3940

4041
/**
@@ -111,7 +112,7 @@ public boolean hasObservers() {
111112
*
112113
* @param <T> the value type
113114
*/
114-
static final class State<T> extends AtomicLong implements Producer, Observer<T>, Action0, OnSubscribe<T> {
115+
static final class State<T> extends AtomicLong implements Producer, Observer<T>, OnSubscribe<T>, Subscription {
115116
/** */
116117
private static final long serialVersionUID = -9044104859202255786L;
117118
/** The single subscriber. */
@@ -250,7 +251,7 @@ public void request(long n) {
250251
@Override
251252
public void call(Subscriber<? super T> subscriber) {
252253
if (this.subscriber.compareAndSet(null, subscriber)) {
253-
subscriber.add(Subscriptions.create(this));
254+
subscriber.add(this);
254255
subscriber.setProducer(this);
255256
} else {
256257
subscriber.onError(new IllegalStateException("Only a single subscriber is allowed"));
@@ -326,7 +327,7 @@ void replay() {
326327
* Should be called only when the child unsubscribes
327328
*/
328329
@Override
329-
public void call() {
330+
public void unsubscribe() {
330331

331332
doTerminate();
332333

@@ -339,6 +340,12 @@ public void call() {
339340
}
340341
queue.clear();
341342
}
343+
344+
@Override
345+
public boolean isUnsubscribed() {
346+
return done;
347+
}
348+
342349
/**
343350
* Checks if one of the terminal conditions have been met: child unsubscribed,
344351
* an error happened or the source terminated and the queue is empty

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import rx.observers.TestSubscriber;
4343
import rx.schedulers.Schedulers;
4444
import rx.schedulers.TestScheduler;
45-
import rx.subjects.Subject;
45+
import rx.subjects.*;
4646
import rx.subscriptions.BooleanSubscription;
4747

4848
public class OperatorConcatTest {

src/test/java/rx/internal/operators/BufferUntilSubscriberTest.java renamed to src/test/java/rx/subjects/BufferUntilSubscriberTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package rx.internal.operators;
16+
package rx.subjects;
1717

1818
import java.util.List;
1919
import java.util.concurrent.*;
@@ -26,7 +26,6 @@
2626
import rx.functions.*;
2727
import rx.observers.TestSubscriber;
2828
import rx.schedulers.Schedulers;
29-
import rx.subjects.PublishSubject;
3029

3130
public class BufferUntilSubscriberTest {
3231

0 commit comments

Comments
 (0)