contextCons
Result run(Query query, TransactionConfig config );
/**
- * Return the bookmark received following the last completed
- * {@linkplain Transaction transaction}. If no bookmark was received
- * or if this transaction was rolled back, the bookmark value will
- * be null.
+ * Return the last bookmark of this session.
+ *
+ * When no new bookmark is received, the initial bookmarks are returned as a composite {@link Bookmark} containing all initial bookmarks. This may happen
+ * when no work has been done using the session. If no initial bookmarks have been provided, an empty {@link Bookmark} is returned.
*
- * @return a reference to a previous transaction
+ * @return the last bookmark.
*/
+ @Deprecated
Bookmark lastBookmark();
/**
- * Signal that you are done using this session. In the default driver usage, closing and accessing sessions is
- * very low cost.
+ * Return a set of last bookmarks.
+ *
+ * When no new bookmark is received, the initial bookmarks are returned. This may happen when no work has been done using the session. Multivalued {@link
+ * Bookmark} instances will be mapped to distinct {@link Bookmark} instances. If no initial bookmarks have been provided, an empty set is returned.
+ *
+ * @return the immutable set of last bookmarks.
+ */
+ Set lastBookmarks();
+
+ /**
+ * Signal that you are done using this session. In the default driver usage, closing and accessing sessions is very low cost.
*/
@Override
void close();
diff --git a/driver/src/main/java/org/neo4j/driver/SessionConfig.java b/driver/src/main/java/org/neo4j/driver/SessionConfig.java
index 4286ad28ef..e8ec48acb2 100644
--- a/driver/src/main/java/org/neo4j/driver/SessionConfig.java
+++ b/driver/src/main/java/org/neo4j/driver/SessionConfig.java
@@ -26,6 +26,7 @@
import java.util.Optional;
import org.neo4j.driver.async.AsyncSession;
+import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.RxSession;
import static java.util.Objects.requireNonNull;
@@ -186,14 +187,12 @@ private Builder()
/**
* Set the initial bookmarks to be used in a session.
*
- * First transaction in a session will ensure that server hosting is at least as up-to-date as the
- * latest transaction referenced by the supplied bookmarks.
- * The bookmarks can be obtained via {@link Session#lastBookmark()}, {@link AsyncSession#lastBookmark()},
- * and/or {@link RxSession#lastBookmark()}.
+ * First transaction in a session will ensure that server hosting is at least as up-to-date as the latest transaction referenced by the supplied
+ * bookmarks. The bookmarks can be obtained via {@link Session#lastBookmarks()}, {@link AsyncSession#lastBookmarks()}, and/or {@link
+ * ReactiveSession#lastBookmarks()}.
*
- * @param bookmarks a series of initial bookmarks.
- * Both {@code null} value and empty array
- * are permitted, and indicate that the bookmarks do not exist or are unknown.
+ * @param bookmarks a series of initial bookmarks. Both {@code null} value and empty array are permitted, and indicate that the bookmarks do not exist
+ * or are unknown.
* @return this builder.
*/
public Builder withBookmarks( Bookmark... bookmarks )
@@ -210,14 +209,22 @@ public Builder withBookmarks( Bookmark... bookmarks )
}
/**
- * Set the initial bookmarks to be used in a session.
- * First transaction in a session will ensure that server hosting is at least as up-to-date as the
- * latest transaction referenced by the supplied bookmarks.
- * The bookmarks can be obtained via {@link Session#lastBookmark()}, {@link AsyncSession#lastBookmark()},
- * and/or {@link RxSession#lastBookmark()}.
+ * Set the initial bookmarks to be used in a session. First transaction in a session will ensure that server hosting is at least as up-to-date as the
+ * latest transaction referenced by the supplied bookmarks. The bookmarks can be obtained via {@link Session#lastBookmarks()}, {@link
+ * AsyncSession#lastBookmarks()}, and/or {@link ReactiveSession#lastBookmarks()}.
+ *
+ * Multiple immutable sets of bookmarks may be joined in the following way:
+ *
+ * {@code
+ * Set bookmarks = new HashSet<>();
+ * bookmarks.addAll( session1.lastBookmarks() );
+ * bookmarks.addAll( session2.lastBookmarks() );
+ * bookmarks.addAll( session3.lastBookmarks() );
+ * }
+ *
*
- * @param bookmarks initial references to some previous transactions. Both {@code null} value and empty iterable
- * are permitted, and indicate that the bookmarks do not exist or are unknown.
+ * @param bookmarks initial references to some previous transactions. Both {@code null} value and empty iterable are permitted, and indicate that the
+ * bookmarks do not exist or are unknown.
* @return this builder
*/
public Builder withBookmarks( Iterable bookmarks )
diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java
index 2b40688005..4d13fd9962 100644
--- a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java
+++ b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java
@@ -19,6 +19,7 @@
package org.neo4j.driver.async;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -382,15 +383,26 @@ default CompletionStage executeWriteAsync( AsyncTransactionCallback runAsync( Query query, TransactionConfig config );
/**
- * Return the bookmark received following the last completed
- * {@linkplain Transaction transaction}. If no bookmark was received
- * or if this transaction was rolled back, the bookmark value will
- * be null.
+ * Return the last bookmark of this session.
+ *
+ * When no new bookmark is received, the initial bookmarks are returned as a composite {@link Bookmark} containing all initial bookmarks. This may happen
+ * when no work has been done using the session. If no initial bookmarks have been provided, an empty {@link Bookmark} is returned.
*
- * @return a reference to a previous transaction
+ * @return the last bookmark.
*/
+ @Deprecated
Bookmark lastBookmark();
+ /**
+ * Return a set of last bookmarks.
+ *
+ * When no new bookmark is received, the initial bookmarks are returned. This may happen when no work has been done using the session. Multivalued {@link
+ * Bookmark} instances will be mapped to distinct {@link Bookmark} instances. If no initial bookmarks have been provided, an empty set is returned.
+ *
+ * @return the immutable set of last bookmarks.
+ */
+ Set lastBookmarks();
+
/**
* Signal that you are done using this session. In the default driver usage, closing and accessing sessions is
* very low cost.
diff --git a/driver/src/main/java/org/neo4j/driver/internal/BookmarkHolder.java b/driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java
similarity index 78%
rename from driver/src/main/java/org/neo4j/driver/internal/BookmarkHolder.java
rename to driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java
index d048d0948f..bdeae1e8f4 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/BookmarkHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java
@@ -18,20 +18,23 @@
*/
package org.neo4j.driver.internal;
+import java.util.Collections;
+import java.util.Set;
+
import org.neo4j.driver.Bookmark;
-public interface BookmarkHolder
+public interface BookmarksHolder
{
- Bookmark getBookmark();
+ Set getBookmarks();
void setBookmark( Bookmark bookmark );
- BookmarkHolder NO_OP = new BookmarkHolder()
+ BookmarksHolder NO_OP = new BookmarksHolder()
{
@Override
- public Bookmark getBookmark()
+ public Set getBookmarks()
{
- return InternalBookmark.empty();
+ return Collections.emptySet();
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarkHolder.java b/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java
similarity index 66%
rename from driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarkHolder.java
rename to driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java
index 61f333bc32..2e294d8341 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarkHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java
@@ -18,29 +18,33 @@
*/
package org.neo4j.driver.internal;
+import java.util.Collections;
+import java.util.Set;
+
import org.neo4j.driver.Bookmark;
/**
* @since 2.0
*/
-public class DefaultBookmarkHolder implements BookmarkHolder
+public class DefaultBookmarksHolder implements BookmarksHolder
{
- private volatile Bookmark bookmark;
+ private volatile Set bookmarks;
- public DefaultBookmarkHolder()
+ // for testing only
+ public DefaultBookmarksHolder()
{
- this( InternalBookmark.empty() );
+ this( Collections.emptySet() );
}
- public DefaultBookmarkHolder( Bookmark bookmark )
+ public DefaultBookmarksHolder( Set bookmarks )
{
- this.bookmark = bookmark;
+ this.bookmarks = bookmarks;
}
@Override
- public Bookmark getBookmark()
+ public Set getBookmarks()
{
- return bookmark;
+ return bookmarks;
}
@Override
@@ -48,7 +52,7 @@ public void setBookmark( Bookmark bookmark )
{
if ( bookmark != null && !bookmark.isEmpty() )
{
- this.bookmark = bookmark;
+ bookmarks = Collections.singleton( bookmark );
}
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalBookmark.java b/driver/src/main/java/org/neo4j/driver/internal/InternalBookmark.java
index c4ac5cbab7..cc920e0bec 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalBookmark.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalBookmark.java
@@ -117,6 +117,12 @@ public boolean isEmpty()
return values.isEmpty();
}
+ @Override
+ public String value()
+ {
+ return values.isEmpty() ? null : values.iterator().next();
+ }
+
@Override
public Set values()
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
index 33008f71c4..c6378e50a8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
@@ -19,6 +19,7 @@
package org.neo4j.driver.internal;
import java.util.Map;
+import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
@@ -140,7 +141,13 @@ public T executeWrite( TransactionCallback callback, TransactionConfig co
@Override
public Bookmark lastBookmark()
{
- return session.lastBookmark();
+ return InternalBookmark.from( session.lastBookmarks() );
+ }
+
+ @Override
+ public Set lastBookmarks()
+ {
+ return session.lastBookmarks();
}
private T transaction( AccessMode mode, TransactionWork work, TransactionConfig config )
@@ -149,19 +156,21 @@ private T transaction( AccessMode mode, TransactionWork work, Transaction
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
- return session.retryLogic().retry( () -> {
- try ( Transaction tx = beginTransaction( mode, config ) )
- {
-
- T result = work.execute( tx );
- if ( tx.isOpen() )
+ return session.retryLogic().retry(
+ () ->
{
- // commit tx if a user has not explicitly committed or rolled back the transaction
- tx.commit();
- }
- return result;
- }
- } );
+ try ( Transaction tx = beginTransaction( mode, config ) )
+ {
+
+ T result = work.execute( tx );
+ if ( tx.isOpen() )
+ {
+ // commit tx if a user has not explicitly committed or rolled back the transaction
+ tx.commit();
+ }
+ return result;
+ }
+ } );
}
private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarkHolder.java b/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java
similarity index 75%
rename from driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarkHolder.java
rename to driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java
index c828f07aee..39cb18d51b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarkHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java
@@ -18,24 +18,26 @@
*/
package org.neo4j.driver.internal;
+import java.util.Set;
+
import org.neo4j.driver.Bookmark;
/**
* @since 2.0
*/
-public class ReadOnlyBookmarkHolder implements BookmarkHolder
+public class ReadOnlyBookmarksHolder implements BookmarksHolder
{
- private final Bookmark bookmark;
+ private final Set bookmarks;
- public ReadOnlyBookmarkHolder( Bookmark bookmark )
+ public ReadOnlyBookmarksHolder( Set bookmarks )
{
- this.bookmark = bookmark;
+ this.bookmarks = bookmarks;
}
@Override
- public Bookmark getBookmark()
+ public Set getBookmarks()
{
- return bookmark;
+ return bookmarks;
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index 8a9e85634e..1f4b866692 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -18,10 +18,14 @@
*/
package org.neo4j.driver.internal;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
+import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Config;
import org.neo4j.driver.Logging;
import org.neo4j.driver.SessionConfig;
@@ -50,12 +54,40 @@ public class SessionFactoryImpl implements SessionFactory
@Override
public NetworkSession newInstance( SessionConfig sessionConfig )
{
- BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( InternalBookmark.from( sessionConfig.bookmarks() ) );
+ BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder( toDistinctSet( sessionConfig.bookmarks() ) );
return createSession( connectionProvider, retryLogic, parseDatabaseName( sessionConfig ),
- sessionConfig.defaultAccessMode(), bookmarkHolder, parseFetchSize( sessionConfig ),
+ sessionConfig.defaultAccessMode(), bookmarksHolder, parseFetchSize( sessionConfig ),
sessionConfig.impersonatedUser().orElse( null ), logging );
}
+ private Set toDistinctSet( Iterable bookmarks )
+ {
+ Set set = new HashSet<>();
+ if ( bookmarks != null )
+ {
+ for ( Bookmark bookmark : bookmarks )
+ {
+ if ( bookmark != null )
+ {
+ Set values = bookmark.values();
+ int size = values.size();
+ if ( size == 1 )
+ {
+ set.add( bookmark );
+ }
+ else if ( size > 1 )
+ {
+ for ( String value : values )
+ {
+ set.add( Bookmark.from( value ) );
+ }
+ }
+ }
+ }
+ }
+ return Collections.unmodifiableSet( set );
+ }
+
private long parseFetchSize( SessionConfig sessionConfig )
{
return sessionConfig.fetchSize().orElse( defaultFetchSize );
@@ -64,8 +96,8 @@ private long parseFetchSize( SessionConfig sessionConfig )
private DatabaseName parseDatabaseName( SessionConfig sessionConfig )
{
return sessionConfig.database()
- .flatMap( name -> Optional.of( DatabaseNameUtil.database( name ) ) )
- .orElse( DatabaseNameUtil.defaultDatabase() );
+ .flatMap( name -> Optional.of( DatabaseNameUtil.database( name ) ) )
+ .orElse( DatabaseNameUtil.defaultDatabase() );
}
@Override
@@ -99,10 +131,10 @@ public ConnectionProvider getConnectionProvider()
}
private NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, long fetchSize, String impersonatedUser, Logging logging )
+ BookmarksHolder bookmarksHolder, long fetchSize, String impersonatedUser, Logging logging )
{
return leakedSessionsLoggingEnabled
- ? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, impersonatedUser, fetchSize, logging )
- : new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, impersonatedUser, fetchSize, logging );
+ ? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarksHolder, impersonatedUser, fetchSize, logging )
+ : new NetworkSession( connectionProvider, retryLogic, databaseName, mode, bookmarksHolder, impersonatedUser, fetchSize, logging );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ConnectionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/ConnectionContext.java
index 31efe28b90..f9e8d5a4a8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ConnectionContext.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ConnectionContext.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
@@ -37,7 +38,7 @@ public interface ConnectionContext
AccessMode mode();
- Bookmark rediscoveryBookmark();
+ Set rediscoveryBookmarks();
String impersonatedUser();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ImmutableConnectionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/ImmutableConnectionContext.java
index ccaa86c389..c92f7eb546 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ImmutableConnectionContext.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ImmutableConnectionContext.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.async;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.AccessMode;
@@ -27,24 +29,23 @@
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;
-import static org.neo4j.driver.internal.InternalBookmark.empty;
/**
* A {@link Connection} shall fulfil this {@link ImmutableConnectionContext} when acquired from a connection provider.
*/
public class ImmutableConnectionContext implements ConnectionContext
{
- private static final ConnectionContext SINGLE_DB_CONTEXT = new ImmutableConnectionContext( defaultDatabase(), empty(), AccessMode.READ );
- private static final ConnectionContext MULTI_DB_CONTEXT = new ImmutableConnectionContext( systemDatabase(), empty(), AccessMode.READ );
+ private static final ConnectionContext SINGLE_DB_CONTEXT = new ImmutableConnectionContext( defaultDatabase(), Collections.emptySet(), AccessMode.READ );
+ private static final ConnectionContext MULTI_DB_CONTEXT = new ImmutableConnectionContext( systemDatabase(), Collections.emptySet(), AccessMode.READ );
private final CompletableFuture databaseNameFuture;
private final AccessMode mode;
- private final Bookmark rediscoveryBookmark;
+ private final Set rediscoveryBookmarks;
- public ImmutableConnectionContext( DatabaseName databaseName, Bookmark bookmark, AccessMode mode )
+ public ImmutableConnectionContext( DatabaseName databaseName, Set bookmarks, AccessMode mode )
{
this.databaseNameFuture = CompletableFuture.completedFuture( databaseName );
- this.rediscoveryBookmark = bookmark;
+ this.rediscoveryBookmarks = bookmarks;
this.mode = mode;
}
@@ -61,9 +62,9 @@ public AccessMode mode()
}
@Override
- public Bookmark rediscoveryBookmark()
+ public Set rediscoveryBookmarks()
{
- return rediscoveryBookmark;
+ return rediscoveryBookmarks;
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
index 23c12fded6..a32f20138e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
@@ -18,7 +18,9 @@
*/
package org.neo4j.driver.internal.async;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -31,6 +33,7 @@
import org.neo4j.driver.async.AsyncTransactionCallback;
import org.neo4j.driver.async.AsyncTransactionWork;
import org.neo4j.driver.async.ResultCursor;
+import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.util.Futures;
import static java.util.Collections.emptyMap;
@@ -127,29 +130,38 @@ public CompletionStage executeWriteAsync( AsyncTransactionCallback CompletionStage transactionAsync( AccessMode mode, AsyncTransactionWork> work, TransactionConfig config )
+ @Override
+ public Set lastBookmarks()
{
- return session.retryLogic().retryAsync( () -> {
- CompletableFuture resultFuture = new CompletableFuture<>();
- CompletionStage txFuture = session.beginTransactionAsync( mode, config );
-
- txFuture.whenComplete( ( tx, completionError ) -> {
- Throwable error = Futures.completionExceptionCause( completionError );
- if ( error != null )
- {
- resultFuture.completeExceptionally( error );
- }
- else
- {
- executeWork( resultFuture, tx, work );
- }
- } );
+ return new HashSet<>( session.lastBookmarks() );
+ }
- return resultFuture;
- } );
+ private CompletionStage transactionAsync( AccessMode mode, AsyncTransactionWork> work, TransactionConfig config )
+ {
+ return session.retryLogic().retryAsync( () ->
+ {
+ CompletableFuture resultFuture = new CompletableFuture<>();
+ CompletionStage txFuture = session.beginTransactionAsync( mode, config );
+
+ txFuture.whenComplete(
+ ( tx, completionError ) ->
+ {
+ Throwable error = Futures.completionExceptionCause( completionError );
+ if ( error != null )
+ {
+ resultFuture.completeExceptionally( error );
+ }
+ else
+ {
+ executeWork( resultFuture, tx, work );
+ }
+ } );
+
+ return resultFuture;
+ } );
}
private void executeWork(CompletableFuture resultFuture, UnmanagedTransaction tx, AsyncTransactionWork> work )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
index 952c17e243..0b23b5f554 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
@@ -20,7 +20,7 @@
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Logging;
-import org.neo4j.driver.internal.BookmarkHolder;
+import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -33,9 +33,9 @@ public class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;
public LeakLoggingNetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, String impersonatedUser, long fetchSize, Logging logging )
+ BookmarksHolder bookmarksHolder, String impersonatedUser, long fetchSize, Logging logging )
{
- super( connectionProvider, retryLogic, databaseName, mode, bookmarkHolder, impersonatedUser, fetchSize, logging );
+ super( connectionProvider, retryLogic, databaseName, mode, bookmarksHolder, impersonatedUser, fetchSize, logging );
this.stackTrace = captureStackTrace();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
index 197e774a13..71f1ef094a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@@ -32,7 +33,7 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionNestingException;
-import org.neo4j.driver.internal.BookmarkHolder;
+import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.ImpersonationUtil;
@@ -56,7 +57,7 @@ public class NetworkSession
private final RetryLogic retryLogic;
protected final Logger log;
- private final BookmarkHolder bookmarkHolder;
+ private final BookmarksHolder bookmarksHolder;
private final long fetchSize;
private volatile CompletionStage transactionStage = completedWithNull();
private volatile CompletionStage connectionStage = completedWithNull();
@@ -65,17 +66,17 @@ public class NetworkSession
private final AtomicBoolean open = new AtomicBoolean( true );
public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode,
- BookmarkHolder bookmarkHolder, String impersonatedUser, long fetchSize, Logging logging )
+ BookmarksHolder bookmarksHolder, String impersonatedUser, long fetchSize, Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.log = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( getClass() ) );
- this.bookmarkHolder = bookmarkHolder;
+ this.bookmarksHolder = bookmarksHolder;
CompletableFuture databaseNameFuture = databaseName.databaseName()
.map( ignored -> CompletableFuture.completedFuture( databaseName ) )
.orElse( new CompletableFuture<>() );
- this.connectionContext = new NetworkSessionConnectionContext( databaseNameFuture, bookmarkHolder.getBookmark(), impersonatedUser );
+ this.connectionContext = new NetworkSessionConnectionContext( databaseNameFuture, bookmarksHolder.getBookmarks(), impersonatedUser );
this.fetchSize = fetchSize;
}
@@ -111,10 +112,10 @@ public CompletionStage beginTransactionAsync( AccessMode m
.thenCompose( ignore -> acquireConnection( mode ) )
.thenApply( connection -> ImpersonationUtil.ensureImpersonationSupport( connection, connection.impersonatedUser() ) )
.thenCompose( connection ->
- {
- UnmanagedTransaction tx = new UnmanagedTransaction( connection, bookmarkHolder, fetchSize );
- return tx.beginAsync( bookmarkHolder.getBookmark(), config );
- } );
+ {
+ UnmanagedTransaction tx = new UnmanagedTransaction( connection, bookmarksHolder, fetchSize );
+ return tx.beginAsync( bookmarksHolder.getBookmarks(), config );
+ } );
// update the reference to the only known transaction
CompletionStage currentTransactionStage = transactionStage;
@@ -140,9 +141,9 @@ public RetryLogic retryLogic()
return retryLogic;
}
- public Bookmark lastBookmark()
+ public Set lastBookmarks()
{
- return bookmarkHolder.getBookmark();
+ return bookmarksHolder.getBookmarks();
}
public CompletionStage releaseConnectionAsync()
@@ -219,7 +220,7 @@ private CompletionStage buildResultCursorFactory( Query que
{
ResultCursorFactory factory = connection
.protocol()
- .runInAutoCommitTransaction( connection, query, bookmarkHolder, config, fetchSize );
+ .runInAutoCommitTransaction( connection, query, bookmarksHolder, config, fetchSize );
return completedFuture( factory );
}
catch ( Throwable e )
@@ -338,16 +339,16 @@ private static class NetworkSessionConnectionContext implements ConnectionContex
private final CompletableFuture databaseNameFuture;
private AccessMode mode;
- // This bookmark is only used for rediscovery.
- // It has to be the initial bookmark given at the creation of the session.
- // As only that bookmark could carry extra system bookmarks
- private final Bookmark rediscoveryBookmark;
+ // These bookmarks are only used for rediscovery.
+ // They have to be the initial bookmarks given at the creation of the session.
+ // As only those bookmarks could carry extra system bookmarks
+ private final Set rediscoveryBookmarks;
private final String impersonatedUser;
- private NetworkSessionConnectionContext( CompletableFuture databaseNameFuture, Bookmark bookmark, String impersonatedUser )
+ private NetworkSessionConnectionContext( CompletableFuture databaseNameFuture, Set bookmarks, String impersonatedUser )
{
this.databaseNameFuture = databaseNameFuture;
- this.rediscoveryBookmark = bookmark;
+ this.rediscoveryBookmarks = bookmarks;
this.impersonatedUser = impersonatedUser;
}
@@ -370,9 +371,9 @@ public AccessMode mode()
}
@Override
- public Bookmark rediscoveryBookmark()
+ public Set rediscoveryBookmarks()
{
- return rediscoveryBookmark;
+ return rediscoveryBookmarks;
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
index fbd7a985c7..a05cc884c1 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
@@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@@ -30,13 +31,12 @@
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
-import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
-import org.neo4j.driver.internal.BookmarkHolder;
+import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -59,7 +59,7 @@ private enum State
ACTIVE,
/**
- * This transaction has been terminated either because of explicit {@link Session#reset()} or because of a fatal connection error.
+ * This transaction has been terminated either because of a fatal connection error.
*/
TERMINATED,
@@ -84,7 +84,7 @@ private enum State
private final Connection connection;
private final BoltProtocol protocol;
- private final BookmarkHolder bookmarkHolder;
+ private final BookmarksHolder bookmarksHolder;
private final ResultCursorsHolder resultCursors;
private final long fetchSize;
private final Lock lock = new ReentrantLock();
@@ -93,23 +93,23 @@ private enum State
private CompletableFuture rollbackFuture;
private Throwable causeOfTermination;
- public UnmanagedTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
+ public UnmanagedTransaction( Connection connection, BookmarksHolder bookmarksHolder, long fetchSize )
{
- this( connection, bookmarkHolder, fetchSize, new ResultCursorsHolder() );
+ this( connection, bookmarksHolder, fetchSize, new ResultCursorsHolder() );
}
- protected UnmanagedTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, ResultCursorsHolder resultCursors )
+ protected UnmanagedTransaction( Connection connection, BookmarksHolder bookmarksHolder, long fetchSize, ResultCursorsHolder resultCursors )
{
this.connection = connection;
this.protocol = connection.protocol();
- this.bookmarkHolder = bookmarkHolder;
+ this.bookmarksHolder = bookmarksHolder;
this.resultCursors = resultCursors;
this.fetchSize = fetchSize;
}
- public CompletionStage beginAsync( Bookmark initialBookmark, TransactionConfig config )
+ public CompletionStage beginAsync( Set initialBookmarks, TransactionConfig config )
{
- return protocol.beginTransaction( connection, initialBookmark, config )
+ return protocol.beginTransaction( connection, initialBookmarks, config )
.handle( ( ignore, beginError ) ->
{
if ( beginError != null )
@@ -240,7 +240,7 @@ private CompletionStage doCommitAsync( Throwable cursorFailure )
cursorFailure != causeOfTermination ? causeOfTermination : null )
: null
);
- return exception != null ? failedFuture( exception ) : protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark );
+ return exception != null ? failedFuture( exception ) : protocol.commitTransaction( connection ).thenAccept( bookmarksHolder::setBookmark );
}
private CompletionStage doRollbackAsync()
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java
index e71f800cd5..ef51c38115 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cluster;
+import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Bookmark;
@@ -26,5 +27,6 @@
public interface ClusterCompositionProvider
{
- CompletionStage getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser );
+ CompletionStage getClusterComposition( Connection connection, DatabaseName databaseName, Set bookmarks,
+ String impersonatedUser );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java
index 71f09cfc08..03d1884dbf 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java
@@ -19,14 +19,15 @@
package org.neo4j.driver.internal.cluster;
import java.util.HashMap;
+import java.util.Set;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Value;
-import org.neo4j.driver.internal.BookmarkHolder;
+import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.DatabaseName;
-import org.neo4j.driver.internal.ReadOnlyBookmarkHolder;
+import org.neo4j.driver.internal.ReadOnlyBookmarksHolder;
import org.neo4j.driver.internal.async.connection.DirectConnection;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
import org.neo4j.driver.internal.spi.Connection;
@@ -50,9 +51,9 @@ public MultiDatabasesRoutingProcedureRunner( RoutingContext context )
}
@Override
- BookmarkHolder bookmarkHolder( Bookmark bookmark )
+ BookmarksHolder bookmarksHolder( Set bookmarks )
{
- return new ReadOnlyBookmarkHolder( bookmark );
+ return new ReadOnlyBookmarksHolder( bookmarks );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java
index abd5ddc784..692147cb0d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java
@@ -20,6 +20,7 @@
import java.net.UnknownHostException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Bookmark;
@@ -38,11 +39,11 @@ public interface Rediscovery
*
* @param routingTable the routing table for cluster composition lookup
* @param connectionPool the connection pool for connection acquisition
- * @param bookmark the bookmark that is presented to the server
+ * @param bookmarks the bookmarks that are presented to the server
* @param impersonatedUser the impersonated user for cluster composition lookup, should be {@code null} for non-impersonated requests
* @return cluster composition lookup result
*/
- CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark,
+ CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Set bookmarks,
String impersonatedUser );
List resolve() throws UnknownHostException;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
index 692afa3063..10251c7905 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
@@ -89,19 +89,19 @@ public RediscoveryImpl( BoltServerAddress initialRouter, ClusterCompositionProvi
*/
@Override
public CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool,
- Bookmark bookmark, String impersonatedUser )
+ Set bookmarks, String impersonatedUser )
{
CompletableFuture result = new CompletableFuture<>();
// if we failed discovery, we will chain all errors into this one.
ServiceUnavailableException baseError = new ServiceUnavailableException( String.format( NO_ROUTERS_AVAILABLE, routingTable.database().description() ) );
- lookupClusterComposition( routingTable, connectionPool, result, bookmark, impersonatedUser, baseError );
+ lookupClusterComposition( routingTable, connectionPool, result, bookmarks, impersonatedUser, baseError );
return result;
}
private void lookupClusterComposition( RoutingTable routingTable, ConnectionPool pool, CompletableFuture result,
- Bookmark bookmark, String impersonatedUser, Throwable baseError )
+ Set bookmarks, String impersonatedUser, Throwable baseError )
{
- lookup( routingTable, pool, bookmark, impersonatedUser, baseError )
+ lookup( routingTable, pool, bookmarks, impersonatedUser, baseError )
.whenComplete(
( compositionLookupResult, completionError ) ->
{
@@ -121,29 +121,29 @@ else if ( compositionLookupResult != null )
} );
}
- private CompletionStage lookup( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark,
+ private CompletionStage lookup( RoutingTable routingTable, ConnectionPool connectionPool, Set bookmarks,
String impersonatedUser, Throwable baseError )
{
CompletionStage compositionStage;
if ( routingTable.preferInitialRouter() )
{
- compositionStage = lookupOnInitialRouterThenOnKnownRouters( routingTable, connectionPool, bookmark, impersonatedUser, baseError );
+ compositionStage = lookupOnInitialRouterThenOnKnownRouters( routingTable, connectionPool, bookmarks, impersonatedUser, baseError );
}
else
{
- compositionStage = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connectionPool, bookmark, impersonatedUser, baseError );
+ compositionStage = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connectionPool, bookmarks, impersonatedUser, baseError );
}
return compositionStage;
}
private CompletionStage lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool,
- Bookmark bookmark, String impersonatedUser,
+ Set bookmarks, String impersonatedUser,
Throwable baseError )
{
Set seenServers = new HashSet<>();
- return lookupOnKnownRouters( routingTable, connectionPool, seenServers, bookmark, impersonatedUser, baseError )
+ return lookupOnKnownRouters( routingTable, connectionPool, seenServers, bookmarks, impersonatedUser, baseError )
.thenCompose(
compositionLookupResult ->
{
@@ -152,16 +152,16 @@ private CompletionStage lookupOnKnownRoutersThen
return completedFuture(
compositionLookupResult );
}
- return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmark, impersonatedUser, baseError );
+ return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmarks, impersonatedUser, baseError );
} );
}
private CompletionStage lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable, ConnectionPool connectionPool,
- Bookmark bookmark, String impersonatedUser,
+ Set bookmarks, String impersonatedUser,
Throwable baseError )
{
Set seenServers = emptySet();
- return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmark, impersonatedUser, baseError )
+ return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmarks, impersonatedUser, baseError )
.thenCompose(
compositionLookupResult ->
{
@@ -170,12 +170,12 @@ private CompletionStage lookupOnInitialRouterThe
return completedFuture(
compositionLookupResult );
}
- return lookupOnKnownRouters( routingTable, connectionPool, new HashSet<>(), bookmark, impersonatedUser, baseError );
+ return lookupOnKnownRouters( routingTable, connectionPool, new HashSet<>(), bookmarks, impersonatedUser, baseError );
} );
}
private CompletionStage lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connectionPool,
- Set seenServers, Bookmark bookmark,
+ Set seenServers, Set bookmarks,
String impersonatedUser, Throwable baseError )
{
CompletableFuture result = completedWithNull();
@@ -191,7 +191,7 @@ private CompletionStage lookupOnKnownRouters( Ro
}
else
{
- return lookupOnRouter( address, true, routingTable, connectionPool, seenServers, bookmark, impersonatedUser, baseError );
+ return lookupOnRouter( address, true, routingTable, connectionPool, seenServers, bookmarks, impersonatedUser, baseError );
}
} );
}
@@ -199,7 +199,7 @@ private CompletionStage lookupOnKnownRouters( Ro
}
private CompletionStage lookupOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool,
- Set seenServers, Bookmark bookmark,
+ Set seenServers, Set bookmarks,
String impersonatedUser, Throwable baseError )
{
List resolvedRouters;
@@ -224,14 +224,14 @@ private CompletionStage lookupOnInitialRouter( R
{
return completedFuture( composition );
}
- return lookupOnRouter( address, false, routingTable, connectionPool, null, bookmark, impersonatedUser, baseError );
+ return lookupOnRouter( address, false, routingTable, connectionPool, null, bookmarks, impersonatedUser, baseError );
} );
}
return result.thenApply( composition -> composition != null ? new ClusterCompositionLookupResult( composition, resolvedRouterSet ) : null );
}
private CompletionStage lookupOnRouter( BoltServerAddress routerAddress, boolean resolveAddress, RoutingTable routingTable,
- ConnectionPool connectionPool, Set seenServers, Bookmark bookmark,
+ ConnectionPool connectionPool, Set seenServers, Set bookmarks,
String impersonatedUser, Throwable baseError )
{
CompletableFuture addressFuture = CompletableFuture.completedFuture( routerAddress );
@@ -241,7 +241,7 @@ private CompletionStage lookupOnRouter( BoltServerAddress ro
.thenApply( address -> addAndReturn( seenServers, address ) )
.thenCompose( connectionPool::acquire )
.thenApply( connection -> ImpersonationUtil.ensureImpersonationSupport( connection, impersonatedUser ) )
- .thenCompose( connection -> provider.getClusterComposition( connection, routingTable.database(), bookmark, impersonatedUser ) )
+ .thenCompose( connection -> provider.getClusterComposition( connection, routingTable.database(), bookmarks, impersonatedUser ) )
.handle( ( response, error ) ->
{
Throwable cause = Futures.completionExceptionCause( error );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java
index 095944b069..479d646cc6 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
@@ -66,12 +67,12 @@ protected RouteMessageRoutingProcedureRunner( RoutingContext routingContext, Sup
}
@Override
- public CompletionStage run( Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser )
+ public CompletionStage run( Connection connection, DatabaseName databaseName, Set bookmarks, String impersonatedUser )
{
CompletableFuture