Skip to content

Commit c8be88d

Browse files
Merge pull request ReactiveX#596 from akarnokd/BufferFix1
Fix for buffer not stopping when unsubscribed.
2 parents 65b3f6f + bcc779d commit c8be88d

File tree

3 files changed

+91
-11
lines changed

3 files changed

+91
-11
lines changed

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public OverlappingChunks(Observer<? super C> observer, Func0<? extends Chunk<T,
148148
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
149149
* <C> The type of object being tracked by the {@link Chunk}
150150
*/
151-
protected static class TimeAndSizeBasedChunks<T, C> extends Chunks<T, C> {
151+
protected static class TimeAndSizeBasedChunks<T, C> extends Chunks<T, C> implements Subscription {
152152

153153
private final ConcurrentMap<Chunk<T, C>, Subscription> subscriptions = new ConcurrentHashMap<Chunk<T, C>, Subscription>();
154154

@@ -207,6 +207,12 @@ public void pushValue(T value) {
207207
}
208208
}
209209
}
210+
@Override
211+
public void unsubscribe() {
212+
for (Subscription s : subscriptions.values()) {
213+
s.unsubscribe();
214+
}
215+
}
210216
}
211217

212218
/**
@@ -218,7 +224,7 @@ public void pushValue(T value) {
218224
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
219225
* <C> The type of object being tracked by the {@link Chunk}
220226
*/
221-
protected static class TimeBasedChunks<T, C> extends OverlappingChunks<T, C> {
227+
protected static class TimeBasedChunks<T, C> extends OverlappingChunks<T, C> implements Subscription {
222228

223229
private final ConcurrentMap<Chunk<T, C>, Subscription> subscriptions = new ConcurrentHashMap<Chunk<T, C>, Subscription>();
224230

@@ -250,6 +256,14 @@ public void emitChunk(Chunk<T, C> chunk) {
250256
subscriptions.remove(chunk);
251257
super.emitChunk(chunk);
252258
}
259+
260+
@Override
261+
public void unsubscribe() {
262+
for (Subscription s : subscriptions.values()) {
263+
s.unsubscribe();
264+
}
265+
}
266+
253267
}
254268

255269
/**

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import java.util.List;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import rx.Observable;
2223
import rx.Observable.OnSubscribeFunc;
2324
import rx.Observer;
2425
import rx.Scheduler;
2526
import rx.Subscription;
2627
import rx.concurrency.Schedulers;
28+
import rx.subscriptions.CompositeSubscription;
2729
import rx.util.functions.Func0;
2830
import rx.util.functions.Func1;
2931

@@ -65,11 +67,14 @@ public static <T, TClosing> OnSubscribeFunc<List<T>> buffer(final Observable<T>
6567
public Subscription onSubscribe(Observer<? super List<T>> observer) {
6668
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker());
6769
ChunkCreator creator = new ObservableBasedSingleChunkCreator<T, List<T>, TClosing>(buffers, bufferClosingSelector);
68-
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
70+
return new CompositeSubscription(
71+
new ChunkToSubscription(creator),
72+
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator))
73+
);
6974
}
7075
};
7176
}
72-
77+
7378
/**
7479
* <p>This method creates a {@link Func1} object which represents the buffer operation. This operation takes
7580
* values from the specified {@link Observable} source and stores them in the currently active chunks. Initially
@@ -101,7 +106,10 @@ public static <T, TOpening, TClosing> OnSubscribeFunc<List<T>> buffer(final Obse
101106
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
102107
OverlappingChunks<T, List<T>> buffers = new OverlappingChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker());
103108
ChunkCreator creator = new ObservableBasedMultiChunkCreator<T, List<T>, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector);
104-
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
109+
return new CompositeSubscription(
110+
new ChunkToSubscription(creator),
111+
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator))
112+
);
105113
}
106114
};
107115
}
@@ -156,7 +164,10 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
156164
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
157165
Chunks<T, List<T>> chunks = new SizeBasedChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker(), count);
158166
ChunkCreator creator = new SkippingChunkCreator<T, List<T>>(chunks, skip);
159-
return source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator));
167+
return new CompositeSubscription(
168+
new ChunkToSubscription(creator),
169+
source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator))
170+
);
160171
}
161172
};
162173
}
@@ -211,7 +222,10 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
211222
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
212223
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker());
213224
ChunkCreator creator = new TimeBasedChunkCreator<T, List<T>>(buffers, timespan, unit, scheduler);
214-
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
225+
return new CompositeSubscription(
226+
new ChunkToSubscription(creator),
227+
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator))
228+
);
215229
}
216230
};
217231
}
@@ -270,9 +284,13 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
270284
return new OnSubscribeFunc<List<T>>() {
271285
@Override
272286
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
273-
Chunks<T, List<T>> chunks = new TimeAndSizeBasedChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker(), count, timespan, unit, scheduler);
287+
TimeAndSizeBasedChunks<T, List<T>> chunks = new TimeAndSizeBasedChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker(), count, timespan, unit, scheduler);
274288
ChunkCreator creator = new SingleChunkCreator<T, List<T>>(chunks);
275-
return source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator));
289+
return new CompositeSubscription(
290+
chunks,
291+
new ChunkToSubscription(creator),
292+
source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator))
293+
);
276294
}
277295
};
278296
}
@@ -331,9 +349,13 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
331349
return new OnSubscribeFunc<List<T>>() {
332350
@Override
333351
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
334-
OverlappingChunks<T, List<T>> buffers = new TimeBasedChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker(), timespan, unit, scheduler);
352+
TimeBasedChunks<T, List<T>> buffers = new TimeBasedChunks<T, List<T>>(observer, OperationBuffer.<T> bufferMaker(), timespan, unit, scheduler);
335353
ChunkCreator creator = new TimeBasedChunkCreator<T, List<T>>(buffers, timeshift, unit, scheduler);
336-
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
354+
return new CompositeSubscription(
355+
buffers,
356+
new ChunkToSubscription(creator),
357+
source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator))
358+
);
337359
}
338360
};
339361
}
@@ -355,4 +377,24 @@ public List<T> getContents() {
355377
return contents;
356378
}
357379
}
380+
381+
/**
382+
* Converts a chunk creator into a subscription which stops the chunk.
383+
*/
384+
private static class ChunkToSubscription implements Subscription {
385+
private ChunkCreator cc;
386+
private final AtomicBoolean done;
387+
public ChunkToSubscription(ChunkCreator cc) {
388+
this.cc = cc;
389+
this.done = new AtomicBoolean();
390+
}
391+
@Override
392+
public void unsubscribe() {
393+
if (done.compareAndSet(false, true)) {
394+
ChunkCreator cc0 = cc;
395+
cc = null;
396+
cc0.stop();
397+
}
398+
}
399+
}
358400
}

rxjava-core/src/test/java/rx/operators/OperationBufferTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static rx.operators.OperationBuffer.*;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,8 @@
2728
import org.junit.Test;
2829
import org.mockito.InOrder;
2930
import org.mockito.Mockito;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.times;
3033

3134
import rx.Observable;
3235
import rx.Observer;
@@ -359,4 +362,25 @@ public void call() {
359362
}
360363
}, delay, TimeUnit.MILLISECONDS);
361364
}
365+
366+
@Test
367+
public void testBufferStopsWhenUnsubscribed1() {
368+
Observable<Integer> source = Observable.never();
369+
370+
Observer<List<Integer>> o = mock(Observer.class);
371+
372+
Subscription s = source.buffer(100, 200, TimeUnit.MILLISECONDS, scheduler).subscribe(o);
373+
374+
InOrder inOrder = Mockito.inOrder(o);
375+
376+
scheduler.advanceTimeBy(1001, TimeUnit.MILLISECONDS);
377+
378+
inOrder.verify(o, times(5)).onNext(Arrays.<Integer>asList());
379+
380+
s.unsubscribe();
381+
382+
scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS);
383+
384+
inOrder.verifyNoMoreInteractions();
385+
}
362386
}

0 commit comments

Comments
 (0)