1515 */
1616package rx .internal .operators ;
1717
18+ import java .util .Queue ;
1819import java .util .concurrent .atomic .*;
1920
2021import rx .Observable .Operator ;
2122import rx .*;
2223import rx .exceptions .MissingBackpressureException ;
2324import rx .functions .Action0 ;
24- import rx .internal .util .RxRingBuffer ;
25- import rx .internal .util .unsafe .SpscArrayQueue ;
25+ import rx .internal .util .* ;
26+ import rx .internal .util .unsafe .* ;
2627import rx .schedulers .*;
2728
2829/**
@@ -64,7 +65,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
6465 final ScheduledUnsubscribe scheduledUnsubscribe ;
6566 final NotificationLite <T > on = NotificationLite .instance ();
6667
67- final SpscArrayQueue <Object > queue = new SpscArrayQueue < Object >( RxRingBuffer . SIZE ) ;
68+ final Queue <Object > queue ;
6869 volatile boolean completed = false ;
6970 volatile boolean failure = false ;
7071
@@ -84,6 +85,11 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
8485 public ObserveOnSubscriber (Scheduler scheduler , Subscriber <? super T > child ) {
8586 this .child = child ;
8687 this .recursiveScheduler = scheduler .createWorker ();
88+ if (UnsafeAccess .isUnsafeAvailable ()) {
89+ queue = new SpscArrayQueue <Object >(RxRingBuffer .SIZE );
90+ } else {
91+ queue = new SynchronizedQueue <Object >(RxRingBuffer .SIZE );
92+ }
8793 this .scheduledUnsubscribe = new ScheduledUnsubscribe (recursiveScheduler );
8894 child .add (scheduledUnsubscribe );
8995 child .setProducer (new Producer () {
0 commit comments