File tree Expand file tree Collapse file tree 3 files changed +73
-12
lines changed
main/java/io/reactivex/internal/operators/observable
test/java/io/reactivex/internal/operators Expand file tree Collapse file tree 3 files changed +73
-12
lines changed Original file line number Diff line number Diff line change @@ -334,6 +334,7 @@ void drainLoop() {
334334 if (checkTerminate ()) {
335335 return ;
336336 }
337+ int innerCompleted = 0 ;
337338 SimplePlainQueue <U > svq = queue ;
338339
339340 if (svq != null ) {
@@ -349,9 +350,18 @@ void drainLoop() {
349350 }
350351
351352 child .onNext (o );
353+ innerCompleted ++;
352354 }
353355 }
354356
357+ if (innerCompleted != 0 ) {
358+ if (maxConcurrency != Integer .MAX_VALUE ) {
359+ subscribeMore (innerCompleted );
360+ innerCompleted = 0 ;
361+ }
362+ continue ;
363+ }
364+
355365 boolean d = done ;
356366 svq = queue ;
357367 InnerObserver <?, ?>[] inner = observers .get ();
@@ -376,7 +386,6 @@ void drainLoop() {
376386 return ;
377387 }
378388
379- int innerCompleted = 0 ;
380389 if (n != 0 ) {
381390 long startId = lastId ;
382391 int index = lastIndex ;
@@ -463,27 +472,33 @@ void drainLoop() {
463472
464473 if (innerCompleted != 0 ) {
465474 if (maxConcurrency != Integer .MAX_VALUE ) {
466- while (innerCompleted -- != 0 ) {
467- ObservableSource <? extends U > p ;
468- synchronized (this ) {
469- p = sources .poll ();
470- if (p == null ) {
471- wip --;
472- continue ;
473- }
474- }
475- subscribeInner (p );
476- }
475+ subscribeMore (innerCompleted );
476+ innerCompleted = 0 ;
477477 }
478478 continue ;
479479 }
480+
480481 missed = addAndGet (-missed );
481482 if (missed == 0 ) {
482483 break ;
483484 }
484485 }
485486 }
486487
488+ void subscribeMore (int innerCompleted ) {
489+ while (innerCompleted -- != 0 ) {
490+ ObservableSource <? extends U > p ;
491+ synchronized (this ) {
492+ p = sources .poll ();
493+ if (p == null ) {
494+ wip --;
495+ continue ;
496+ }
497+ }
498+ subscribeInner (p );
499+ }
500+ }
501+
487502 boolean checkTerminate () {
488503 if (cancelled ) {
489504 return true ;
Original file line number Diff line number Diff line change @@ -1157,4 +1157,27 @@ public void innerErrorsMainCancelled() {
11571157
11581158 assertFalse ("Has subscribers?" , pp1 .hasSubscribers ());
11591159 }
1160+
1161+ @ Test (timeout = 5000 )
1162+ public void mixedScalarAsync () {
1163+ for (int i = 0 ; i < TestHelper .RACE_DEFAULT_LOOPS ; i ++) {
1164+ Flowable
1165+ .range (0 , 20 )
1166+ .flatMap (new Function <Integer , Publisher <?>>() {
1167+ @ Override
1168+ public Publisher <?> apply (Integer integer ) throws Exception {
1169+ if (integer % 5 != 0 ) {
1170+ return Flowable
1171+ .just (integer );
1172+ }
1173+
1174+ return Flowable
1175+ .just (-integer )
1176+ .observeOn (Schedulers .computation ());
1177+ }
1178+ }, false , 1 )
1179+ .ignoreElements ()
1180+ .blockingAwait ();
1181+ }
1182+ }
11601183}
Original file line number Diff line number Diff line change @@ -1118,4 +1118,27 @@ public void innerErrorsMainCancelled() {
11181118
11191119 assertFalse ("Has subscribers?" , ps1 .hasObservers ());
11201120 }
1121+
1122+ @ Test (timeout = 5000 )
1123+ public void mixedScalarAsync () {
1124+ for (int i = 0 ; i < TestHelper .RACE_DEFAULT_LOOPS ; i ++) {
1125+ Observable
1126+ .range (0 , 20 )
1127+ .flatMap (new Function <Integer , ObservableSource <?>>() {
1128+ @ Override
1129+ public ObservableSource <?> apply (Integer integer ) throws Exception {
1130+ if (integer % 5 != 0 ) {
1131+ return Observable
1132+ .just (integer );
1133+ }
1134+
1135+ return Observable
1136+ .just (-integer )
1137+ .observeOn (Schedulers .computation ());
1138+ }
1139+ }, false , 1 )
1140+ .ignoreElements ()
1141+ .blockingAwait ();
1142+ }
1143+ }
11211144}
You can’t perform that action at this time.
0 commit comments