13
13
14
14
package io .reactivex .internal .operators .flowable ;
15
15
16
+ import java .util .NoSuchElementException ;
17
+
16
18
import org .reactivestreams .*;
17
19
18
20
import io .reactivex .*;
@@ -23,14 +25,17 @@ public final class FlowableSingle<T> extends AbstractFlowableWithUpstream<T, T>
23
25
24
26
final T defaultValue ;
25
27
26
- public FlowableSingle (Flowable <T > source , T defaultValue ) {
28
+ final boolean failOnEmpty ;
29
+
30
+ public FlowableSingle (Flowable <T > source , T defaultValue , boolean failOnEmpty ) {
27
31
super (source );
28
32
this .defaultValue = defaultValue ;
33
+ this .failOnEmpty = failOnEmpty ;
29
34
}
30
35
31
36
@ Override
32
37
protected void subscribeActual (Subscriber <? super T > s ) {
33
- source .subscribe (new SingleElementSubscriber <T >(s , defaultValue ));
38
+ source .subscribe (new SingleElementSubscriber <T >(s , defaultValue , failOnEmpty ));
34
39
}
35
40
36
41
static final class SingleElementSubscriber <T > extends DeferredScalarSubscription <T >
@@ -40,13 +45,16 @@ static final class SingleElementSubscriber<T> extends DeferredScalarSubscription
40
45
41
46
final T defaultValue ;
42
47
48
+ final boolean failOnEmpty ;
49
+
43
50
Subscription s ;
44
51
45
52
boolean done ;
46
53
47
- SingleElementSubscriber (Subscriber <? super T > actual , T defaultValue ) {
54
+ SingleElementSubscriber (Subscriber <? super T > actual , T defaultValue , boolean failOnEmpty ) {
48
55
super (actual );
49
56
this .defaultValue = defaultValue ;
57
+ this .failOnEmpty = failOnEmpty ;
50
58
}
51
59
52
60
@ Override
@@ -94,7 +102,11 @@ public void onComplete() {
94
102
v = defaultValue ;
95
103
}
96
104
if (v == null ) {
97
- actual .onComplete ();
105
+ if (failOnEmpty ) {
106
+ actual .onError (new NoSuchElementException ());
107
+ } else {
108
+ actual .onComplete ();
109
+ }
98
110
} else {
99
111
complete (v );
100
112
}
0 commit comments