@@ -129,10 +129,7 @@ public void request(long n) {
129
129
}
130
130
}
131
131
public long produced (int n ) {
132
- if (get () != Long .MAX_VALUE ) {
133
- return addAndGet (-n );
134
- }
135
- return Long .MAX_VALUE ;
132
+ return addAndGet (-n );
136
133
}
137
134
}
138
135
@@ -314,81 +311,94 @@ void removeInner(InnerSubscriber<T> inner) {
314
311
*/
315
312
void tryEmit (InnerSubscriber <T > subscriber , T value ) {
316
313
boolean success = false ;
317
- synchronized (this ) {
318
- // if nobody is emitting and child has available requests
319
- if (!emitting && producer .get () != 0 ) {
320
- emitting = true ;
321
- success = true ;
314
+ long r = producer .get ();
315
+ if (r != 0L ) {
316
+ synchronized (this ) {
317
+ // if nobody is emitting and child has available requests
318
+ if (!emitting ) {
319
+ emitting = true ;
320
+ success = true ;
321
+ }
322
322
}
323
323
}
324
324
if (success ) {
325
- boolean skipFinal = false ;
325
+ emitScalar (subscriber , value , r );
326
+ } else {
327
+ queueScalar (subscriber , value );
328
+ }
329
+ }
330
+
331
+ protected void queueScalar (InnerSubscriber <T > subscriber , T value ) {
332
+ /*
333
+ * If the attempt to make a fast-path emission failed
334
+ * due to lack of requests or an ongoing emission,
335
+ * enqueue the value and try the slow emission path.
336
+ */
337
+ RxRingBuffer q = subscriber .queue ;
338
+ if (q == null ) {
339
+ q = RxRingBuffer .getSpscInstance ();
340
+ subscriber .add (q );
341
+ subscriber .queue = q ;
342
+ }
343
+ try {
344
+ q .onNext (nl .next (value ));
345
+ } catch (MissingBackpressureException ex ) {
346
+ subscriber .unsubscribe ();
347
+ subscriber .onError (ex );
348
+ return ;
349
+ } catch (IllegalStateException ex ) {
350
+ if (!subscriber .isUnsubscribed ()) {
351
+ subscriber .unsubscribe ();
352
+ subscriber .onError (ex );
353
+ }
354
+ return ;
355
+ }
356
+ emit ();
357
+ }
358
+
359
+ protected void emitScalar (InnerSubscriber <T > subscriber , T value , long r ) {
360
+ boolean skipFinal = false ;
361
+ try {
326
362
try {
327
- try {
328
- child .onNext (value );
329
- } catch (Throwable t ) {
330
- if (!delayErrors ) {
331
- Exceptions .throwIfFatal (t );
332
- skipFinal = true ;
333
- subscriber .unsubscribe ();
334
- subscriber .onError (t );
335
- return ;
336
- }
337
- getOrCreateErrorQueue ().offer (t );
338
- }
339
- producer .produced (1 );
340
- subscriber .requestMore (1 );
341
- // check if some state changed while emitting
342
- synchronized (this ) {
363
+ child .onNext (value );
364
+ } catch (Throwable t ) {
365
+ if (!delayErrors ) {
366
+ Exceptions .throwIfFatal (t );
343
367
skipFinal = true ;
344
- if (!missed ) {
345
- emitting = false ;
346
- return ;
347
- }
348
- missed = false ;
349
- }
350
- } finally {
351
- if (!skipFinal ) {
352
- synchronized (this ) {
353
- emitting = false ;
354
- }
368
+ subscriber .unsubscribe ();
369
+ subscriber .onError (t );
370
+ return ;
355
371
}
372
+ getOrCreateErrorQueue ().offer (t );
356
373
}
357
- /*
358
- * In the synchronized block below request(1) we check
359
- * if there was a concurrent emission attempt and if there was,
360
- * we stay in emission mode and enter the emission loop
361
- * which will take care all the queued up state and
362
- * emission possibilities.
363
- */
364
- emitLoop ();
365
- } else {
366
- /*
367
- * If the attempt to make a fast-path emission failed
368
- * due to lack of requests or an ongoing emission,
369
- * enqueue the value and try the slow emission path.
370
- */
371
- RxRingBuffer q = subscriber .queue ;
372
- if (q == null ) {
373
- q = RxRingBuffer .getSpscInstance ();
374
- subscriber .add (q );
375
- subscriber .queue = q ;
374
+ if (r != Long .MAX_VALUE ) {
375
+ producer .produced (1 );
376
376
}
377
- try {
378
- q .onNext (nl .next (value ));
379
- } catch (MissingBackpressureException ex ) {
380
- subscriber .unsubscribe ();
381
- subscriber .onError (ex );
382
- return ;
383
- } catch (IllegalStateException ex ) {
384
- if (!subscriber .isUnsubscribed ()) {
385
- subscriber .unsubscribe ();
386
- subscriber .onError (ex );
377
+ subscriber .requestMore (1 );
378
+ // check if some state changed while emitting
379
+ synchronized (this ) {
380
+ skipFinal = true ;
381
+ if (!missed ) {
382
+ emitting = false ;
383
+ return ;
384
+ }
385
+ missed = false ;
386
+ }
387
+ } finally {
388
+ if (!skipFinal ) {
389
+ synchronized (this ) {
390
+ emitting = false ;
387
391
}
388
- return ;
389
392
}
390
- emit ();
391
393
}
394
+ /*
395
+ * In the synchronized block below request(1) we check
396
+ * if there was a concurrent emission attempt and if there was,
397
+ * we stay in emission mode and enter the emission loop
398
+ * which will take care all the queued up state and
399
+ * emission possibilities.
400
+ */
401
+ emitLoop ();
392
402
}
393
403
394
404
public void requestMore (long n ) {
@@ -408,81 +418,94 @@ public void requestMore(long n) {
408
418
*/
409
419
void tryEmit (T value ) {
410
420
boolean success = false ;
411
- synchronized (this ) {
412
- // if nobody is emitting and child has available requests
413
- if (!emitting && producer .get () != 0 ) {
414
- emitting = true ;
415
- success = true ;
421
+ long r = producer .get ();
422
+ if (r != 0L ) {
423
+ synchronized (this ) {
424
+ // if nobody is emitting and child has available requests
425
+ if (!emitting ) {
426
+ emitting = true ;
427
+ success = true ;
428
+ }
416
429
}
417
430
}
418
431
if (success ) {
419
- boolean skipFinal = false ;
432
+ emitScalar (value , r );
433
+ } else {
434
+ queueScalar (value );
435
+ }
436
+ }
437
+
438
+ protected void queueScalar (T value ) {
439
+ /*
440
+ * If the attempt to make a fast-path emission failed
441
+ * due to lack of requests or an ongoing emission,
442
+ * enqueue the value and try the slow emission path.
443
+ */
444
+ RxRingBuffer q = this .queue ;
445
+ if (q == null ) {
446
+ q = RxRingBuffer .getSpscInstance ();
447
+ this .add (q );
448
+ this .queue = q ;
449
+ }
450
+ try {
451
+ q .onNext (nl .next (value ));
452
+ } catch (MissingBackpressureException ex ) {
453
+ this .unsubscribe ();
454
+ this .onError (ex );
455
+ return ;
456
+ } catch (IllegalStateException ex ) {
457
+ if (!this .isUnsubscribed ()) {
458
+ this .unsubscribe ();
459
+ this .onError (ex );
460
+ }
461
+ return ;
462
+ }
463
+ emit ();
464
+ }
465
+
466
+ protected void emitScalar (T value , long r ) {
467
+ boolean skipFinal = false ;
468
+ try {
420
469
try {
421
- try {
422
- child .onNext (value );
423
- } catch (Throwable t ) {
424
- if (!delayErrors ) {
425
- Exceptions .throwIfFatal (t );
426
- skipFinal = true ;
427
- this .unsubscribe ();
428
- this .onError (t );
429
- return ;
430
- }
431
- getOrCreateErrorQueue ().offer (t );
432
- }
433
- producer .produced (1 );
434
- this .requestMore (1 );
435
- // check if some state changed while emitting
436
- synchronized (this ) {
470
+ child .onNext (value );
471
+ } catch (Throwable t ) {
472
+ if (!delayErrors ) {
473
+ Exceptions .throwIfFatal (t );
437
474
skipFinal = true ;
438
- if (!missed ) {
439
- emitting = false ;
440
- return ;
441
- }
442
- missed = false ;
443
- }
444
- } finally {
445
- if (!skipFinal ) {
446
- synchronized (this ) {
447
- emitting = false ;
448
- }
475
+ this .unsubscribe ();
476
+ this .onError (t );
477
+ return ;
449
478
}
479
+ getOrCreateErrorQueue ().offer (t );
450
480
}
451
- /*
452
- * In the synchronized block below request(1) we check
453
- * if there was a concurrent emission attempt and if there was,
454
- * we stay in emission mode and enter the emission loop
455
- * which will take care all the queued up state and
456
- * emission possibilities.
457
- */
458
- emitLoop ();
459
- } else {
460
- /*
461
- * If the attempt to make a fast-path emission failed
462
- * due to lack of requests or an ongoing emission,
463
- * enqueue the value and try the slow emission path.
464
- */
465
- RxRingBuffer q = this .queue ;
466
- if (q == null ) {
467
- q = RxRingBuffer .getSpscInstance ();
468
- this .add (q );
469
- this .queue = q ;
481
+ if (r != Long .MAX_VALUE ) {
482
+ producer .produced (1 );
470
483
}
471
- try {
472
- q .onNext (nl .next (value ));
473
- } catch (MissingBackpressureException ex ) {
474
- this .unsubscribe ();
475
- this .onError (ex );
476
- return ;
477
- } catch (IllegalStateException ex ) {
478
- if (!this .isUnsubscribed ()) {
479
- this .unsubscribe ();
480
- this .onError (ex );
484
+ this .requestMore (1 );
485
+ // check if some state changed while emitting
486
+ synchronized (this ) {
487
+ skipFinal = true ;
488
+ if (!missed ) {
489
+ emitting = false ;
490
+ return ;
491
+ }
492
+ missed = false ;
493
+ }
494
+ } finally {
495
+ if (!skipFinal ) {
496
+ synchronized (this ) {
497
+ emitting = false ;
481
498
}
482
- return ;
483
499
}
484
- emit ();
485
500
}
501
+ /*
502
+ * In the synchronized block below request(1) we check
503
+ * if there was a concurrent emission attempt and if there was,
504
+ * we stay in emission mode and enter the emission loop
505
+ * which will take care all the queued up state and
506
+ * emission possibilities.
507
+ */
508
+ emitLoop ();
486
509
}
487
510
488
511
void emit () {
@@ -511,6 +534,7 @@ void emitLoop() {
511
534
RxRingBuffer svq = queue ;
512
535
513
536
long r = producer .get ();
537
+ boolean unbounded = r == Long .MAX_VALUE ;
514
538
515
539
// count the number of 'completed' sources to replenish them in batches
516
540
int replenishMain = 0 ;
@@ -549,7 +573,11 @@ void emitLoop() {
549
573
r --;
550
574
}
551
575
if (scalarEmission > 0 ) {
552
- r = producer .produced (scalarEmission );
576
+ if (unbounded ) {
577
+ r = Long .MAX_VALUE ;
578
+ } else {
579
+ r = producer .produced (scalarEmission );
580
+ }
553
581
}
554
582
if (r == 0L || o == null ) {
555
583
break ;
@@ -664,7 +692,11 @@ void emitLoop() {
664
692
produced ++;
665
693
}
666
694
if (produced > 0 ) {
667
- r = producer .produced (produced );
695
+ if (!unbounded ) {
696
+ r = producer .produced (produced );
697
+ } else {
698
+ r = Long .MAX_VALUE ;
699
+ }
668
700
is .requestMore (produced );
669
701
}
670
702
// if we run out of requests or queued values, break
0 commit comments