-
Notifications
You must be signed in to change notification settings - Fork 157
API updates #325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API updates #325
Changes from 14 commits
34378fc
75abc84
3046a10
bb2ea6c
f9c144e
927b6b5
69555d0
40df5df
9776f6e
4c8f409
2b565b7
de1b84c
6c9eac7
fd1a1ad
2714f6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,9 +18,13 @@ | |
| */ | ||
| package org.neo4j.driver.internal; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import org.neo4j.driver.internal.retry.RetryDecision; | ||
| import org.neo4j.driver.internal.retry.RetryLogic; | ||
| import org.neo4j.driver.internal.spi.Connection; | ||
| import org.neo4j.driver.internal.spi.ConnectionProvider; | ||
| import org.neo4j.driver.internal.spi.PooledConnection; | ||
|
|
@@ -37,13 +41,15 @@ | |
| import org.neo4j.driver.v1.Values; | ||
| import org.neo4j.driver.v1.exceptions.ClientException; | ||
| import org.neo4j.driver.v1.types.TypeSystem; | ||
| import org.neo4j.driver.v1.util.Function; | ||
|
|
||
| import static org.neo4j.driver.v1.Values.value; | ||
|
|
||
| public class NetworkSession implements Session, SessionResourcesHandler | ||
| { | ||
| private final ConnectionProvider connectionProvider; | ||
| private final AccessMode mode; | ||
| private final RetryLogic<RetryDecision> retryLogic; | ||
| protected final Logger logger; | ||
|
|
||
| private String lastBookmark; | ||
|
|
@@ -52,10 +58,12 @@ public class NetworkSession implements Session, SessionResourcesHandler | |
|
|
||
| private final AtomicBoolean isOpen = new AtomicBoolean( true ); | ||
|
|
||
| public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) | ||
| public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic<RetryDecision> retryLogic, | ||
| Logging logging ) | ||
| { | ||
| this.connectionProvider = connectionProvider; | ||
| this.mode = mode; | ||
| this.retryLogic = retryLogic; | ||
| this.logger = logging.getLog( "Session-" + hashCode() ); | ||
| } | ||
|
|
||
|
|
@@ -92,12 +100,13 @@ public StatementResult run( Statement statement ) | |
| ensureNoOpenTransactionBeforeRunningSession(); | ||
|
|
||
| syncAndCloseCurrentConnection(); | ||
| currentConnection = acquireConnection(); | ||
| currentConnection = acquireConnection( mode ); | ||
|
|
||
| return run( currentConnection, statement, this ); | ||
| } | ||
|
|
||
| public static StatementResult run( Connection connection, Statement statement, SessionResourcesHandler resourcesHandler ) | ||
| public static StatementResult run( Connection connection, Statement statement, | ||
| SessionResourcesHandler resourcesHandler ) | ||
| { | ||
| InternalStatementResult result = new InternalStatementResult( connection, resourcesHandler, null, statement ); | ||
| connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), | ||
|
|
@@ -116,7 +125,7 @@ public synchronized void reset() | |
| if ( currentTransaction != null ) | ||
| { | ||
| currentTransaction.markToClose(); | ||
| lastBookmark = currentTransaction.bookmark(); | ||
| setLastBookmark( currentTransaction.bookmark() ); | ||
| currentTransaction = null; | ||
| } | ||
| if ( currentConnection != null ) | ||
|
|
@@ -155,28 +164,41 @@ public void close() | |
| } | ||
| } | ||
| } | ||
|
|
||
| syncAndCloseCurrentConnection(); | ||
| } | ||
|
|
||
| @Override | ||
| public Transaction beginTransaction() | ||
| public synchronized Transaction beginTransaction() | ||
| { | ||
| return beginTransaction( null ); | ||
| return beginTransaction( mode ); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized Transaction beginTransaction( String bookmark ) | ||
| { | ||
| ensureSessionIsOpen(); | ||
| ensureNoOpenTransactionBeforeOpeningTransaction(); | ||
| setLastBookmark( bookmark ); | ||
| return beginTransaction(); | ||
| } | ||
|
|
||
| syncAndCloseCurrentConnection(); | ||
| currentConnection = acquireConnection(); | ||
| @Override | ||
| public <T> T readTransaction( Function<Transaction,T> work ) | ||
| { | ||
| return transaction( AccessMode.READ, work ); | ||
| } | ||
|
|
||
| currentTransaction = new ExplicitTransaction( currentConnection, this, bookmark ); | ||
| currentConnection.setResourcesHandler( this ); | ||
| return currentTransaction; | ||
| @Override | ||
| public <T> T writeTransaction( Function<Transaction,T> work ) | ||
| { | ||
| return transaction( AccessMode.WRITE, work ); | ||
| } | ||
|
|
||
| void setLastBookmark( String bookmark ) | ||
| { | ||
| if ( bookmark != null ) | ||
| { | ||
| lastBookmark = bookmark; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -203,7 +225,7 @@ public synchronized void onTransactionClosed( ExplicitTransaction tx ) | |
| if ( currentTransaction != null && currentTransaction == tx ) | ||
| { | ||
| closeCurrentConnection(); | ||
| lastBookmark = currentTransaction.bookmark(); | ||
| setLastBookmark( currentTransaction.bookmark() ); | ||
| currentTransaction = null; | ||
| } | ||
| } | ||
|
|
@@ -225,6 +247,47 @@ public synchronized void onConnectionError( boolean recoverable ) | |
| } | ||
| } | ||
|
|
||
| private synchronized <T> T transaction( AccessMode mode, Function<Transaction,T> work ) | ||
| { | ||
| RetryDecision decision = null; | ||
| List<Throwable> errors = null; | ||
|
|
||
| while ( true ) | ||
| { | ||
| try ( Transaction tx = beginTransaction( mode ) ) | ||
| { | ||
| return work.apply( tx ); | ||
| } | ||
| catch ( Throwable newError ) | ||
| { | ||
| decision = retryLogic.apply( newError, decision ); | ||
|
|
||
| if ( decision.shouldRetry() ) | ||
| { | ||
| errors = recordError( newError, errors ); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just directly chain the errors? why we have to first remember them (in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Printed stacktrace of a chain of suppressed methods looks quite strange, every next suppressed trace is shifted to the right. With this approach all previous errors will be on the same level :) |
||
| } | ||
| else | ||
| { | ||
| addSuppressed( newError, errors ); | ||
| throw newError; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private synchronized Transaction beginTransaction( AccessMode mode ) | ||
| { | ||
| ensureSessionIsOpen(); | ||
| ensureNoOpenTransactionBeforeOpeningTransaction(); | ||
|
|
||
| syncAndCloseCurrentConnection(); | ||
| currentConnection = acquireConnection( mode ); | ||
|
|
||
| currentTransaction = new ExplicitTransaction( currentConnection, this, lastBookmark ); | ||
| currentConnection.setResourcesHandler( this ); | ||
| return currentTransaction; | ||
| } | ||
|
|
||
| private void ensureNoUnrecoverableError() | ||
| { | ||
| if ( currentConnection != null && currentConnection.hasUnrecoverableErrors() ) | ||
|
|
@@ -268,7 +331,7 @@ private void ensureSessionIsOpen() | |
| } | ||
| } | ||
|
|
||
| private PooledConnection acquireConnection() | ||
| private PooledConnection acquireConnection( AccessMode mode ) | ||
| { | ||
| PooledConnection connection = connectionProvider.acquireConnection( mode ); | ||
| logger.debug( "Acquired connection " + connection.hashCode() ); | ||
|
|
@@ -312,4 +375,28 @@ private void closeCurrentConnection( boolean sync ) | |
| logger.debug( "Released connection " + connection.hashCode() ); | ||
| } | ||
| } | ||
|
|
||
| private static List<Throwable> recordError( Throwable error, List<Throwable> errors ) | ||
| { | ||
| if ( errors == null ) | ||
| { | ||
| errors = new ArrayList<>(); | ||
| } | ||
| errors.add( error ); | ||
| return errors; | ||
| } | ||
|
|
||
| private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors ) | ||
| { | ||
| if ( suppressedErrors != null ) | ||
| { | ||
| for ( Throwable suppressedError : suppressedErrors ) | ||
| { | ||
| if ( error != suppressedError ) | ||
| { | ||
| error.addSuppressed( suppressedError ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we disallow a user to set null bookmark? (It does not make many sense, but maybe a user wants to break CC from this tx?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I just tried to use same
#setLastBookmark()method everywhere and it prohibits nulls. Will fix.