diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index c3ec067d01..cf4ee10734 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -122,10 +122,13 @@ public CompletionStage runAsync(Query query, TransactionConfig con .thenApply(cursor -> cursor); // convert the return type } - public CompletionStage runRx(Query query, TransactionConfig config) { + public CompletionStage runRx( + Query query, TransactionConfig config, CompletionStage cursorPublishStage) { var newResultCursorStage = buildResultCursorFactory(query, config).thenCompose(ResultCursorFactory::rxResult); - resultCursorStage = newResultCursorStage.exceptionally(error -> null); + resultCursorStage = newResultCursorStage + .thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage) + .exceptionally(throwable -> null); return newResultCursorStage; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java index 880be18f3a..c7f51952b5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java @@ -74,6 +74,7 @@ public CompletionStage asyncResult() { @Override public CompletionStage rxResult() { connection.writeAndFlush(runMessage, runHandler); - return runFuture.handle((ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler)); + return runFuture.handle( + (ignored, error) -> new RxResultCursorImpl(error, runHandler, pullHandler, connection::release)); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java index d1821b3976..55149e0164 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java @@ -36,4 +36,13 @@ public interface RxResultCursor extends Subscription, FailableCursor { boolean isDone(); Throwable getRunError(); + + /** + * Rolls back this instance by releasing connection with RESET. + *

+ * This must never be called on a published instance. + * @return reset completion stage + * @since 5.11 + */ + CompletionStage rollback(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index f2f90443bb..cca63f96c5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.Supplier; import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.handlers.RunResponseHandler; @@ -41,23 +42,30 @@ public class RxResultCursorImpl implements RxResultCursor { private final RunResponseHandler runHandler; private final PullResponseHandler pullHandler; private final Throwable runResponseError; + private final Supplier> connectionReleaseSupplier; private boolean runErrorSurfaced; private final CompletableFuture summaryFuture = new CompletableFuture<>(); private boolean summaryFutureExposed; private boolean resultConsumed; private RecordConsumerStatus consumerStatus = NOT_INSTALLED; + // for testing only public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler) { - this(null, runHandler, pullHandler); + this(null, runHandler, pullHandler, () -> CompletableFuture.completedFuture(null)); } - public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler) { + public RxResultCursorImpl( + Throwable runError, + RunResponseHandler runHandler, + PullResponseHandler pullHandler, + Supplier> connectionReleaseSupplier) { Objects.requireNonNull(runHandler); Objects.requireNonNull(pullHandler); this.runResponseError = runError; this.runHandler = runHandler; this.pullHandler = pullHandler; + this.connectionReleaseSupplier = connectionReleaseSupplier; installSummaryConsumer(); } @@ -130,6 +138,12 @@ public Throwable getRunError() { return runResponseError; } + @Override + public CompletionStage rollback() { + summaryFuture.complete(null); + return connectionReleaseSupplier.get(); + } + public CompletionStage summaryStage() { if (!isDone() && !resultConsumed) // the summary is called before record streaming { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java index 97aff83e06..d03144c313 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java @@ -23,14 +23,19 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactivestreams.ReactiveResult; @@ -142,6 +147,73 @@ public Set lastBookmarks() { return session.lastBookmarks(); } + protected Publisher run(Query query, TransactionConfig config, Function cursorToResult) { + var cursorPublishFuture = new CompletableFuture(); + var cursorReference = new AtomicReference(); + + return createSingleItemPublisher( + () -> runAsStage(query, config, cursorPublishFuture) + .thenApply(cursor -> { + cursorReference.set(cursor); + return cursor; + }) + .thenApply(cursorToResult), + () -> new IllegalStateException( + "Unexpected condition, run call has completed successfully with result being null"), + value -> { + if (value != null) { + cursorReference.get().rollback().whenComplete((unused, throwable) -> { + if (throwable != null) { + cursorPublishFuture.completeExceptionally(throwable); + } else { + cursorPublishFuture.complete(null); + } + }); + } + }) + .doOnNext(value -> cursorPublishFuture.complete(cursorReference.get())) + .doOnError(cursorPublishFuture::completeExceptionally); + } + + private CompletionStage runAsStage( + Query query, TransactionConfig config, CompletionStage finalStage) { + CompletionStage cursorStage; + try { + cursorStage = session.runRx(query, config, finalStage); + } catch (Throwable t) { + cursorStage = Futures.failedFuture(t); + } + + return cursorStage + .handle((cursor, throwable) -> { + if (throwable != null) { + return this.releaseConnectionAndRethrow(throwable); + } else { + var runError = cursor.getRunError(); + if (runError != null) { + return this.releaseConnectionAndRethrow(runError); + } else { + return CompletableFuture.completedFuture(cursor); + } + } + }) + .thenCompose(stage -> stage); + } + + private CompletionStage releaseConnectionAndRethrow(Throwable throwable) { + return session.releaseConnectionAsync().handle((ignored, releaseThrowable) -> { + if (releaseThrowable != null) { + throw Futures.combineErrors(throwable, releaseThrowable); + } else { + if (throwable instanceof RuntimeException e) { + throw e; + } else { + throw new CompletionException(throwable); + } + } + }); + } + protected Publisher doClose() { return createEmptyPublisher(session::closeAsync); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java index e7b7e4682b..67fb3736ac 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; @@ -31,13 +30,10 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; -import org.neo4j.driver.internal.cursor.RxResultCursor; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionCallback; -import reactor.core.publisher.Mono; public class InternalReactiveSession extends AbstractReactiveSession implements ReactiveSession, BaseReactiveQueryRunner { @@ -89,30 +85,7 @@ public Publisher run(Query query) { @Override public Publisher run(Query query, TransactionConfig config) { - CompletionStage cursorStage; - try { - cursorStage = session.runRx(query, config); - } catch (Throwable t) { - cursorStage = Futures.failedFuture(t); - } - - return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage) - .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) - .then(Mono.error(error))) - .flatMap(cursor -> { - Mono publisher; - var runError = cursor.getRunError(); - if (runError != null) { - publisher = Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError)) - .then(Mono.error(runError)); - } else { - publisher = Mono.just(cursor); - } - return publisher; - }) - .map(InternalReactiveResult::new)); + return publisherToFlowPublisher(run(query, config, InternalReactiveResult::new)); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index aa52e2ae6e..e135ffa187 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -96,7 +96,7 @@ public RxResult run(Query query) { public RxResult run(Query query, TransactionConfig config) { return new InternalRxResult(() -> { var resultCursorFuture = new CompletableFuture(); - session.runRx(query, config).whenComplete((cursor, completionError) -> { + session.runRx(query, config, resultCursorFuture).whenComplete((cursor, completionError) -> { if (cursor != null) { resultCursorFuture.complete(cursor); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java index bc4bf45d7e..445fae3ed7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/RxUtils.java @@ -56,7 +56,7 @@ public static Publisher createEmptyPublisher(Supplier the type of the item to publish. * @return A publisher that succeeds exactly one item or fails with an error. */ - public static Publisher createSingleItemPublisher( + public static Mono createSingleItemPublisher( Supplier> supplier, Supplier nullResultThrowableSupplier, Consumer cancellationHandler) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java index 8f46bb12b5..4e8c60e51d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java @@ -20,22 +20,18 @@ import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; -import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.reactive.AbstractReactiveSession; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.reactivestreams.ReactiveTransaction; import org.neo4j.driver.reactivestreams.ReactiveTransactionCallback; import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; public class InternalReactiveSession extends AbstractReactiveSession implements ReactiveSession, BaseReactiveQueryRunner { @@ -83,30 +79,7 @@ public Publisher run(Query query) { @Override public Publisher run(Query query, TransactionConfig config) { - CompletionStage cursorStage; - try { - cursorStage = session.runRx(query, config); - } catch (Throwable t) { - cursorStage = Futures.failedFuture(t); - } - - return Mono.fromCompletionStage(cursorStage) - .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) - .then(Mono.error(error))) - .flatMap(cursor -> { - Mono publisher; - var runError = cursor.getRunError(); - if (runError != null) { - publisher = Mono.fromCompletionStage(session.releaseConnectionAsync()) - .onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError)) - .then(Mono.error(runError)); - } else { - publisher = Mono.just(cursor); - } - return publisher; - }) - .map(InternalReactiveResult::new); + return run(query, config, InternalReactiveResult::new); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java index 1027e6ff6d..46159f6b43 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -20,13 +20,27 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.function.Function; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.neo4j.driver.Config; +import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactive.ReactiveResult; @@ -34,7 +48,8 @@ import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; -import reactor.adapter.JdkFlowAdapter; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @EnabledOnNeo4jWith(BOLT_V4) @@ -55,13 +70,111 @@ void shouldErrorWhenReactiveResultIsReturned(Function CompletableFuture.supplyAsync( + () -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscriptionFuture.complete(subscription); + } + + @Override + public void onNext(ReactiveResult item) { + // ignored + } + + @Override + public void onError(Throwable throwable) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + return subscriptionFuture.thenApplyAsync( + subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }, + executorService); + }, + executorService)) + .map(future -> future.thenCompose(itself -> itself)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(subscriptionFutures).join(); + + // Subscription cancellation does not guarantee neither onComplete nor onError signal. + var timeout = Instant.now().plus(5, ChronoUnit.MINUTES); + var totalInUseConnections = -1; + while (Instant.now().isBefore(timeout)) { + totalInUseConnections = driver.metrics().connectionPoolMetrics().stream() + .map(ConnectionPoolMetrics::inUse) + .mapToInt(Integer::intValue) + .sum(); + if (totalInUseConnections == 0) { + return; + } + Thread.sleep(100); + } + fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + } + } + + @Test + void shouldRollbackResultOnSubscriptionCancellation() { + var config = Config.builder().withMaxConnectionPoolSize(1).build(); + try (var driver = neo4j.customDriver(config)) { + var session = driver.session(ReactiveSession.class); + var nodeId = UUID.randomUUID().toString(); + var cancellationFuture = new CompletableFuture(); + + flowPublisherToFlux(session.run("CREATE ({id: $id})", Map.of("id", nodeId))) + .subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.cancel(); + cancellationFuture.complete(null); + } + }); + + cancellationFuture.join(); + + var nodesNum = flowPublisherToFlux(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", nodeId))) + .flatMap(result -> flowPublisherToFlux(result.records())) + .count() + .block(); + assertEquals(0, nodesNum); + } } static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of( - session -> JdkFlowAdapter.flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))), - session -> JdkFlowAdapter.flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1")))); + session -> flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))), + session -> flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1")))); } } diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java index f03117fc97..77ba5dcf51 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java @@ -20,13 +20,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.function.Function; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.neo4j.driver.Config; +import org.neo4j.driver.ConnectionPoolMetrics; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactivestreams.ReactiveResult; @@ -34,7 +46,10 @@ import org.neo4j.driver.testutil.DatabaseExtension; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @EnabledOnNeo4jWith(BOLT_V4) @ParallelizableIT @@ -57,6 +72,90 @@ void shouldErrorWhenReactiveResultIsReturned(Function CompletableFuture.supplyAsync( + () -> { + var subscriptionFuture = new CompletableFuture(); + driver.session(ReactiveSession.class) + .run("UNWIND range (0,10000) AS x RETURN x") + .subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + // use subscription from another thread to avoid immediate cancellation + // within the subscribe method + subscriptionFuture.complete(subscription); + } + }); + return subscriptionFuture.thenApplyAsync( + subscription -> { + if (request) { + subscription.request(1); + } + subscription.cancel(); + return subscription; + }, + executorService); + }, + executorService)) + .map(future -> future.thenCompose(itself -> itself)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(subscriptionFutures).join(); + + // Subscription cancellation does not guarantee neither onComplete nor onError signal. + var timeout = Instant.now().plus(5, ChronoUnit.MINUTES); + var totalInUseConnections = -1; + while (Instant.now().isBefore(timeout)) { + totalInUseConnections = driver.metrics().connectionPoolMetrics().stream() + .map(ConnectionPoolMetrics::inUse) + .mapToInt(Integer::intValue) + .sum(); + if (totalInUseConnections == 0) { + return; + } + Thread.sleep(100); + } + fail(String.format("not all connections have been released, %d are still in use", totalInUseConnections)); + } + } + + @Test + void shouldRollbackResultOnSubscriptionCancellation() { + var config = Config.builder().withMaxConnectionPoolSize(1).build(); + try (var driver = neo4j.customDriver(config)) { + var session = driver.session(ReactiveSession.class); + var nodeId = UUID.randomUUID().toString(); + var cancellationFuture = new CompletableFuture(); + + session.run("CREATE ({id: $id})", Map.of("id", nodeId)).subscribe(new BaseSubscriber<>() { + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.cancel(); + cancellationFuture.complete(null); + } + }); + + cancellationFuture.join(); + + var nodesNum = Mono.fromDirect(session.run("MATCH (n {id: $id}) RETURN n", Map.of("id", nodeId))) + .flatMapMany(ReactiveResult::records) + .count() + .block(); + assertEquals(0, nodesNum); + } + } + static List>> managedTransactionsReturningReactiveResultPublisher() { return List.of( diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java index 74f9d40224..6d51f69c21 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java @@ -53,6 +53,7 @@ import java.util.Collections; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -99,7 +100,7 @@ void shouldFlushOnRunAsync() { @Test void shouldFlushOnRunRx() { setupSuccessfulRunRx(connection); - await(session.runRx(new Query("RETURN 1"), TransactionConfig.empty())); + await(session.runRx(new Query("RETURN 1"), TransactionConfig.empty(), CompletableFuture.completedStage(null))); verifyRunRx(connection, "RETURN 1"); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java index f462ce6050..ff306ea513 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java @@ -60,7 +60,7 @@ void shouldInstallSummaryConsumerWithoutReportingError() { var pullHandler = mock(PullResponseHandler.class); // When - new RxResultCursorImpl(error, runHandler, pullHandler); + new RxResultCursorImpl(error, runHandler, pullHandler, () -> CompletableFuture.completedStage(null)); // Then verify(pullHandler).installSummaryConsumer(any(BiConsumer.class)); @@ -160,7 +160,8 @@ void shouldInstallRecordConsumerAndReportError() { // When var runHandler = newRunResponseHandler(error); PullResponseHandler pullHandler = new ListBasedPullHandler(); - RxResultCursor cursor = new RxResultCursorImpl(error, runHandler, pullHandler); + RxResultCursor cursor = + new RxResultCursorImpl(error, runHandler, pullHandler, () -> CompletableFuture.completedStage(null)); cursor.installRecordConsumer(recordConsumer); // Then diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java index 33a67a5943..0562c762d4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java @@ -98,14 +98,16 @@ void shouldDelegateRun(Function> runR RxResultCursor cursor = mock(RxResultCursorImpl.class); // Run succeeded with a cursor - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(completedFuture(cursor)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(completedFuture(cursor)); var rxSession = new InternalReactiveSession(session); // When var result = flowPublisherToFlux(runReturnOne.apply(rxSession)); + result.subscribe(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); StepVerifier.create(result).expectNextCount(1).verifyComplete(); } @@ -117,7 +119,8 @@ void shouldReleaseConnectionIfFailedToRun(Function error == t).verify(); - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); verify(session).releaseConnectionAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index f1b204d4f4..c5afe1e0ca 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -98,7 +98,8 @@ void shouldDelegateRun(Function runReturnOne) { RxResultCursor cursor = mock(RxResultCursorImpl.class); // Run succeeded with a cursor - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(completedFuture(cursor)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(completedFuture(cursor)); var rxSession = new InternalRxSession(session); // When @@ -107,7 +108,7 @@ void shouldDelegateRun(Function runReturnOne) { var cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); assertThat(Futures.getNow(cursorFuture), equalTo(cursor)); } @@ -119,7 +120,8 @@ void shouldReleaseConnectionIfFailedToRun(Function runRetur var session = mock(NetworkSession.class); // Run failed with error - when(session.runRx(any(Query.class), any(TransactionConfig.class))).thenReturn(Futures.failedFuture(error)); + when(session.runRx(any(Query.class), any(TransactionConfig.class), any())) + .thenReturn(Futures.failedFuture(error)); when(session.releaseConnectionAsync()).thenReturn(Futures.completedWithNull()); var rxSession = new InternalRxSession(session); @@ -130,7 +132,7 @@ void shouldReleaseConnectionIfFailedToRun(Function runRetur var cursorFuture = ((InternalRxResult) result).cursorFutureSupplier().get(); // Then - verify(session).runRx(any(Query.class), any(TransactionConfig.class)); + verify(session).runRx(any(Query.class), any(TransactionConfig.class), any()); RuntimeException t = assertThrows(CompletionException.class, () -> Futures.getNow(cursorFuture)); assertThat(t.getCause(), equalTo(error)); verify(session).releaseConnectionAsync(); diff --git a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java index be89388771..9a56e9ea1d 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/DatabaseExtension.java @@ -148,6 +148,10 @@ public Driver driver() { return driver; } + public Driver customDriver(Config config) { + return GraphDatabase.driver(boltUri, authToken, config); + } + public void deleteAndStartNeo4j(Map config) { Map updatedConfig = new HashMap<>(defaultConfig); updatedConfig.putAll(config);