@@ -83,7 +83,8 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
83
83
final NotificationLite <T > on ;
84
84
final boolean delayError ;
85
85
final Queue <Object > queue ;
86
- final int bufferSize ;
86
+ /** The emission threshold that should trigger a replenishing request. */
87
+ final int limit ;
87
88
88
89
// the status of the current stream
89
90
volatile boolean finished ;
@@ -97,6 +98,9 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
97
98
* reading finished (acquire).
98
99
*/
99
100
Throwable error ;
101
+
102
+ /** Remembers how many elements have been emitted before the requests run out. */
103
+ long emitted ;
100
104
101
105
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
102
106
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
@@ -105,12 +109,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
105
109
this .recursiveScheduler = scheduler .createWorker ();
106
110
this .delayError = delayError ;
107
111
this .on = NotificationLite .instance ();
108
- this .bufferSize = (bufferSize > 0 ) ? bufferSize : RxRingBuffer .SIZE ;
112
+ int calculatedSize = (bufferSize > 0 ) ? bufferSize : RxRingBuffer .SIZE ;
113
+ this .limit = calculatedSize - (calculatedSize >> 2 );
109
114
if (UnsafeAccess .isUnsafeAvailable ()) {
110
- queue = new SpscArrayQueue <Object >(this . bufferSize );
115
+ queue = new SpscArrayQueue <Object >(calculatedSize );
111
116
} else {
112
- queue = new SpscAtomicArrayQueue <Object >(this . bufferSize );
117
+ queue = new SpscAtomicArrayQueue <Object >(calculatedSize );
113
118
}
119
+ // signal that this is an async operator capable of receiving this many
120
+ request (calculatedSize );
114
121
}
115
122
116
123
void init () {
@@ -133,12 +140,6 @@ public void request(long n) {
133
140
localChild .add (this );
134
141
}
135
142
136
- @ Override
137
- public void onStart () {
138
- // signal that this is an async operator capable of receiving this many
139
- request (this .bufferSize );
140
- }
141
-
142
143
@ Override
143
144
public void onNext (final T t ) {
144
145
if (isUnsubscribed () || finished ) {
@@ -180,9 +181,8 @@ protected void schedule() {
180
181
// only execute this from schedule()
181
182
@ Override
182
183
public void call () {
183
- long emitted = 0L ;
184
-
185
184
long missed = 1L ;
185
+ long currentEmission = emitted ;
186
186
187
187
// these are accessed in a tight loop around atomics so
188
188
// loading them into local variables avoids the mandatory re-reading
@@ -197,7 +197,6 @@ public void call() {
197
197
198
198
for (;;) {
199
199
long requestAmount = requested .get ();
200
- long currentEmission = 0L ;
201
200
202
201
while (requestAmount != currentEmission ) {
203
202
boolean done = finished ;
@@ -215,28 +214,25 @@ public void call() {
215
214
localChild .onNext (localOn .getValue (v ));
216
215
217
216
currentEmission ++;
218
- emitted ++;
217
+ if (currentEmission == limit ) {
218
+ requestAmount = BackpressureUtils .produced (requested , currentEmission );
219
+ request (currentEmission );
220
+ currentEmission = 0L ;
221
+ }
219
222
}
220
223
221
224
if (requestAmount == currentEmission ) {
222
225
if (checkTerminated (finished , q .isEmpty (), localChild , q )) {
223
226
return ;
224
227
}
225
228
}
226
-
227
- if (currentEmission != 0L ) {
228
- BackpressureUtils .produced (requested , currentEmission );
229
- }
230
-
229
+
230
+ emitted = currentEmission ;
231
231
missed = counter .addAndGet (-missed );
232
232
if (missed == 0L ) {
233
233
break ;
234
234
}
235
235
}
236
-
237
- if (emitted != 0L ) {
238
- request (emitted );
239
- }
240
236
}
241
237
242
238
boolean checkTerminated (boolean done , boolean isEmpty , Subscriber <? super T > a , Queue <Object > q ) {
@@ -285,4 +281,4 @@ boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a,
285
281
return false ;
286
282
}
287
283
}
288
- }
284
+ }
0 commit comments