Skip to content

Commit b7e384e

Browse files
authored
JAVA-2934: Handle empty non-final pages in ReactiveResultSetSubscription (#1544)
1 parent eea59c6 commit b7e384e

File tree

4 files changed

+46
-5
lines changed

4 files changed

+46
-5
lines changed

changelog/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
<!-- Note: contrary to 3.x, insert new entries *first* in their section -->
44

5+
### 4.12.0 (in progress)
6+
7+
- [bug] JAVA-2934: Handle empty non-final pages in ReactiveResultSetSubscription
8+
59
### 4.11.0
610

711
- [improvement] JAVA-2930: Allow Micrometer to record histograms for timers

core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/ReactiveResultSetSubscription.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,10 @@ private Object tryNext() {
277277
if (pages.poll() == null) {
278278
throw new AssertionError("Queue is empty, this should not happen");
279279
}
280-
current = pages.peek();
281280
// if the next page is readily available,
282281
// serve its first row now, no need to wait
283282
// for the next drain.
284-
if (current != null && current.hasMoreRows()) {
285-
return current.nextRow();
286-
}
283+
return tryNext();
287284
}
288285
}
289286
// No items available right now.

core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/ReactiveResultSetSubscriptionTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,34 @@ public void should_report_error_on_intermediary_page() {
145145
assertThat(wasAppliedSubscriber.getElements()).hasSize(1).containsExactly(true);
146146
assertThat(wasAppliedSubscriber.getError()).isNull();
147147
}
148+
149+
@Test
150+
public void should_handle_empty_non_final_pages() {
151+
CompletableFuture<AsyncResultSet> future1 = new CompletableFuture<>();
152+
CompletableFuture<AsyncResultSet> future2 = new CompletableFuture<>();
153+
CompletableFuture<AsyncResultSet> future3 = new CompletableFuture<>();
154+
MockAsyncResultSet page1 = new MockAsyncResultSet(10, future2);
155+
MockAsyncResultSet page2 = new MockAsyncResultSet(0, future3);
156+
MockAsyncResultSet page3 = new MockAsyncResultSet(10, null);
157+
TestSubscriber<ReactiveRow> mainSubscriber = new TestSubscriber<>(1);
158+
TestSubscriber<ColumnDefinitions> colDefsSubscriber = new TestSubscriber<>();
159+
TestSubscriber<ExecutionInfo> execInfosSubscriber = new TestSubscriber<>();
160+
TestSubscriber<Boolean> wasAppliedSubscriber = new TestSubscriber<>();
161+
ReactiveResultSetSubscription<AsyncResultSet> subscription =
162+
new ReactiveResultSetSubscription<>(
163+
mainSubscriber, colDefsSubscriber, execInfosSubscriber, wasAppliedSubscriber);
164+
mainSubscriber.onSubscribe(subscription);
165+
subscription.start(() -> future1);
166+
future1.complete(page1);
167+
future2.complete(page2);
168+
// emulate backpressure
169+
subscription.request(1);
170+
future3.complete(page3);
171+
subscription.request(Long.MAX_VALUE);
172+
mainSubscriber.awaitTermination();
173+
assertThat(mainSubscriber.getError()).isNull();
174+
List<Row> expected = new ArrayList<>(page1.currentPage());
175+
expected.addAll(page3.currentPage());
176+
assertThat(mainSubscriber.getElements()).hasSize(20).extracting("row").isEqualTo(expected);
177+
}
148178
}

core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/TestSubscriber.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,25 @@ public class TestSubscriber<T> implements Subscriber<T> {
3131

3232
private final List<T> elements = new ArrayList<>();
3333
private final CountDownLatch latch = new CountDownLatch(1);
34+
private final long demand;
3435
private Subscription subscription;
3536
private Throwable error;
3637

38+
public TestSubscriber() {
39+
this.demand = Long.MAX_VALUE;
40+
}
41+
42+
public TestSubscriber(long demand) {
43+
this.demand = demand;
44+
}
45+
3746
@Override
3847
public void onSubscribe(Subscription s) {
3948
if (subscription != null) {
4049
fail("already subscribed");
4150
}
4251
subscription = s;
43-
s.request(Long.MAX_VALUE);
52+
subscription.request(demand);
4453
}
4554

4655
@Override
@@ -71,5 +80,6 @@ public List<T> getElements() {
7180

7281
public void awaitTermination() {
7382
Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.MINUTES);
83+
if (latch.getCount() > 0) fail("subscriber not terminated");
7484
}
7585
}

0 commit comments

Comments
 (0)