@@ -106,28 +106,16 @@ public static final class GroupBySubscriber<T, K, V>
106
106
107
107
final ProducerArbiter s ;
108
108
109
- volatile int cancelled ;
110
- @ SuppressWarnings ("rawtypes" )
111
- static final AtomicIntegerFieldUpdater <GroupBySubscriber > CANCELLED =
112
- AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "cancelled" );
109
+ final AtomicBoolean cancelled ;
113
110
114
- volatile long requested ;
115
- @ SuppressWarnings ("rawtypes" )
116
- static final AtomicLongFieldUpdater <GroupBySubscriber > REQUESTED =
117
- AtomicLongFieldUpdater .newUpdater (GroupBySubscriber .class , "requested" );
118
-
119
- volatile int groupCount ;
120
- @ SuppressWarnings ("rawtypes" )
121
- static final AtomicIntegerFieldUpdater <GroupBySubscriber > GROUP_COUNT =
122
- AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "groupCount" );
111
+ final AtomicLong requested ;
112
+
113
+ final AtomicInteger groupCount ;
123
114
124
115
Throwable error ;
125
116
volatile boolean done ;
126
117
127
- volatile int wip ;
128
- @ SuppressWarnings ("rawtypes" )
129
- static final AtomicIntegerFieldUpdater <GroupBySubscriber > WIP =
130
- AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "wip" );
118
+ final AtomicInteger wip ;
131
119
132
120
public GroupBySubscriber (Subscriber <? super GroupedObservable <K , V >> actual , Func1 <? super T , ? extends K > keySelector , Func1 <? super T , ? extends V > valueSelector , int bufferSize , boolean delayError ) {
133
121
this .actual = actual ;
@@ -137,10 +125,13 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
137
125
this .delayError = delayError ;
138
126
this .groups = new ConcurrentHashMap <Object , GroupedUnicast <K , V >>();
139
127
this .queue = new ConcurrentLinkedQueue <GroupedObservable <K , V >>();
140
- GROUP_COUNT .lazySet (this , 1 );
141
128
this .s = new ProducerArbiter ();
142
129
this .s .request (bufferSize );
143
130
this .producer = new GroupByProducer (this );
131
+ this .cancelled = new AtomicBoolean ();
132
+ this .requested = new AtomicLong ();
133
+ this .groupCount = new AtomicInteger (1 );
134
+ this .wip = new AtomicInteger ();
144
135
}
145
136
146
137
@ Override
@@ -172,11 +163,11 @@ public void onNext(T t) {
172
163
if (group == null ) {
173
164
// if the main has been cancelled, stop creating groups
174
165
// and skip this value
175
- if (cancelled == 0 ) {
166
+ if (! cancelled . get () ) {
176
167
group = GroupedUnicast .createWith (key , bufferSize , this , delayError );
177
168
groups .put (mapKey , group );
178
169
179
- GROUP_COUNT .getAndIncrement (this );
170
+ groupCount .getAndIncrement ();
180
171
181
172
notNew = false ;
182
173
q .offer (group );
@@ -210,7 +201,7 @@ public void onError(Throwable t) {
210
201
}
211
202
error = t ;
212
203
done = true ;
213
- GROUP_COUNT .decrementAndGet (this );
204
+ groupCount .decrementAndGet ();
214
205
drain ();
215
206
}
216
207
@@ -226,7 +217,7 @@ public void onCompleted() {
226
217
groups .clear ();
227
218
228
219
done = true ;
229
- GROUP_COUNT .decrementAndGet (this );
220
+ groupCount .decrementAndGet ();
230
221
drain ();
231
222
}
232
223
@@ -235,15 +226,15 @@ public void requestMore(long n) {
235
226
throw new IllegalArgumentException ("n >= 0 required but it was " + n );
236
227
}
237
228
238
- BackpressureUtils .getAndAddRequest (REQUESTED , this , n );
229
+ BackpressureUtils .getAndAddRequest (requested , n );
239
230
drain ();
240
231
}
241
232
242
233
public void cancel () {
243
234
// cancelling the main source means we don't want any more groups
244
235
// but running groups still require new values
245
- if (CANCELLED .compareAndSet (this , 0 , 1 )) {
246
- if (GROUP_COUNT .decrementAndGet (this ) == 0 ) {
236
+ if (cancelled .compareAndSet (false , true )) {
237
+ if (groupCount .decrementAndGet () == 0 ) {
247
238
unsubscribe ();
248
239
}
249
240
}
@@ -252,14 +243,14 @@ public void cancel() {
252
243
public void cancel (K key ) {
253
244
Object mapKey = key != null ? key : NULL_KEY ;
254
245
if (groups .remove (mapKey ) != null ) {
255
- if (GROUP_COUNT .decrementAndGet (this ) == 0 ) {
246
+ if (groupCount .decrementAndGet () == 0 ) {
256
247
unsubscribe ();
257
248
}
258
249
}
259
250
}
260
251
261
252
void drain () {
262
- if (WIP .getAndIncrement (this ) != 0 ) {
253
+ if (wip .getAndIncrement () != 0 ) {
263
254
return ;
264
255
}
265
256
@@ -274,7 +265,7 @@ void drain() {
274
265
return ;
275
266
}
276
267
277
- long r = requested ;
268
+ long r = requested . get () ;
278
269
boolean unbounded = r == Long .MAX_VALUE ;
279
270
long e = 0L ;
280
271
@@ -301,12 +292,12 @@ void drain() {
301
292
302
293
if (e != 0L ) {
303
294
if (!unbounded ) {
304
- REQUESTED .addAndGet (this , e );
295
+ requested .addAndGet (e );
305
296
}
306
297
s .request (-e );
307
298
}
308
299
309
- missed = WIP .addAndGet (this , -missed );
300
+ missed = wip .addAndGet (-missed );
310
301
if (missed == 0 ) {
311
302
break ;
312
303
}
@@ -378,35 +369,27 @@ static final class State<T, K> extends AtomicInteger implements Producer, Subscr
378
369
final GroupBySubscriber <?, K , T > parent ;
379
370
final boolean delayError ;
380
371
381
- volatile long requested ;
382
- @ SuppressWarnings ("rawtypes" )
383
- static final AtomicLongFieldUpdater <State > REQUESTED =
384
- AtomicLongFieldUpdater .newUpdater (State .class , "requested" );
372
+ final AtomicLong requested ;
385
373
386
374
volatile boolean done ;
387
375
Throwable error ;
388
376
389
- volatile int cancelled ;
390
- @ SuppressWarnings ("rawtypes" )
391
- static final AtomicIntegerFieldUpdater <State > CANCELLED =
392
- AtomicIntegerFieldUpdater .newUpdater (State .class , "cancelled" );
393
-
394
- volatile Subscriber <? super T > actual ;
395
- @ SuppressWarnings ("rawtypes" )
396
- static final AtomicReferenceFieldUpdater <State , Subscriber > ACTUAL =
397
- AtomicReferenceFieldUpdater .newUpdater (State .class , Subscriber .class , "actual" );
377
+ final AtomicBoolean cancelled ;
378
+
379
+ final AtomicReference <Subscriber <? super T >> actual ;
398
380
399
- volatile int once ;
400
- @ SuppressWarnings ("rawtypes" )
401
- static final AtomicIntegerFieldUpdater <State > ONCE =
402
- AtomicIntegerFieldUpdater .newUpdater (State .class , "once" );
381
+ final AtomicBoolean once ;
403
382
404
383
405
384
public State (int bufferSize , GroupBySubscriber <?, K , T > parent , K key , boolean delayError ) {
406
385
this .queue = new ConcurrentLinkedQueue <Object >();
407
386
this .parent = parent ;
408
387
this .key = key ;
409
388
this .delayError = delayError ;
389
+ this .cancelled = new AtomicBoolean ();
390
+ this .actual = new AtomicReference <Subscriber <? super T >>();
391
+ this .once = new AtomicBoolean ();
392
+ this .requested = new AtomicLong ();
410
393
}
411
394
412
395
@ Override
@@ -415,19 +398,19 @@ public void request(long n) {
415
398
throw new IllegalArgumentException ("n >= required but it was " + n );
416
399
}
417
400
if (n != 0L ) {
418
- BackpressureUtils .getAndAddRequest (REQUESTED , this , n );
401
+ BackpressureUtils .getAndAddRequest (requested , n );
419
402
drain ();
420
403
}
421
404
}
422
405
423
406
@ Override
424
407
public boolean isUnsubscribed () {
425
- return cancelled != 0 ;
408
+ return cancelled . get () ;
426
409
}
427
410
428
411
@ Override
429
412
public void unsubscribe () {
430
- if (CANCELLED .compareAndSet (this , 0 , 1 )) {
413
+ if (cancelled .compareAndSet (false , true )) {
431
414
if (getAndIncrement () == 0 ) {
432
415
parent .cancel (key );
433
416
}
@@ -436,10 +419,10 @@ public void unsubscribe() {
436
419
437
420
@ Override
438
421
public void call (Subscriber <? super T > s ) {
439
- if (ONCE .compareAndSet (this , 0 , 1 )) {
422
+ if (once .compareAndSet (false , true )) {
440
423
s .add (this );
441
424
s .setProducer (this );
442
- ACTUAL .lazySet (this , s );
425
+ actual .lazySet (s );
443
426
drain ();
444
427
} else {
445
428
s .onError (new IllegalStateException ("Only one Subscriber allowed!" ));
@@ -475,15 +458,15 @@ void drain() {
475
458
476
459
final Queue <Object > q = queue ;
477
460
final boolean delayError = this .delayError ;
478
- Subscriber <? super T > a = actual ;
461
+ Subscriber <? super T > a = actual . get () ;
479
462
NotificationLite <T > nl = NotificationLite .instance ();
480
463
for (;;) {
481
464
if (a != null ) {
482
465
if (checkTerminated (done , q .isEmpty (), a , delayError )) {
483
466
return ;
484
467
}
485
468
486
- long r = requested ;
469
+ long r = requested . get () ;
487
470
boolean unbounded = r == Long .MAX_VALUE ;
488
471
long e = 0 ;
489
472
@@ -508,7 +491,7 @@ void drain() {
508
491
509
492
if (e != 0L ) {
510
493
if (!unbounded ) {
511
- REQUESTED .addAndGet (this , e );
494
+ requested .addAndGet (e );
512
495
}
513
496
parent .s .request (-e );
514
497
}
@@ -519,13 +502,13 @@ void drain() {
519
502
break ;
520
503
}
521
504
if (a == null ) {
522
- a = actual ;
505
+ a = actual . get () ;
523
506
}
524
507
}
525
508
}
526
509
527
510
boolean checkTerminated (boolean d , boolean empty , Subscriber <? super T > a , boolean delayError ) {
528
- if (cancelled != 0 ) {
511
+ if (cancelled . get () ) {
529
512
queue .clear ();
530
513
parent .cancel (key );
531
514
return true ;
0 commit comments