25
25
import io .reactivex .internal .subscriptions .*;
26
26
import io .reactivex .internal .util .BackpressureHelper ;
27
27
import io .reactivex .observables .GroupedObservable ;
28
+ import io .reactivex .plugins .RxJavaPlugins ;
28
29
29
30
public final class OperatorGroupBy <T , K , V > implements Operator <GroupedObservable <K , V >, T >{
30
31
final Function <? super T , ? extends K > keySelector ;
@@ -44,7 +45,9 @@ public Subscriber<? super T> apply(Subscriber<? super GroupedObservable<K, V>> t
44
45
return new GroupBySubscriber <>(t , keySelector , valueSelector , bufferSize , delayError );
45
46
}
46
47
47
- public static final class GroupBySubscriber <T , K , V > extends AtomicInteger implements Subscriber <T >, Subscription {
48
+ public static final class GroupBySubscriber <T , K , V >
49
+ extends AtomicInteger
50
+ implements Subscriber <T >, Subscription {
48
51
/** */
49
52
private static final long serialVersionUID = -3688291656102519502L ;
50
53
@@ -54,6 +57,7 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
54
57
final int bufferSize ;
55
58
final boolean delayError ;
56
59
final Map <Object , GroupedUnicast <K , V >> groups ;
60
+ final Queue <GroupedObservable <K , V >> queue ;
57
61
58
62
static final Object NULL_KEY = new Object ();
59
63
@@ -64,14 +68,28 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
64
68
static final AtomicIntegerFieldUpdater <GroupBySubscriber > CANCELLED =
65
69
AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "cancelled" );
66
70
71
+ volatile long requested ;
72
+ @ SuppressWarnings ("rawtypes" )
73
+ static final AtomicLongFieldUpdater <GroupBySubscriber > REQUESTED =
74
+ AtomicLongFieldUpdater .newUpdater (GroupBySubscriber .class , "requested" );
75
+
76
+ volatile int groupCount ;
77
+ @ SuppressWarnings ("rawtypes" )
78
+ static final AtomicIntegerFieldUpdater <GroupBySubscriber > GROUP_COUNT =
79
+ AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "groupCount" );
80
+
81
+ Throwable error ;
82
+ volatile boolean done ;
83
+
67
84
public GroupBySubscriber (Subscriber <? super GroupedObservable <K , V >> actual , Function <? super T , ? extends K > keySelector , Function <? super T , ? extends V > valueSelector , int bufferSize , boolean delayError ) {
68
85
this .actual = actual ;
69
86
this .keySelector = keySelector ;
70
87
this .valueSelector = valueSelector ;
71
88
this .bufferSize = bufferSize ;
72
89
this .delayError = delayError ;
73
90
this .groups = new ConcurrentHashMap <>();
74
- this .lazySet (1 );
91
+ this .queue = new SpscLinkedArrayQueue <>(bufferSize );
92
+ GROUP_COUNT .lazySet (this , 1 );
75
93
}
76
94
77
95
@ Override
@@ -82,16 +100,24 @@ public void onSubscribe(Subscription s) {
82
100
83
101
this .s = s ;
84
102
actual .onSubscribe (this );
103
+ s .request (bufferSize );
85
104
}
86
105
87
106
@ Override
88
107
public void onNext (T t ) {
108
+ if (done ) {
109
+ return ;
110
+ }
111
+
112
+ final Queue <GroupedObservable <K , V >> q = this .queue ;
113
+ final Subscriber <? super GroupedObservable <K , V >> a = this .actual ;
114
+
89
115
K key ;
90
116
try {
91
117
key = keySelector .apply (t );
92
- } catch (Throwable e ) {
118
+ } catch (Throwable ex ) {
93
119
s .cancel ();
94
- onError ( e );
120
+ errorAll ( a , q , ex );
95
121
return ;
96
122
}
97
123
@@ -101,71 +127,74 @@ public void onNext(T t) {
101
127
if (group == null ) {
102
128
// if the main has been cancelled, stop creating groups
103
129
// and skip this value
104
- if (cancelled != 0 ) {
105
- s .request (1 );
130
+ if (cancelled == 0 ) {
131
+ group = GroupedUnicast .createWith (key , bufferSize , this , delayError );
132
+ groups .put (mapKey , group );
133
+
134
+ GROUP_COUNT .getAndIncrement (this );
135
+
136
+ notNew = false ;
137
+ q .offer (group );
138
+ drain ();
139
+ } else {
106
140
return ;
107
141
}
108
- notNew = true ;
109
-
110
- group = GroupedUnicast .createWith (key , bufferSize , this , delayError );
111
- groups .put (mapKey , group );
112
-
113
- getAndIncrement ();
114
-
115
- actual .onNext (group );
116
142
}
117
143
118
144
V v ;
119
145
try {
120
146
v = valueSelector .apply (t );
121
- } catch (Throwable e ) {
147
+ } catch (Throwable ex ) {
122
148
s .cancel ();
123
- onError ( e );
149
+ errorAll ( a , q , ex );
124
150
return ;
125
151
}
126
152
127
153
group .onNext (v );
128
-
154
+
129
155
if (notNew ) {
130
- s .request (1 ); // we spent this t on an existing group, request one more
156
+ s .request (1 );
131
157
}
132
158
}
133
159
134
160
@ Override
135
161
public void onError (Throwable t ) {
136
- List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
137
- groups .clear ();
138
-
139
- for (GroupedUnicast <K , V > e : list ) {
140
- e .onError (t );
162
+ if (done ) {
163
+ RxJavaPlugins .onError (t );
164
+ return ;
141
165
}
142
-
143
- actual .onError (t );
166
+ error = t ;
167
+ done = true ;
168
+ GROUP_COUNT .decrementAndGet (this );
169
+ drain ();
144
170
}
145
171
146
172
@ Override
147
173
public void onComplete () {
148
- List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
149
- groups .clear ();
150
-
151
- for (GroupedUnicast <K , V > e : list ) {
152
- e .onComplete ();
174
+ if (done ) {
175
+ return ;
153
176
}
154
-
155
- actual .onComplete ();
177
+ done = true ;
178
+ GROUP_COUNT .decrementAndGet (this );
179
+ drain ();
156
180
}
157
181
158
182
@ Override
159
183
public void request (long n ) {
160
- s .request (n );
184
+ if (SubscriptionHelper .validateRequest (n )) {
185
+ return ;
186
+ }
187
+
188
+ BackpressureHelper .add (REQUESTED , this , n );
189
+ drain ();
161
190
}
162
191
163
192
@ Override
164
193
public void cancel () {
165
194
// cancelling the main source means we don't want any more groups
166
195
// but running groups still require new values
167
196
if (CANCELLED .compareAndSet (this , 0 , 1 )) {
168
- if (decrementAndGet () == 0 ) {
197
+ if (GROUP_COUNT . decrementAndGet (this ) == 0 ) {
169
198
s .cancel ();
170
199
}
171
200
}
@@ -174,10 +203,100 @@ public void cancel() {
174
203
public void cancel (K key ) {
175
204
Object mapKey = key != null ? key : NULL_KEY ;
176
205
groups .remove (mapKey );
177
- if (decrementAndGet () == 0 ) {
206
+ if (GROUP_COUNT . decrementAndGet (this ) == 0 ) {
178
207
s .cancel ();
179
208
}
180
209
}
210
+
211
+ void drain () {
212
+ if (getAndIncrement () != 0 ) {
213
+ return ;
214
+ }
215
+
216
+ int missed = 1 ;
217
+
218
+ final Queue <GroupedObservable <K , V >> q = this .queue ;
219
+ final Subscriber <? super GroupedObservable <K , V >> a = this .actual ;
220
+
221
+ for (;;) {
222
+
223
+ if (checkTerminated (done , q .isEmpty (), a , q )) {
224
+ return ;
225
+ }
226
+
227
+ long r = requested ;
228
+ boolean unbounded = r == Long .MAX_VALUE ;
229
+ long e = 0L ;
230
+
231
+ while (r != 0 ) {
232
+ boolean d = done ;
233
+
234
+ GroupedObservable <K , V > t = q .poll ();
235
+
236
+ boolean empty = t == null ;
237
+
238
+ if (checkTerminated (d , empty , a , q )) {
239
+ return ;
240
+ }
241
+
242
+ if (empty ) {
243
+ break ;
244
+ }
245
+
246
+ a .onNext (t );
247
+
248
+ r --;
249
+ e --;
250
+ }
251
+
252
+ if (e != 0L ) {
253
+ if (!unbounded ) {
254
+ REQUESTED .addAndGet (this , e );
255
+ }
256
+ s .request (-e );
257
+ }
258
+
259
+ missed = addAndGet (-missed );
260
+ if (missed == 0 ) {
261
+ break ;
262
+ }
263
+ }
264
+ }
265
+
266
+ void errorAll (Subscriber <? super GroupedObservable <K , V >> a , Queue <?> q , Throwable ex ) {
267
+ q .clear ();
268
+ List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
269
+ groups .clear ();
270
+
271
+ for (GroupedUnicast <K , V > e : list ) {
272
+ e .onError (ex );
273
+ }
274
+
275
+ a .onError (ex );
276
+ }
277
+
278
+ boolean checkTerminated (boolean d , boolean empty ,
279
+ Subscriber <? super GroupedObservable <K , V >> a , Queue <?> q ) {
280
+ if (d ) {
281
+ Throwable err = error ;
282
+ if (err != null ) {
283
+ errorAll (a , q , err );
284
+ return true ;
285
+ } else
286
+ if (empty ) {
287
+ List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
288
+ groups .clear ();
289
+
290
+ for (GroupedUnicast <K , V > e : list ) {
291
+ e .onComplete ();
292
+ }
293
+
294
+ actual .onComplete ();
295
+ return true ;
296
+ }
297
+ }
298
+ return false ;
299
+ }
181
300
}
182
301
183
302
static final class GroupedUnicast <K , T > extends GroupedObservable <K , T > {
@@ -233,7 +352,12 @@ static final class State<T, K> extends AtomicInteger implements Subscription, Pu
233
352
@ SuppressWarnings ("rawtypes" )
234
353
static final AtomicReferenceFieldUpdater <State , Subscriber > ACTUAL =
235
354
AtomicReferenceFieldUpdater .newUpdater (State .class , Subscriber .class , "actual" );
236
-
355
+
356
+ volatile int once ;
357
+ @ SuppressWarnings ("rawtypes" )
358
+ static final AtomicIntegerFieldUpdater <State > ONCE =
359
+ AtomicIntegerFieldUpdater .newUpdater (State .class , "once" );
360
+
237
361
public State (int bufferSize , GroupBySubscriber <?, K , T > parent , K key , boolean delayError ) {
238
362
this .queue = new SpscLinkedArrayQueue <>(bufferSize );
239
363
this .parent = parent ;
@@ -247,7 +371,6 @@ public void request(long n) {
247
371
return ;
248
372
}
249
373
BackpressureHelper .add (REQUESTED , this , n );
250
- parent .request (n );
251
374
drain ();
252
375
}
253
376
@@ -262,8 +385,10 @@ public void cancel() {
262
385
263
386
@ Override
264
387
public void subscribe (Subscriber <? super T > s ) {
265
- if (ACTUAL .compareAndSet (this , null , s )) {
388
+ if (ONCE .compareAndSet (this , 0 , 1 )) {
266
389
s .onSubscribe (this );
390
+ ACTUAL .lazySet (this , s );
391
+ drain ();
267
392
} else {
268
393
EmptySubscription .error (new IllegalStateException ("Only one Subscriber allowed!" ), s );
269
394
}
@@ -332,6 +457,7 @@ void drain() {
332
457
if (!unbounded ) {
333
458
REQUESTED .addAndGet (this , e );
334
459
}
460
+ parent .s .request (-e );
335
461
}
336
462
}
337
463
0 commit comments