19
19
import java .util .Collection ;
20
20
import java .util .List ;
21
21
import java .util .concurrent .ConcurrentLinkedQueue ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
23
import java .util .concurrent .atomic .AtomicReference ;
23
24
24
25
import rx .Observable ;
@@ -343,10 +344,13 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343
344
}
344
345
345
346
}
346
-
347
- private final Iterable <? extends Observable <? extends T >> sources ;
348
- private final Selection <T > selection = new Selection <T >();
349
-
347
+
348
+ //give default access instead of private as a micro-optimization
349
+ //for access from anonymous classes below
350
+ final Iterable <? extends Observable <? extends T >> sources ;
351
+ final Selection <T > selection = new Selection <T >();
352
+ final AtomicReference <AmbSubscriber <T >> choice = selection .choice ;
353
+
350
354
private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
351
355
this .sources = sources ;
352
356
}
@@ -357,9 +361,10 @@ public void call(final Subscriber<? super T> subscriber) {
357
361
358
362
@ Override
359
363
public void call () {
360
- if (selection .choice .get () != null ) {
364
+ AmbSubscriber <T > c ;
365
+ if ((c = choice .get ()) != null ) {
361
366
// there is a single winner so we unsubscribe it
362
- selection . choice . get () .unsubscribe ();
367
+ c .unsubscribe ();
363
368
}
364
369
// if we are racing with others still existing, we'll also unsubscribe them
365
370
if (!selection .ambSubscribers .isEmpty ()) {
@@ -371,27 +376,47 @@ public void call() {
371
376
}
372
377
373
378
}));
379
+ //need to subscribe to all the sources
380
+ for (Observable <? extends T > source : sources ) {
381
+ if (subscriber .isUnsubscribed ()) {
382
+ return ;
383
+ }
384
+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(0 , subscriber , selection );
385
+ selection .ambSubscribers .add (ambSubscriber );
386
+ // check again if choice has been made so can stop subscribing
387
+ // if all sources were backpressure aware then this check
388
+ // would be pointless given that 0 was requested above from each ambSubscriber
389
+ AmbSubscriber <T > c ;
390
+ if ((c = choice .get ()) != null ) {
391
+ // Already chose one, the rest can be skipped and we can clean up
392
+ selection .unsubscribeOthers (c );
393
+ return ;
394
+ }
395
+ source .unsafeSubscribe (ambSubscriber );
396
+ }
374
397
subscriber .setProducer (new Producer () {
375
398
376
399
@ Override
377
400
public void request (long n ) {
378
- if (selection .choice .get () != null ) {
401
+ final AmbSubscriber <T > c ;
402
+ if ((c = choice .get ()) != null ) {
379
403
// propagate the request to that single Subscriber that won
380
- selection . choice . get () .requestMore (n );
404
+ c .requestMore (n );
381
405
} else {
382
- for (Observable <? extends T > source : sources ) {
383
- if (subscriber .isUnsubscribed ()) {
384
- break ;
385
- }
386
- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387
- selection .ambSubscribers .add (ambSubscriber );
388
- // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389
- if (selection .choice .get () != null ) {
390
- // Already chose one, the rest can be skipped and we can clean up
391
- selection .unsubscribeOthers (selection .choice .get ());
392
- break ;
406
+ //propagate the request to all the amb subscribers
407
+ for (AmbSubscriber <T > ambSubscriber : selection .ambSubscribers ) {
408
+ if (!ambSubscriber .isUnsubscribed ()) {
409
+ // make a best endeavours check to not waste requests
410
+ // if first emission has already occurred
411
+ if (choice .get () == ambSubscriber ) {
412
+ ambSubscriber .requestMore (n );
413
+ // don't need to request from other subscribers because choice has been made
414
+ // and request has gone to choice
415
+ return ;
416
+ } else {
417
+ ambSubscriber .requestMore (n );
418
+ }
393
419
}
394
- source .unsafeSubscribe (ambSubscriber );
395
420
}
396
421
}
397
422
}
0 commit comments