File tree 2 files changed +19
-3
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators 2 files changed +19
-3
lines changed Original file line number Diff line number Diff line change @@ -475,7 +475,7 @@ protected void queueScalar(T value) {
475
475
}
476
476
this .queue = q ;
477
477
}
478
- if (!q .offer (value )) {
478
+ if (!q .offer (nl . next ( value ) )) {
479
479
unsubscribe ();
480
480
onError (OnErrorThrowable .addValueAsLastCause (new MissingBackpressureException (), value ));
481
481
return ;
Original file line number Diff line number Diff line change 28
28
import org .mockito .*;
29
29
30
30
import rx .*;
31
- import rx .Observable .OnSubscribe ;
32
- import rx .Scheduler .Worker ;
33
31
import rx .Observable ;
32
+ import rx .Observable .OnSubscribe ;
34
33
import rx .Observer ;
34
+ import rx .Scheduler .Worker ;
35
35
import rx .functions .*;
36
36
import rx .internal .util .RxRingBuffer ;
37
37
import rx .observers .TestSubscriber ;
@@ -1353,4 +1353,20 @@ public void zeroMaxConcurrent() {
1353
1353
assertEquals ("maxConcurrent > 0 required but it was 0" , e .getMessage ());
1354
1354
}
1355
1355
}
1356
+
1357
+ @ Test
1358
+ public void mergeJustNull () {
1359
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
1360
+
1361
+ Observable .range (1 , 2 ).flatMap (new Func1 <Integer , Observable <Integer >>() {
1362
+ @ Override
1363
+ public Observable <Integer > call (Integer t ) {
1364
+ return Observable .just (null );
1365
+ }
1366
+ }).subscribe (ts );
1367
+
1368
+ ts .assertValues (null , null );
1369
+ ts .assertNoErrors ();
1370
+ ts .assertCompleted ();
1371
+ }
1356
1372
}
You can’t perform that action at this time.
0 commit comments