Skip to content

Commit 3bfc275

Browse files
authored
2.x: add ParallelFlowable.sequentialDelayError (#5117)
* 2.x: add ParallelFlowable.sequentialDelayError * Fix javadoc, make sure failed rails are ignored.
1 parent 421c5bb commit 3bfc275

File tree

6 files changed

+567
-58
lines changed

6 files changed

+567
-58
lines changed

src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ static final class ParallelDispatcher<T>
101101
this.subscribers = subscribers;
102102
this.prefetch = prefetch;
103103
this.limit = prefetch - (prefetch >> 2);
104-
this.requests = new AtomicLongArray(subscribers.length);
105-
this.emissions = new long[subscribers.length];
104+
int m = subscribers.length;
105+
this.requests = new AtomicLongArray(m + m + 1);
106+
this.requests.lazySet(m + m, m);
107+
this.emissions = new long[m];
106108
}
107109

108110
@Override
@@ -145,42 +147,56 @@ public void onSubscribe(Subscription s) {
145147
}
146148

147149
void setupSubscribers() {
148-
final int m = subscribers.length;
150+
Subscriber<? super T>[] subs = subscribers;
151+
final int m = subs.length;
149152

150153
for (int i = 0; i < m; i++) {
151154
if (cancelled) {
152155
return;
153156
}
154-
final int j = i;
155157

156158
subscriberCount.lazySet(i + 1);
157159

158-
subscribers[i].onSubscribe(new Subscription() {
159-
@Override
160-
public void request(long n) {
161-
if (SubscriptionHelper.validate(n)) {
162-
AtomicLongArray ra = requests;
163-
for (;;) {
164-
long r = ra.get(j);
165-
if (r == Long.MAX_VALUE) {
166-
return;
167-
}
168-
long u = BackpressureHelper.addCap(r, n);
169-
if (ra.compareAndSet(j, r, u)) {
170-
break;
171-
}
172-
}
173-
if (subscriberCount.get() == m) {
174-
drain();
175-
}
160+
subs[i].onSubscribe(new RailSubscription(i, m));
161+
}
162+
}
163+
164+
final class RailSubscription implements Subscription {
165+
166+
final int j;
167+
168+
final int m;
169+
170+
RailSubscription(int j, int m) {
171+
this.j = j;
172+
this.m = m;
173+
}
174+
175+
@Override
176+
public void request(long n) {
177+
if (SubscriptionHelper.validate(n)) {
178+
AtomicLongArray ra = requests;
179+
for (;;) {
180+
long r = ra.get(j);
181+
if (r == Long.MAX_VALUE) {
182+
return;
183+
}
184+
long u = BackpressureHelper.addCap(r, n);
185+
if (ra.compareAndSet(j, r, u)) {
186+
break;
176187
}
177188
}
178-
179-
@Override
180-
public void cancel() {
181-
ParallelDispatcher.this.cancel();
189+
if (subscriberCount.get() == m) {
190+
drain();
182191
}
183-
});
192+
}
193+
}
194+
195+
@Override
196+
public void cancel() {
197+
if (requests.compareAndSet(m + j, 0L, 1L)) {
198+
ParallelDispatcher.this.cancel(m + m);
199+
}
184200
}
185201
}
186202

@@ -209,8 +225,8 @@ public void onComplete() {
209225
drain();
210226
}
211227

212-
void cancel() {
213-
if (!cancelled) {
228+
void cancel(int m) {
229+
if (requests.decrementAndGet(m) == 0L) {
214230
cancelled = true;
215231
this.s.cancel();
216232

@@ -268,7 +284,7 @@ void drainAsync() {
268284

269285
long ridx = r.get(idx);
270286
long eidx = e[idx];
271-
if (ridx != eidx) {
287+
if (ridx != eidx && r.get(n + idx) == 0) {
272288

273289
T v;
274290

@@ -356,7 +372,7 @@ void drainSync() {
356372

357373
long ridx = r.get(idx);
358374
long eidx = e[idx];
359-
if (ridx != eidx) {
375+
if (ridx != eidx && r.get(n + idx) == 0) {
360376

361377
T v;
362378

0 commit comments

Comments
 (0)