Skip to content

Commit 3f6c4fd

Browse files
committed
1.x: fix from(Iterable) error handling of Iterable/Iterator (#3862)
* 1.x: fix from(Iterable) error handling of Iterable/Iterator * Check dead-on-arrival Subscribers * Use n again to avoid a potential cache-miss with get()
1 parent 95389c2 commit 3f6c4fd

File tree

2 files changed

+325
-49
lines changed

2 files changed

+325
-49
lines changed

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.*;
2222
import rx.Observable.OnSubscribe;
23+
import rx.exceptions.Exceptions;
2324

2425
/**
2526
* Converts an {@code Iterable} sequence into an {@code Observable}.
@@ -42,11 +43,25 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
4243

4344
@Override
4445
public void call(final Subscriber<? super T> o) {
45-
final Iterator<? extends T> it = is.iterator();
46-
if (!it.hasNext() && !o.isUnsubscribed())
47-
o.onCompleted();
48-
else
49-
o.setProducer(new IterableProducer<T>(o, it));
46+
final Iterator<? extends T> it;
47+
boolean b;
48+
49+
try {
50+
it = is.iterator();
51+
52+
b = it.hasNext();
53+
} catch (Throwable ex) {
54+
Exceptions.throwOrReport(ex, o);
55+
return;
56+
}
57+
58+
if (!o.isUnsubscribed()) {
59+
if (!b) {
60+
o.onCompleted();
61+
} else {
62+
o.setProducer(new IterableProducer<T>(o, it));
63+
}
64+
}
5065
}
5166

5267
private static final class IterableProducer<T> extends AtomicLong implements Producer {
@@ -81,55 +96,98 @@ void slowpath(long n) {
8196
final Iterator<? extends T> it = this.it;
8297

8398
long r = n;
84-
while (true) {
85-
/*
86-
* This complicated logic is done to avoid touching the
87-
* volatile `requested` value during the loop itself. If
88-
* it is touched during the loop the performance is
89-
* impacted significantly.
90-
*/
91-
long numToEmit = r;
92-
while (true) {
99+
long e = 0;
100+
101+
for (;;) {
102+
while (e != r) {
93103
if (o.isUnsubscribed()) {
94104
return;
95-
} else if (it.hasNext()) {
96-
if (--numToEmit >= 0) {
97-
o.onNext(it.next());
98-
} else
99-
break;
100-
} else if (!o.isUnsubscribed()) {
101-
o.onCompleted();
105+
}
106+
107+
T value;
108+
109+
try {
110+
value = it.next();
111+
} catch (Throwable ex) {
112+
Exceptions.throwOrReport(ex, o);
102113
return;
103-
} else {
104-
// is unsubscribed
114+
}
115+
116+
o.onNext(value);
117+
118+
if (o.isUnsubscribed()) {
105119
return;
106120
}
121+
122+
boolean b;
123+
124+
try {
125+
b = it.hasNext();
126+
} catch (Throwable ex) {
127+
Exceptions.throwOrReport(ex, o);
128+
return;
129+
}
130+
131+
if (!b) {
132+
if (!o.isUnsubscribed()) {
133+
o.onCompleted();
134+
}
135+
return;
136+
}
137+
138+
e++;
107139
}
108-
r = addAndGet(-r);
109-
if (r == 0L) {
110-
// we're done emitting the number requested so
111-
// return
112-
return;
140+
141+
r = get();
142+
if (e == r) {
143+
r = BackpressureUtils.produced(this, e);
144+
if (r == 0L) {
145+
break;
146+
}
147+
e = 0L;
113148
}
114-
115149
}
150+
116151
}
117152

118153
void fastpath() {
119154
// fast-path without backpressure
120155
final Subscriber<? super T> o = this.o;
121156
final Iterator<? extends T> it = this.it;
122157

123-
while (true) {
158+
for (;;) {
124159
if (o.isUnsubscribed()) {
125160
return;
126-
} else if (it.hasNext()) {
127-
o.onNext(it.next());
128-
} else if (!o.isUnsubscribed()) {
129-
o.onCompleted();
161+
}
162+
163+
T value;
164+
165+
try {
166+
value = it.next();
167+
} catch (Throwable ex) {
168+
Exceptions.throwOrReport(ex, o);
169+
return;
170+
}
171+
172+
o.onNext(value);
173+
174+
if (o.isUnsubscribed()) {
130175
return;
131-
} else {
132-
// is unsubscribed
176+
}
177+
178+
boolean b;
179+
180+
try {
181+
b = it.hasNext();
182+
} catch (Throwable ex) {
183+
Exceptions.throwOrReport(ex, o);
184+
return;
185+
}
186+
187+
if (!b) {
188+
if (!o.isUnsubscribed()) {
189+
o.onCompleted();
190+
}
133191
return;
134192
}
135193
}

0 commit comments

Comments
 (0)