Skip to content

Commit 99366ee

Browse files
Desislav-Petrovakarnokd
authored andcommitted
Adding eager concats to Single (#5976)
* Adding eager concats to Single * Removing unnecessary class - the utility already exists * Covering publisher sequence * Correcting javadoc * Correcting javadoc * Fixing javadoc "Single sources" -> "SingleSources" * Fixing javadoc "Maybe sources" -> "MaybeSources"
1 parent be12fe2 commit 99366ee

File tree

3 files changed

+144
-10
lines changed

3 files changed

+144
-10
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
public abstract class Maybe<T> implements MaybeSource<T> {
5050

5151
/**
52-
* Runs multiple Maybe sources and signals the events of the first one that signals (cancelling
52+
* Runs multiple MaybeSources and signals the events of the first one that signals (cancelling
5353
* the rest).
5454
* <dl>
5555
* <dt><b>Scheduler:</b></dt>
@@ -68,7 +68,7 @@ public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>
6868
}
6969

7070
/**
71-
* Runs multiple Maybe sources and signals the events of the first one that signals (cancelling
71+
* Runs multiple MaybeSources and signals the events of the first one that signals (cancelling
7272
* the rest).
7373
* <dl>
7474
* <dt><b>Scheduler:</b></dt>
@@ -412,7 +412,7 @@ public static <T> Flowable<T> concatEager(Iterable<? extends MaybeSource<? exten
412412
}
413413

414414
/**
415-
* Concatenates a Publisher sequence of Publishers eagerly into a single stream of values.
415+
* Concatenates a Publisher sequence of MaybeSources eagerly into a single stream of values.
416416
* <p>
417417
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
418418
* emitted source Publishers as they are observed. The operator buffers the values emitted by these

src/main/java/io/reactivex/Single.java

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
public abstract class Single<T> implements SingleSource<T> {
6161

6262
/**
63-
* Runs multiple Single sources and signals the events of the first one that signals (cancelling
63+
* Runs multiple SingleSources and signals the events of the first one that signals (cancelling
6464
* the rest).
6565
* <dl>
6666
* <dt><b>Scheduler:</b></dt>
@@ -80,7 +80,7 @@ public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends
8080
}
8181

8282
/**
83-
* Runs multiple Single sources and signals the events of the first one that signals (cancelling
83+
* Runs multiple SingleSources and signals the events of the first one that signals (cancelling
8484
* the rest).
8585
* <dl>
8686
* <dt><b>Scheduler:</b></dt>
@@ -106,7 +106,7 @@ public static <T> Single<T> ambArray(final SingleSource<? extends T>... sources)
106106
}
107107

108108
/**
109-
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
109+
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
110110
* an Iterable sequence.
111111
* <dl>
112112
* <dt><b>Backpressure:</b></dt>
@@ -127,7 +127,7 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
127127
}
128128

129129
/**
130-
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
130+
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
131131
* an Observable sequence.
132132
* <dl>
133133
* <dt><b>Scheduler:</b></dt>
@@ -147,7 +147,7 @@ public static <T> Observable<T> concat(ObservableSource<? extends SingleSource<?
147147
}
148148

149149
/**
150-
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
150+
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
151151
* a Publisher sequence.
152152
* <dl>
153153
* <dt><b>Backpressure:</b></dt>
@@ -169,7 +169,7 @@ public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends
169169
}
170170

171171
/**
172-
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
172+
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
173173
* a Publisher sequence and prefetched by the specified amount.
174174
* <dl>
175175
* <dt><b>Backpressure:</b></dt>
@@ -299,7 +299,7 @@ public static <T> Flowable<T> concat(
299299
}
300300

301301
/**
302-
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in
302+
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided in
303303
* an array.
304304
* <dl>
305305
* <dt><b>Backpressure:</b></dt>
@@ -320,6 +320,80 @@ public static <T> Flowable<T> concatArray(SingleSource<? extends T>... sources)
320320
return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
321321
}
322322

323+
/**
324+
* Concatenates a sequence of SingleSource eagerly into a single stream of values.
325+
* <p>
326+
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
327+
* source SingleSources. The operator buffers the value emitted by these SingleSources and then drains them
328+
* in order, each one after the previous one completes.
329+
* <dl>
330+
* <dt><b>Backpressure:</b></dt>
331+
* <dd>The operator honors backpressure from downstream.</dd>
332+
* <dt><b>Scheduler:</b></dt>
333+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
334+
* </dl>
335+
* @param <T> the value type
336+
* @param sources a sequence of Single that need to be eagerly concatenated
337+
* @return the new Flowable instance with the specified concatenation behavior
338+
*/
339+
@SuppressWarnings({ "rawtypes", "unchecked" })
340+
@BackpressureSupport(BackpressureKind.FULL)
341+
@CheckReturnValue
342+
@SchedulerSupport(SchedulerSupport.NONE)
343+
public static <T> Flowable<T> concatArrayEager(SingleSource<? extends T>... sources) {
344+
return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
345+
}
346+
347+
/**
348+
* Concatenates a Publisher sequence of SingleSources eagerly into a single stream of values.
349+
* <p>
350+
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
351+
* emitted source Publishers as they are observed. The operator buffers the values emitted by these
352+
* Publishers and then drains them in order, each one after the previous one completes.
353+
* <dl>
354+
* <dt><b>Backpressure:</b></dt>
355+
* <dd>Backpressure is honored towards the downstream and the outer Publisher is
356+
* expected to support backpressure. Violating this assumption, the operator will
357+
* signal {@link io.reactivex.exceptions.MissingBackpressureException}.</dd>
358+
* <dt><b>Scheduler:</b></dt>
359+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
360+
* </dl>
361+
* @param <T> the value type
362+
* @param sources a sequence of Publishers that need to be eagerly concatenated
363+
* @return the new Publisher instance with the specified concatenation behavior
364+
*/
365+
@SuppressWarnings({ "rawtypes", "unchecked" })
366+
@BackpressureSupport(BackpressureKind.FULL)
367+
@CheckReturnValue
368+
@SchedulerSupport(SchedulerSupport.NONE)
369+
public static <T> Flowable<T> concatEager(Publisher<? extends SingleSource<? extends T>> sources) {
370+
return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
371+
}
372+
373+
/**
374+
* Concatenates a sequence of SingleSources eagerly into a single stream of values.
375+
* <p>
376+
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
377+
* source SingleSources. The operator buffers the values emitted by these SingleSources and then drains them
378+
* in order, each one after the previous one completes.
379+
* <dl>
380+
* <dt><b>Backpressure:</b></dt>
381+
* <dd>Backpressure is honored towards the downstream.</dd>
382+
* <dt><b>Scheduler:</b></dt>
383+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
384+
* </dl>
385+
* @param <T> the value type
386+
* @param sources a sequence of SingleSource that need to be eagerly concatenated
387+
* @return the new Flowable instance with the specified concatenation behavior
388+
*/
389+
@SuppressWarnings({ "rawtypes", "unchecked" })
390+
@BackpressureSupport(BackpressureKind.FULL)
391+
@CheckReturnValue
392+
@SchedulerSupport(SchedulerSupport.NONE)
393+
public static <T> Flowable<T> concatEager(Iterable<? extends SingleSource<? extends T>> sources) {
394+
return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
395+
}
396+
323397
/**
324398
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
325399
* <p>

src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
package io.reactivex.internal.operators.single;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertTrue;
1718

1819
import java.util.Arrays;
1920

21+
import io.reactivex.processors.PublishProcessor;
22+
import io.reactivex.subscribers.TestSubscriber;
2023
import org.junit.Test;
2124

2225
import io.reactivex.*;
@@ -67,6 +70,63 @@ public void concatArray() {
6770
}
6871
}
6972

73+
@SuppressWarnings("unchecked")
74+
@Test
75+
public void concatArrayEagerTest() {
76+
PublishProcessor<String> pp1 = PublishProcessor.create();
77+
PublishProcessor<String> pp2 = PublishProcessor.create();
78+
79+
TestSubscriber<String> ts = Single.concatArrayEager(pp1.single("1"), pp2.single("2")).test();
80+
81+
assertTrue(pp1.hasSubscribers());
82+
assertTrue(pp2.hasSubscribers());
83+
84+
pp2.onComplete();
85+
ts.assertEmpty();
86+
pp1.onComplete();
87+
88+
ts.assertResult("1", "2");
89+
ts.assertComplete();
90+
}
91+
92+
@SuppressWarnings("unchecked")
93+
@Test
94+
public void concatEagerIterableTest() {
95+
PublishProcessor<String> pp1 = PublishProcessor.create();
96+
PublishProcessor<String> pp2 = PublishProcessor.create();
97+
98+
TestSubscriber<String> ts = Single.concatEager(Arrays.asList(pp1.single("2"), pp2.single("1"))).test();
99+
100+
assertTrue(pp1.hasSubscribers());
101+
assertTrue(pp2.hasSubscribers());
102+
103+
pp2.onComplete();
104+
ts.assertEmpty();
105+
pp1.onComplete();
106+
107+
ts.assertResult("2", "1");
108+
ts.assertComplete();
109+
}
110+
111+
@SuppressWarnings("unchecked")
112+
@Test
113+
public void concatEagerPublisherTest() {
114+
PublishProcessor<String> pp1 = PublishProcessor.create();
115+
PublishProcessor<String> pp2 = PublishProcessor.create();
116+
117+
TestSubscriber<String> ts = Single.concatEager(Flowable.just(pp1.single("1"), pp2.single("2"))).test();
118+
119+
assertTrue(pp1.hasSubscribers());
120+
assertTrue(pp2.hasSubscribers());
121+
122+
pp2.onComplete();
123+
ts.assertEmpty();
124+
pp1.onComplete();
125+
126+
ts.assertResult("1", "2");
127+
ts.assertComplete();
128+
}
129+
70130
@SuppressWarnings("unchecked")
71131
@Test
72132
public void concatObservable() {

0 commit comments

Comments
 (0)