Skip to content

Commit 43471c0

Browse files
committed
JAVA-2934: Handle empty non-final pages in ReactiveResultSetSubscription
1 parent eea59c6 commit 43471c0

File tree

4 files changed

+41
-5
lines changed

4 files changed

+41
-5
lines changed

changelog/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
<!-- Note: contrary to 3.x, insert new entries *first* in their section -->
44

5+
### 4.12.0 (in progress)
6+
7+
- [bug] JAVA-2934: Handle empty non-final pages in ReactiveResultSetSubscription
8+
59
### 4.11.0
610

711
- [improvement] JAVA-2930: Allow Micrometer to record histograms for timers

core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/ReactiveResultSetSubscription.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,10 @@ private Object tryNext() {
277277
if (pages.poll() == null) {
278278
throw new AssertionError("Queue is empty, this should not happen");
279279
}
280-
current = pages.peek();
281280
// if the next page is readily available,
282281
// serve its first row now, no need to wait
283282
// for the next drain.
284-
if (current != null && current.hasMoreRows()) {
285-
return current.nextRow();
286-
}
283+
return tryNext();
287284
}
288285
}
289286
// No items available right now.

core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/ReactiveResultSetSubscriptionTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,30 @@ public void should_report_error_on_intermediary_page() {
145145
assertThat(wasAppliedSubscriber.getElements()).hasSize(1).containsExactly(true);
146146
assertThat(wasAppliedSubscriber.getError()).isNull();
147147
}
148+
149+
@Test
150+
public void should_handle_empty_non_final_pages() {
151+
CompletableFuture<AsyncResultSet> future1 = new CompletableFuture<>();
152+
CompletableFuture<AsyncResultSet> future2 = new CompletableFuture<>();
153+
CompletableFuture<AsyncResultSet> future3 = new CompletableFuture<>();
154+
MockAsyncResultSet page1 = new MockAsyncResultSet(10, future2);
155+
MockAsyncResultSet page2 = new MockAsyncResultSet(0, future3);
156+
MockAsyncResultSet page3 = new MockAsyncResultSet(0, null);
157+
TestSubscriber<ReactiveRow> mainSubscriber = new TestSubscriber<>(1);
158+
TestSubscriber<ColumnDefinitions> colDefsSubscriber = new TestSubscriber<>();
159+
TestSubscriber<ExecutionInfo> execInfosSubscriber = new TestSubscriber<>();
160+
TestSubscriber<Boolean> wasAppliedSubscriber = new TestSubscriber<>();
161+
ReactiveResultSetSubscription<AsyncResultSet> subscription =
162+
new ReactiveResultSetSubscription<>(
163+
mainSubscriber, colDefsSubscriber, execInfosSubscriber, wasAppliedSubscriber);
164+
mainSubscriber.onSubscribe(subscription);
165+
subscription.start(() -> future1);
166+
future1.complete(page1);
167+
future2.complete(page2);
168+
// emulate backpressure
169+
subscription.request(1);
170+
future3.complete(page3);
171+
subscription.request(Long.MAX_VALUE);
172+
mainSubscriber.awaitTermination();
173+
}
148174
}

core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/TestSubscriber.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,25 @@ public class TestSubscriber<T> implements Subscriber<T> {
3131

3232
private final List<T> elements = new ArrayList<>();
3333
private final CountDownLatch latch = new CountDownLatch(1);
34+
private long demand;
3435
private Subscription subscription;
3536
private Throwable error;
3637

38+
public TestSubscriber() {
39+
this.demand = Long.MAX_VALUE;
40+
}
41+
42+
public TestSubscriber(long demand) {
43+
this.demand = demand;
44+
}
45+
3746
@Override
3847
public void onSubscribe(Subscription s) {
3948
if (subscription != null) {
4049
fail("already subscribed");
4150
}
4251
subscription = s;
43-
s.request(Long.MAX_VALUE);
52+
subscription.request(demand);
4453
}
4554

4655
@Override

0 commit comments

Comments
 (0)