@@ -177,7 +177,10 @@ public void request(long n) {
177
177
178
178
}
179
179
180
- private static final class Zip <R > extends AtomicLong {
180
+ static final class Zip <R > extends AtomicLong {
181
+ /** */
182
+ private static final long serialVersionUID = 5995274816189928317L ;
183
+
181
184
final Observer <? super R > child ;
182
185
private final FuncN <? extends R > zipFunction ;
183
186
private final CompositeSubscription childSubscription = new CompositeSubscription ();
@@ -186,7 +189,7 @@ private static final class Zip<R> extends AtomicLong {
186
189
int emitted = 0 ; // not volatile/synchronized as accessed inside COUNTER_UPDATER block
187
190
188
191
/* initialized when started in `start` */
189
- private Object [] observers ;
192
+ private volatile Object [] observers ;
190
193
private AtomicLong requested ;
191
194
192
195
public Zip (final Subscriber <? super R > child , FuncN <? extends R > zipFunction ) {
@@ -197,14 +200,16 @@ public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
197
200
198
201
@ SuppressWarnings ("unchecked" )
199
202
public void start (@ SuppressWarnings ("rawtypes" ) Observable [] os , AtomicLong requested ) {
200
- observers = new Object [os .length ];
201
- this .requested = requested ;
203
+ Object [] observers = new Object [os .length ];
202
204
for (int i = 0 ; i < os .length ; i ++) {
203
205
InnerSubscriber io = new InnerSubscriber ();
204
206
observers [i ] = io ;
205
207
childSubscription .add (io );
206
208
}
207
-
209
+
210
+ this .requested = requested ;
211
+ this .observers = observers ; // full memory barrier: release all above
212
+
208
213
for (int i = 0 ; i < os .length ; i ++) {
209
214
os [i ].unsafeSubscribe ((InnerSubscriber ) observers [i ]);
210
215
}
0 commit comments