Skip to content

Commit de4ce93

Browse files
authored
3.x: ConnectableFlowable/ConnetableFlowable redesign (#6519)
1 parent 583c4e7 commit de4ce93

16 files changed

+1036
-985
lines changed

src/main/java/io/reactivex/Flowable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -12488,7 +12488,7 @@ public final <R> Flowable<R> publish(Function<? super Flowable<T>, ? extends Pub
1248812488
@SchedulerSupport(SchedulerSupport.NONE)
1248912489
public final ConnectableFlowable<T> publish(int bufferSize) {
1249012490
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
12491-
return FlowablePublish.create(this, bufferSize);
12491+
return RxJavaPlugins.onAssembly(new FlowablePublish<T>(this, bufferSize));
1249212492
}
1249312493

1249412494
/**

src/main/java/io/reactivex/Observable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -10207,7 +10207,7 @@ public final Observable<T> onTerminateDetach() {
1020710207
@CheckReturnValue
1020810208
@SchedulerSupport(SchedulerSupport.NONE)
1020910209
public final ConnectableObservable<T> publish() {
10210-
return ObservablePublish.create(this);
10210+
return RxJavaPlugins.onAssembly(new ObservablePublish<T>(this));
1021110211
}
1021210212

1021310213
/**

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,24 @@
3434
* before the {@code Flowable} begins emitting items.
3535
* <p>
3636
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
37-
*
37+
* <p>
38+
* When the upstream terminates, the {@code ConnectableFlowable} remains in this terminated state and,
39+
* depending on the actual underlying implementation, relays cached events to late {@link Subscriber}s.
40+
* In order to reuse and restart this {@code ConnectableFlowable}, the {@link #reset()} method has to be called.
41+
* When called, this {@code ConnectableFlowable} will appear as fresh, unconnected source to new {@link Subscriber}s.
42+
* Disposing the connection will reset the {@code ConnectableFlowable} to its fresh state and there is no need to call
43+
* {@code reset()} in this case.
44+
* <p>
45+
* Note that although {@link #connect()} and {@link #reset()} are safe to call from multiple threads, it is recommended
46+
* a dedicated thread or business logic manages the connection or resetting of a {@code ConnectableFlowable} so that
47+
* there is no unwanted signal loss due to early {@code connect()} or {@code reset()} calls while {@code Subscriber}s are
48+
* still being subscribed to to this {@code ConnectableFlowable} to receive signals from the get go.
49+
* <p>
3850
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators">RxJava Wiki:
3951
* Connectable Observable Operators</a>
4052
* @param <T>
4153
* the type of items emitted by the {@code ConnectableFlowable}
54+
* @since 2.0.0
4255
*/
4356
public abstract class ConnectableFlowable<T> extends Flowable<T> {
4457

@@ -53,6 +66,14 @@ public abstract class ConnectableFlowable<T> extends Flowable<T> {
5366
*/
5467
public abstract void connect(@NonNull Consumer<? super Disposable> connection);
5568

69+
/**
70+
* Resets this ConnectableFlowable into its fresh state if it has terminated.
71+
* <p>
72+
* Calling this method on a fresh or active {@code ConnectableFlowable} has no effect.
73+
* @since 3.0.0
74+
*/
75+
public abstract void reset();
76+
5677
/**
5778
* Instructs the {@code ConnectableFlowable} to begin emitting the items from its underlying
5879
* {@link Flowable} to its {@link Subscriber}s.

src/main/java/io/reactivex/internal/disposables/ResettableConnectable.java

-54
This file was deleted.

0 commit comments

Comments
 (0)