From 3abf92574d3f8009a2fd6405ea07a31c909fcc88 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Wed, 31 Aug 2016 14:56:18 +0100 Subject: [PATCH 1/5] Bookmark in BEGIN --- ...nsaction.java => ExplicitTransaction.java} | 24 +++++- .../driver/internal/InternalSession.java | 21 ++++-- .../java/org/neo4j/driver/v1/Session.java | 75 ++++++++++++------- ...Test.java => ExplicitTransactionTest.java} | 8 +- .../driver/v1/util/TestNeo4jSession.java | 6 ++ 5 files changed, 92 insertions(+), 42 deletions(-) rename driver/src/main/java/org/neo4j/driver/internal/{InternalTransaction.java => ExplicitTransaction.java} (90%) rename driver/src/test/java/org/neo4j/driver/internal/{InternalTransactionTest.java => ExplicitTransactionTest.java} (94%) diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java similarity index 90% rename from driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java rename to driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index bc8b20942e..f149a86f07 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -34,10 +34,13 @@ import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.types.TypeSystem; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + import static org.neo4j.driver.v1.Values.ofValue; import static org.neo4j.driver.v1.Values.value; -public class InternalTransaction implements Transaction +class ExplicitTransaction implements Transaction { private enum State { @@ -68,13 +71,26 @@ private enum State private State state = State.ACTIVE; - public InternalTransaction( Connection conn, Runnable cleanup ) + ExplicitTransaction( Connection conn, Runnable cleanup ) + { + this( conn, cleanup, null ); + } + + ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark ) { this.conn = conn; this.cleanup = cleanup; - // Note there is no sync here, so this will just value queued locally - conn.run( "BEGIN", Collections.emptyMap(), StreamCollector.NO_OP ); + final Map parameters; + if ( bookmark == null ) + { + parameters = emptyMap(); + } + else + { + parameters = singletonMap( "bookmark", value( bookmark ) ); + } + conn.run( "BEGIN", parameters, StreamCollector.NO_OP ); conn.discardAll(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index 209860f34e..1cc1a15e87 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -52,7 +52,7 @@ public void run() } }; - private InternalTransaction currentTransaction; + private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); public InternalSession( Connection connection, Logger logger ) @@ -152,17 +152,24 @@ public String server() @Override public Transaction beginTransaction() + { + return beginTransaction( null ); + } + + @Override + public Transaction beginTransaction( String bookmark ) { ensureConnectionIsValidBeforeOpeningTransaction(); - currentTransaction = new InternalTransaction( connection, txCleanup ); - connection.onError( new Runnable() { + currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark ); + connection.onError( new Runnable() + { @Override public void run() { - //must check if transaction has been closed - if (currentTransaction != null) + // must check if transaction has been closed + if ( currentTransaction != null ) { - if( connection.hasUnrecoverableErrors() ) + if ( connection.hasUnrecoverableErrors() ) { currentTransaction.markToClose(); } @@ -172,7 +179,7 @@ public void run() } } } - }); + } ); return currentTransaction; } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index 4444bec2b5..0b22a03ed4 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -1,41 +1,57 @@ /** * Copyright (c) 2002-2016 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] - * + *

* This file is part of Neo4j. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.neo4j.driver.v1; import org.neo4j.driver.v1.util.Resource; /** - * A live session with a Neo4j instance. + * A Session hosts a series of {@linkplain Transaction transactions} + * carried out against a database. Within the database, all statements are + * carried out within a transaction. Within application code, however, it is + * not always necessary to explicitly {@link #beginTransaction() begin a + * transaction}. If a statement is {@link #run} directly against a {@link + * Session}, the server will automatically BEGIN and + * COMMIT that statement within its own transaction. This type + * of transaction is known as an autocommit transaction. *

- * Sessions serve two purposes. For one, they are an optimization. By keeping state on the database side, we can - * avoid re-transmitting certain metadata over and over. + * Explicit transactions allow multiple statements to be committed as part of + * a single atomic operation and can be rolled back if necessary. They can also + * be used to ensure causal consistency, meaning that an application + * can run a series of queries on different members of a cluster, while + * ensuring that each query sees the state of graph at least as up-to-date as + * the graph seen by the previous query. For more on causal consistency, see + * the Neo4j clustering manual. *

- * Sessions also serve a role in transaction isolation and ordering semantics. Neo4j requires - * "sticky sessions", meaning all requests within one session must always go to the same Neo4j instance. + * Typically, a session will wrap a TCP connection. Such a connection will be + * acquired from a connection pool and released back there when the session is + * destroyed. One connection can therefore be adopted by many sessions, + * although by only one at a time. Application code should never need to deal + * directly with connection management. *

- * Session objects are not thread safe, if you want to run concurrent operations against the database, - * simply create multiple sessions objects. - * - *

Important note on semantics

- * - * Please see the section under {@link StatementRunner} for an important overview of the guarantees - * the session gives you around when statements are executed. + * A session inherits its destination address and permissions from its + * underlying connection. This means that one session may only ever target one + * machine within a cluster and does not support re-authentication. To achieve + * otherwise requires creation of a separate session. + *

+ * Similarly, multiple sessions should be used when working with concurrency; + * session implementations are generally not thread safe. * * @since 1.0 */ @@ -44,20 +60,25 @@ public interface Session extends Resource, StatementRunner String LOG_NAME = "session"; /** - * Begin a new transaction in this session. A session can have at most one transaction running at a time, if you - * want to run multiple concurrent transactions, you should use multiple concurrent sessions. - *

- * All data operations in Neo4j are transactional. However, for convenience we provide a {@link #run(String)} - * method directly on this session interface as well. When you use that method, your statement automatically gets - * wrapped in a transaction. - *

- * If you want to run multiple statements in the same transaction, you should wrap them in a transaction using this - * method. + * Begin a new explicit {@linkplain Transaction transaction}. At + * most one transaction may exist in a session at any point in time. To + * maintain multiple concurrent transactions, use multiple concurrent + * sessions. * - * @return a new transaction + * @return a new {@link Transaction} */ Transaction beginTransaction(); + /** + * Begin a new explicit {@linkplain Transaction transaction}, + * requiring that the server hosting is at least as up-to-date as the + * transaction referenced by the supplied bookmark. + * + * @param bookmark a reference to a previous transaction + * @return a new {@link Transaction} + */ + Transaction beginTransaction( String bookmark ); + /** * Reset the current session. This sends an immediate RESET signal to the server which both interrupts * any statement that is currently executing and ignores any subsequently queued statements. Following diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java similarity index 94% rename from driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java rename to driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 2754b0d020..7f4bb03de2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -33,7 +33,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class InternalTransactionTest +public class ExplicitTransactionTest { @Test public void shouldRollbackOnImplicitFailure() throws Throwable @@ -42,7 +42,7 @@ public void shouldRollbackOnImplicitFailure() throws Throwable Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); Runnable cleanup = mock( Runnable.class ); - InternalTransaction tx = new InternalTransaction( conn, cleanup ); + ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); // When tx.close(); @@ -66,7 +66,7 @@ public void shouldRollbackOnExplicitFailure() throws Throwable Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); Runnable cleanup = mock( Runnable.class ); - InternalTransaction tx = new InternalTransaction( conn, cleanup ); + ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); // When tx.failure(); @@ -92,7 +92,7 @@ public void shouldCommitOnSuccess() throws Throwable Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); Runnable cleanup = mock( Runnable.class ); - InternalTransaction tx = new InternalTransaction( conn, cleanup ); + ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); // When tx.success(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java index 8c3f648010..d26b319439 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java @@ -106,6 +106,12 @@ public Transaction beginTransaction() return realSession.beginTransaction(); } + @Override + public Transaction beginTransaction( String bookmark ) + { + return realSession.beginTransaction( bookmark ); + } + @Override public void reset() { From 37fad843586a25664c7f78fc7e38e13fddf1a07f Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Thu, 1 Sep 2016 13:43:49 +0100 Subject: [PATCH 2/5] Receive bookmark on COMMIT --- .../driver/internal/BookmarkCollector.java | 38 +++++++++++++++++ .../neo4j/driver/internal/DirectDriver.java | 2 +- .../driver/internal/ExplicitTransaction.java | 28 +++++++++---- .../internal/InternalStatementResult.java | 34 +++++++++------ ...ternalSession.java => NetworkSession.java} | 23 ++++++++--- .../net/ConcurrencyGuardingConnection.java | 10 ++--- .../driver/internal/net/SocketConnection.java | 28 ++++++------- .../internal/net/SocketResponseHandler.java | 41 +++++++++++-------- .../net/pooling/PooledConnection.java | 14 +++---- .../PooledConnectionReleaseConsumer.java | 6 +-- .../{StreamCollector.java => Collector.java} | 23 +++++++---- .../neo4j/driver/internal/spi/Connection.java | 14 +++---- .../internal/summary/SummaryBuilder.java | 10 ++++- .../java/org/neo4j/driver/v1/Session.java | 10 +++++ .../internal/ExplicitTransactionTest.java | 27 ++++++------ .../neo4j/driver/internal/ExtractTest.java | 2 +- .../internal/InternalStatementResultTest.java | 2 +- ...ssionTest.java => NetworkSessionTest.java} | 8 ++-- .../ConcurrencyGuardingConnectionTest.java | 2 +- .../net/LoggingResponseHandlerTest.java | 10 ++--- .../net/SocketResponseHandlerTest.java | 4 +- .../pooling/ConnectionInvalidationTest.java | 16 ++++---- .../driver/v1/util/TestNeo4jSession.java | 6 +++ 23 files changed, 232 insertions(+), 126 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/BookmarkCollector.java rename driver/src/main/java/org/neo4j/driver/internal/{InternalSession.java => NetworkSession.java} (93%) rename driver/src/main/java/org/neo4j/driver/internal/spi/{StreamCollector.java => Collector.java} (89%) rename driver/src/test/java/org/neo4j/driver/internal/{InternalSessionTest.java => NetworkSessionTest.java} (93%) diff --git a/driver/src/main/java/org/neo4j/driver/internal/BookmarkCollector.java b/driver/src/main/java/org/neo4j/driver/internal/BookmarkCollector.java new file mode 100644 index 0000000000..f40ed6e7cf --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/BookmarkCollector.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.neo4j.driver.internal.spi.Collector.NoOperationCollector; + +class BookmarkCollector extends NoOperationCollector +{ + private final ExplicitTransaction transaction; + + BookmarkCollector( ExplicitTransaction transaction ) + { + this.transaction = transaction; + } + + @Override + public void bookmark( String bookmark ) + { + transaction.setBookmark( bookmark ); + } + +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java index 94c56c1f13..5c6c24dbdf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java @@ -45,7 +45,7 @@ public DirectDriver( BoltServerAddress address, ConnectionSettings connectionSet @Override public Session session() { - return new InternalSession( connections.acquire( address ), log ); + return new NetworkSession( connections.acquire( address ), log ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index f149a86f07..db61edf2d5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -22,7 +22,7 @@ import java.util.Map; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; @@ -69,6 +69,7 @@ private enum State private final Runnable cleanup; private final Connection conn; + private String bookmark = null; private State state = State.ACTIVE; ExplicitTransaction( Connection conn, Runnable cleanup ) @@ -90,8 +91,8 @@ private enum State { parameters = singletonMap( "bookmark", value( bookmark ) ); } - conn.run( "BEGIN", parameters, StreamCollector.NO_OP ); - conn.discardAll(); + conn.run( "BEGIN", parameters, Collector.NO_OP ); + conn.discardAll( Collector.NO_OP ); } @Override @@ -121,15 +122,15 @@ public void close() { if ( state == State.MARKED_SUCCESS ) { - conn.run( "COMMIT", Collections.emptyMap(), StreamCollector.NO_OP ); - conn.discardAll(); + conn.run( "COMMIT", Collections.emptyMap(), Collector.NO_OP ); + conn.discardAll( new BookmarkCollector( this ) ); conn.sync(); state = State.SUCCEEDED; } else if ( state == State.MARKED_FAILED || state == State.ACTIVE ) { - conn.run( "ROLLBACK", Collections.emptyMap(), StreamCollector.NO_OP ); - conn.discardAll(); + conn.run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP ); + conn.discardAll( new BookmarkCollector( this ) ); conn.sync(); state = State.ROLLED_BACK; } @@ -175,7 +176,7 @@ public StatementResult run( Statement statement ) try { - InternalStatementResult cursor = new InternalStatementResult( conn, statement ); + InternalStatementResult cursor = new InternalStatementResult( conn, this, statement ); conn.run( statement.text(), statement.parameters().asMap( ofValue() ), cursor.runResponseCollector() ); @@ -220,4 +221,15 @@ public void markToClose() { state = State.FAILED; } + + public String bookmark() + { + return bookmark; + } + + void setBookmark( String bookmark ) + { + this.bookmark = bookmark; + } + } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java index 1303e9ce80..d28eabe25b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java @@ -25,7 +25,7 @@ import java.util.Queue; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.summary.SummaryBuilder; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; @@ -47,8 +47,8 @@ public class InternalStatementResult implements StatementResult { private final Connection connection; - private final StreamCollector runResponseCollector; - private final StreamCollector pullAllResponseCollector; + private final Collector runResponseCollector; + private final Collector pullAllResponseCollector; private final Queue recordBuffer = new LinkedList<>(); private List keys = null; @@ -57,16 +57,16 @@ public class InternalStatementResult implements StatementResult private long position = -1; private boolean done = false; - public InternalStatementResult( Connection connection, Statement statement ) + InternalStatementResult( Connection connection, ExplicitTransaction transaction, Statement statement ) { this.connection = connection; this.runResponseCollector = newRunResponseCollector(); - this.pullAllResponseCollector = newPullAllResponseCollector( statement ); + this.pullAllResponseCollector = newStreamResponseCollector( transaction, statement ); } - private StreamCollector newRunResponseCollector() + private Collector newRunResponseCollector() { - return new StreamCollector.NoOperationStreamCollector() + return new Collector.NoOperationCollector() { @Override public void keys( String[] names ) @@ -91,11 +91,11 @@ public void resultAvailableAfter( long l ) }; } - private StreamCollector newPullAllResponseCollector( Statement statement ) + private Collector newStreamResponseCollector( final ExplicitTransaction transaction, final Statement statement ) { final SummaryBuilder summaryBuilder = new SummaryBuilder( statement ); - return new StreamCollector.NoOperationStreamCollector() + return new Collector.NoOperationCollector() { @Override public void record( Value[] fields ) @@ -134,7 +134,17 @@ public void notifications( List notifications ) } @Override - public void done() { + public void bookmark( String bookmark ) + { + if ( transaction != null ) + { + transaction.setBookmark( bookmark ); + } + } + + @Override + public void done() + { summary = summaryBuilder.build(); done = true; } @@ -153,12 +163,12 @@ public void resultConsumedAfter(long l) }; } - StreamCollector runResponseCollector() + Collector runResponseCollector() { return runResponseCollector; } - StreamCollector pullAllResponseCollector() + Collector pullAllResponseCollector() { return pullAllResponseCollector; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java similarity index 93% rename from driver/src/main/java/org/neo4j/driver/internal/InternalSession.java rename to driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 1cc1a15e87..c56d1a3f7b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -36,26 +36,31 @@ import static org.neo4j.driver.v1.Values.value; -public class InternalSession implements Session +public class NetworkSession implements Session { private final Connection connection; - private final Logger logger; - /** Called when a transaction object is closed */ + private String lastBookmark = null; + + // Called when a transaction object is closed private final Runnable txCleanup = new Runnable() { @Override public void run() { - currentTransaction = null; + if ( currentTransaction != null ) + { + lastBookmark = currentTransaction.bookmark(); + currentTransaction = null; + } } }; private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); - public InternalSession( Connection connection, Logger logger ) + NetworkSession( Connection connection, Logger logger ) { this.connection = connection; this.logger = logger; @@ -91,7 +96,7 @@ public StatementResult run( String statementText, Value statementParameters ) public StatementResult run( Statement statement ) { ensureConnectionIsValidBeforeRunningSession(); - InternalStatementResult cursor = new InternalStatementResult( connection, statement ); + InternalStatementResult cursor = new InternalStatementResult( connection, null, statement ); connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() ); connection.pullAll( cursor.pullAllResponseCollector() ); connection.flush(); @@ -183,6 +188,12 @@ public void run() return currentTransaction; } + @Override + public String lastBookmark() + { + return lastBookmark; + } + @Override public TypeSystem typeSystem() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index 5d197c2441..6e162c56b7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; @@ -58,7 +58,7 @@ public void init( String clientName, Map authToken ) @Override public void run( String statement, Map parameters, - StreamCollector collector ) + Collector collector ) { try { @@ -72,12 +72,12 @@ public void run( String statement, Map parameters, } @Override - public void discardAll() + public void discardAll( Collector collector ) { try { markAsInUse(); - delegate.discardAll(); + delegate.discardAll( collector ); } finally { @@ -86,7 +86,7 @@ public void discardAll() } @Override - public void pullAll( StreamCollector collector ) + public void pullAll( Collector collector ) { try { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 673cfe8e5c..5dc460bf22 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -27,11 +27,10 @@ import org.neo4j.driver.internal.messaging.InitMessage; import org.neo4j.driver.internal.messaging.Message; -import org.neo4j.driver.internal.messaging.PullAllMessage; import org.neo4j.driver.internal.messaging.RunMessage; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; @@ -40,6 +39,7 @@ import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE; import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL; +import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL; import static org.neo4j.driver.internal.messaging.ResetMessage.RESET; public class SocketConnection implements Connection @@ -47,7 +47,7 @@ public class SocketConnection implements Connection private final Queue pendingMessages = new LinkedList<>(); private final SocketResponseHandler responseHandler; private AtomicBoolean interrupted = new AtomicBoolean( false ); - private final StreamCollector.InitStreamCollector initStreamCollector = new StreamCollector.InitStreamCollector(); + private final Collector.InitCollector initCollector = new Collector.InitCollector(); private final SocketClient socket; @@ -71,38 +71,38 @@ public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, L @Override public void init( String clientName, Map authToken ) { - queueMessage( new InitMessage( clientName, authToken ), initStreamCollector ); + queueMessage( new InitMessage( clientName, authToken ), initCollector ); sync(); } @Override - public void run( String statement, Map parameters, StreamCollector collector ) + public void run( String statement, Map parameters, Collector collector ) { queueMessage( new RunMessage( statement, parameters ), collector ); } @Override - public void discardAll() + public void discardAll( Collector collector ) { - queueMessage( DISCARD_ALL, StreamCollector.NO_OP ); + queueMessage( DISCARD_ALL, collector ); } @Override - public void pullAll( StreamCollector collector ) + public void pullAll( Collector collector ) { - queueMessage( PullAllMessage.PULL_ALL, collector ); + queueMessage( PULL_ALL, collector ); } @Override public void reset() { - queueMessage( RESET, StreamCollector.RESET ); + queueMessage( RESET, Collector.RESET ); } @Override public void ackFailure() { - queueMessage( ACK_FAILURE, StreamCollector.ACK_FAILURE ); + queueMessage( ACK_FAILURE, Collector.ACK_FAILURE ); } @Override @@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException ) } } - private void queueMessage( Message msg, StreamCollector collector ) + private void queueMessage( Message msg, Collector collector ) { pendingMessages.add( msg ); responseHandler.appendResultCollector( collector ); @@ -215,7 +215,7 @@ public void resetAsync() { if( interrupted.compareAndSet( false, true ) ) { - queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable() + queueMessage( RESET, new Collector.ResetCollector( new Runnable() { @Override public void run() @@ -236,6 +236,6 @@ public boolean isInterrupted() @Override public String server() { - return initStreamCollector.server( ); + return initCollector.server( ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java index cff56c80b3..c3eb327114 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java @@ -23,7 +23,7 @@ import java.util.Queue; import org.neo4j.driver.internal.messaging.MessageHandler; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.summary.InternalNotification; import org.neo4j.driver.internal.summary.InternalPlan; import org.neo4j.driver.internal.summary.InternalProfiledPlan; @@ -39,7 +39,7 @@ public class SocketResponseHandler implements MessageHandler { - private final Queue collectors = new LinkedList<>(); + private final Queue collectors = new LinkedList<>(); /** If a failure occurs, the error gets stored here */ private Neo4jException error; @@ -52,14 +52,14 @@ public int collectorsWaiting() @Override public void handleRecordMessage( Value[] fields ) { - StreamCollector collector = collectors.element(); + Collector collector = collectors.element(); collector.record( fields ); } @Override public void handleFailureMessage( String code, String message ) { - StreamCollector collector = collectors.remove(); + Collector collector = collectors.remove(); String[] parts = code.split( "\\." ); String classification = parts[1]; switch ( classification ) @@ -83,7 +83,7 @@ public void handleFailureMessage( String code, String message ) @Override public void handleSuccessMessage( Map meta ) { - StreamCollector collector = collectors.remove(); + Collector collector = collectors.remove(); collectServer( collector, meta.get( "server" )); collectFields( collector, meta.get( "fields" ) ); collectType( collector, meta.get( "type" ) ); @@ -93,10 +93,11 @@ public void handleSuccessMessage( Map meta ) collectNotifications( collector, meta.get( "notifications" ) ); collectResultAvailableAfter( collector, meta.get("result_available_after")); collectResultConsumedAfter( collector, meta.get("result_consumed_after")); + collectBookmark( collector, meta.get( "bookmark" ) ); collector.doneSuccess(); } - private void collectServer( StreamCollector collector, Value server ) + private void collectServer( Collector collector, Value server ) { if (server != null) { @@ -104,7 +105,7 @@ private void collectServer( StreamCollector collector, Value server ) } } - private void collectResultAvailableAfter( StreamCollector collector, Value resultAvailableAfter ) + private void collectResultAvailableAfter( Collector collector, Value resultAvailableAfter ) { if (resultAvailableAfter != null) { @@ -112,7 +113,7 @@ private void collectResultAvailableAfter( StreamCollector collector, Value resul } } - private void collectResultConsumedAfter( StreamCollector collector, Value resultConsumedAfter ) + private void collectResultConsumedAfter( Collector collector, Value resultConsumedAfter ) { if (resultConsumedAfter != null) { @@ -120,7 +121,7 @@ private void collectResultConsumedAfter( StreamCollector collector, Value result } } - private void collectNotifications( StreamCollector collector, Value notifications ) + private void collectNotifications( Collector collector, Value notifications ) { if ( notifications != null ) { @@ -130,7 +131,7 @@ private void collectNotifications( StreamCollector collector, Value notification } } - private void collectPlan( StreamCollector collector, Value plan ) + private void collectPlan( Collector collector, Value plan ) { if ( plan != null ) { @@ -138,7 +139,7 @@ private void collectPlan( StreamCollector collector, Value plan ) } } - private void collectProfile( StreamCollector collector, Value plan ) + private void collectProfile( Collector collector, Value plan ) { if ( plan != null ) { @@ -146,7 +147,7 @@ private void collectProfile( StreamCollector collector, Value plan ) } } - private void collectFields( StreamCollector collector, Value fieldValue ) + private void collectFields( Collector collector, Value fieldValue ) { if ( fieldValue != null ) { @@ -163,7 +164,7 @@ private void collectFields( StreamCollector collector, Value fieldValue ) } } - private void collectType( StreamCollector collector, Value type ) + private void collectType( Collector collector, Value type ) { if ( type != null ) { @@ -171,7 +172,7 @@ private void collectType( StreamCollector collector, Value type ) } } - private void collectStatistics( StreamCollector collector, Value stats ) + private void collectStatistics( Collector collector, Value stats ) { if ( stats != null ) { @@ -193,6 +194,14 @@ private void collectStatistics( StreamCollector collector, Value stats ) } } + private void collectBookmark( Collector collector, Value bookmark ) + { + if ( bookmark != null ) + { + collector.bookmark( bookmark.asString() ); + } + } + private int statsValue( Value stats, String name ) { Value value = stats.get( name ); @@ -202,7 +211,7 @@ private int statsValue( Value stats, String name ) @Override public void handleIgnoredMessage() { - StreamCollector collector = collectors.remove(); + Collector collector = collectors.remove(); if (collector != null) { collector.doneIgnored(); @@ -245,7 +254,7 @@ public void handleRunMessage( String statement, Map parameters ) } - public void appendResultCollector( StreamCollector collector ) + public void appendResultCollector( Collector collector ) { assert collector != null; diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java index c4c9406761..3b22643a2b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java @@ -21,7 +21,7 @@ import java.util.Map; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Value; @@ -85,7 +85,7 @@ public void init( String clientName, Map authToken ) @Override public void run( String statement, Map parameters, - StreamCollector collector ) + Collector collector ) { try { @@ -98,26 +98,26 @@ public void run( String statement, Map parameters, } @Override - public void discardAll() + public void discardAll( Collector collector ) { try { - delegate.discardAll(); + delegate.discardAll( collector ); } - catch(RuntimeException e) + catch ( RuntimeException e ) { onDelegateException( e ); } } @Override - public void pullAll( StreamCollector collector ) + public void pullAll( Collector collector ) { try { delegate.pullAll( collector ); } - catch(RuntimeException e) + catch ( RuntimeException e ) { onDelegateException( e ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index 9c9b852892..cfbd1ff827 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Value; @@ -117,8 +117,8 @@ private boolean ping( PooledConnection conn ) { try { - conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, StreamCollector.NO_OP ); - conn.pullAll( StreamCollector.NO_OP ); + conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP ); + conn.pullAll( Collector.NO_OP ); conn.sync(); return true; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Collector.java similarity index 89% rename from driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java rename to driver/src/main/java/org/neo4j/driver/internal/spi/Collector.java index a00d755595..d6a5a20405 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Collector.java @@ -29,11 +29,11 @@ import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.summary.SummaryCounters; -public interface StreamCollector +public interface Collector { - StreamCollector NO_OP = new NoOperationStreamCollector(); + Collector NO_OP = new NoOperationCollector(); - StreamCollector ACK_FAILURE = new NoOperationStreamCollector() + Collector ACK_FAILURE = new NoOperationCollector() { @Override public void doneFailure( Neo4jException error ) @@ -50,7 +50,7 @@ public void doneIgnored() } }; - class InitStreamCollector extends NoOperationStreamCollector + class InitCollector extends NoOperationCollector { private String server; @Override @@ -72,18 +72,18 @@ public String server() } } - StreamCollector RESET = new ResetStreamCollector(); + Collector RESET = new ResetCollector(); - class ResetStreamCollector extends NoOperationStreamCollector + class ResetCollector extends NoOperationCollector { private final Runnable doneSuccessCallBack; - public ResetStreamCollector() + public ResetCollector() { this( null ); } - public ResetStreamCollector( Runnable doneSuccessCallBack ) + public ResetCollector( Runnable doneSuccessCallBack ) { this.doneSuccessCallBack = doneSuccessCallBack; } @@ -113,7 +113,7 @@ public void doneSuccess() } - class NoOperationStreamCollector implements StreamCollector + class NoOperationCollector implements Collector { @Override public void keys( String[] names ) {} @@ -136,6 +136,9 @@ public void profile( ProfiledPlan plan ) {} @Override public void notifications( List notifications ) {} + @Override + public void bookmark( String bookmark ) {} + @Override public void done() {} @@ -183,6 +186,8 @@ public void server( String server ){} void notifications( List notifications ); + void bookmark( String bookmark ); + void done(); void doneSuccess(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 3f5807476c..616f2289b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -40,30 +40,30 @@ public interface Connection extends AutoCloseable * for retrieval. * @param parameters a map value of parameters */ - void run( String statement, Map parameters, StreamCollector collector ); + void run( String statement, Map parameters, Collector collector ); /** * Queue a discard all action, consuming any items left in the current stream.This will - * close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run} + * close the stream once its completed, allowing another {@link #run(String, java.util.Map, Collector) run} */ - void discardAll(); + void discardAll( Collector collector ); /** * Queue a pull-all action, output will be handed to the collector once the pull starts. This will - * close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run} + * close the stream once its completed, allowing another {@link #run(String, java.util.Map, Collector) run} */ - void pullAll( StreamCollector collector ); + void pullAll( Collector collector ); /** * Queue a reset action, throw {@link org.neo4j.driver.v1.exceptions.ClientException} if an ignored message is received. This will - * close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run} + * close the stream once its completed, allowing another {@link #run(String, java.util.Map, Collector) run} */ void reset(); /** * Queue a ack_failure action, valid output could only be success. Throw {@link org.neo4j.driver.v1.exceptions.ClientException} if * a failure or ignored message is received. This will close the stream once it is completed, allowing another - * {@link #run(String, java.util.Map, StreamCollector) run} + * {@link #run(String, java.util.Map, Collector) run} */ void ackFailure(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/SummaryBuilder.java b/driver/src/main/java/org/neo4j/driver/internal/summary/SummaryBuilder.java index dd5bbc942c..15bd3e1ba1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/summary/SummaryBuilder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/summary/SummaryBuilder.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; @@ -34,7 +34,7 @@ import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.summary.SummaryCounters; -public class SummaryBuilder implements StreamCollector +public class SummaryBuilder implements Collector { private final Statement statement; @@ -127,6 +127,12 @@ public void notifications( List notifications ) } } + @Override + public void bookmark( String bookmark ) + { + // intentionally empty + } + @Override public void done() { diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index 0b22a03ed4..47d930fc33 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -79,6 +79,16 @@ public interface Session extends Resource, StatementRunner */ Transaction beginTransaction( String bookmark ); + /** + * Return the bookmark received following the last completed + * {@linkplain Transaction transaction}. If no bookmark was received + * or if this transaction was rolled back, the bookmark value will + * be null. + * + * @return a reference to a previous transaction + */ + String lastBookmark(); + /** * Reset the current session. This sends an immediate RESET signal to the server which both interrupts * any statement that is currently executing and ignores any subsequently queued statements. Following diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 7f4bb03de2..7477e41e1a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -24,9 +24,10 @@ import java.util.Collections; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.v1.Value; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -49,11 +50,11 @@ public void shouldRollbackOnImplicitFailure() throws Throwable // Then InOrder order = inOrder( conn ); - order.verify( conn ).run( "BEGIN", Collections.emptyMap(), StreamCollector.NO_OP ); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "BEGIN", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).isOpen(); - order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), StreamCollector.NO_OP ); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).sync(); verify( cleanup ).run(); verifyNoMoreInteractions( conn, cleanup ); @@ -75,11 +76,11 @@ public void shouldRollbackOnExplicitFailure() throws Throwable // Then InOrder order = inOrder( conn ); - order.verify( conn ).run( "BEGIN", Collections.emptyMap(), StreamCollector.NO_OP); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "BEGIN", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).isOpen(); - order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), StreamCollector.NO_OP); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).sync(); verify( cleanup ).run(); verifyNoMoreInteractions( conn, cleanup ); @@ -101,11 +102,11 @@ public void shouldCommitOnSuccess() throws Throwable // Then InOrder order = inOrder( conn ); - order.verify( conn ).run( "BEGIN", Collections.emptyMap(), StreamCollector.NO_OP); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "BEGIN", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).isOpen(); - order.verify( conn ).run( "COMMIT", Collections.emptyMap(), StreamCollector.NO_OP); - order.verify( conn ).discardAll(); + order.verify( conn ).run( "COMMIT", Collections.emptyMap(), Collector.NO_OP ); + order.verify( conn ).discardAll( any( BookmarkCollector.class ) ); order.verify( conn ).sync(); verify( cleanup ).run(); verifyNoMoreInteractions( conn, cleanup ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java index 07671f9228..f4cdef4438 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java @@ -170,7 +170,7 @@ public void testFields() throws Exception Connection connection = mock( Connection.class ); String statement = ""; - InternalStatementResult cursor = new InternalStatementResult( connection, new Statement( statement ) ); + InternalStatementResult cursor = new InternalStatementResult( connection, null, new Statement( statement ) ); cursor.runResponseCollector().keys( new String[]{"k1"} ); cursor.runResponseCollector().done(); cursor.pullAllResponseCollector().record( new Value[]{value( 42 )} ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java index 3278ae86ed..0476f192c5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java @@ -391,7 +391,7 @@ private StatementResult createResult( int numberOfRecords ) Connection connection = mock( Connection.class ); String statement = ""; - final InternalStatementResult cursor = new InternalStatementResult( connection, new Statement( statement ) ); + final InternalStatementResult cursor = new InternalStatementResult( connection, null, new Statement( statement ) ); // Each time the cursor calls `recieveOne`, we'll run one of these, // to emulate how messages are handed over to the cursor diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java similarity index 93% rename from driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java rename to driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 1aa249b37d..6d2b56741b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -36,20 +36,20 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class InternalSessionTest +public class NetworkSessionTest { @Rule public ExpectedException exception = ExpectedException.none(); private final Connection mock = mock( Connection.class ); - private InternalSession sess = new InternalSession( mock, new DevNullLogger() ); + private NetworkSession sess = new NetworkSession( mock, new DevNullLogger() ); @Test public void shouldSendAllOnRun() throws Throwable { // Given when( mock.isOpen() ).thenReturn( true ); - InternalSession sess = new InternalSession( mock, new DevNullLogger() ); + NetworkSession sess = new NetworkSession( mock, new DevNullLogger() ); // When sess.run( "whatever" ); @@ -144,7 +144,7 @@ public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throw public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwable { // Given - InternalSession sess = new InternalSession( mock(Connection.class), mock(Logger.class) ); + NetworkSession sess = new NetworkSession( mock(Connection.class), mock(Logger.class) ); try { sess.close(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnectionTest.java index d05653d827..8e3dc46844 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnectionTest.java @@ -120,7 +120,7 @@ public Void apply( Connection connection ) @Override public Void apply( Connection connection ) { - connection.discardAll(); + connection.discardAll(null); return null; } }; diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/LoggingResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/LoggingResponseHandlerTest.java index 24659dad55..7456e2100c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/LoggingResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/LoggingResponseHandlerTest.java @@ -33,7 +33,7 @@ import org.neo4j.driver.internal.messaging.ResetMessage; import org.neo4j.driver.internal.messaging.RunMessage; import org.neo4j.driver.internal.messaging.SuccessMessage; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.v1.Value; import static org.junit.Assert.assertEquals; @@ -115,7 +115,7 @@ public void shouldLogAckFailureMessage() throws Throwable public void shouldLogSuccessMessage() throws Throwable { // When - handler.appendResultCollector( StreamCollector.NO_OP ); + handler.appendResultCollector( Collector.NO_OP ); handler.handleSuccessMessage( new HashMap() ); // Then @@ -127,7 +127,7 @@ public void shouldLogSuccessMessage() throws Throwable public void shouldLogRecordMessage() throws Throwable { // When - handler.appendResultCollector( StreamCollector.NO_OP ); + handler.appendResultCollector( Collector.NO_OP ); handler.handleRecordMessage( new Value[]{} ); // Then @@ -139,7 +139,7 @@ public void shouldLogRecordMessage() throws Throwable public void shouldLogFailureMessage() throws Throwable { // When - handler.appendResultCollector( StreamCollector.NO_OP ); + handler.appendResultCollector( Collector.NO_OP ); handler.handleFailureMessage( "code.error", "message" ); // Then @@ -151,7 +151,7 @@ public void shouldLogFailureMessage() throws Throwable public void shouldLogIgnoredMessage() throws Throwable { // When - handler.appendResultCollector( StreamCollector.NO_OP ); + handler.appendResultCollector( Collector.NO_OP ); handler.handleIgnoredMessage(); // Then diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/SocketResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/SocketResponseHandlerTest.java index 3aa5dbd1ee..53b1345bcf 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/SocketResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/SocketResponseHandlerTest.java @@ -24,7 +24,7 @@ import java.util.Collections; import java.util.Map; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.summary.InternalSummaryCounters; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.summary.Plan; @@ -44,7 +44,7 @@ public class SocketResponseHandlerTest { private final SocketResponseHandler handler = new SocketResponseHandler(); - private final StreamCollector collector = mock( StreamCollector.class ); + private final Collector collector = mock( Collector.class ); @Before public void setup() diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index 7222858c60..a57b0b1c4a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -26,10 +26,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.net.pooling.PoolSettings; -import org.neo4j.driver.internal.net.pooling.PooledConnection; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumers; import org.neo4j.driver.v1.Config; @@ -67,7 +65,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable { // Given a connection that's broken Mockito.doThrow( new ClientException( "That didn't work" ) ) - .when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) ); + .when( delegate ).run( anyString(), anyMap(), any( Collector.class ) ); PoolSettings poolSettings = PoolSettings.defaultSettings(); when( clock.millis() ).thenReturn( 0L, poolSettings.idleTimeBeforeConnectionTest() + 1L ); PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); @@ -87,7 +85,7 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable { // Given a connection that's broken Mockito.doThrow( new ClientException( "That didn't work" ) ) - .when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) ); + .when( delegate ).run( anyString(), anyMap(), any( Collector.class ) ); Config config = Config.defaultConfig(); PoolSettings poolSettings = PoolSettings.defaultSettings(); when( clock.millis() ).thenReturn( 0L, poolSettings.idleTimeBeforeConnectionTest() - 1L ); @@ -145,12 +143,12 @@ public void shouldInvalidateOnProtocolViolationExceptions() throws Throwable private void assertUnrecoverable( Neo4jException exception ) { doThrow( exception ).when( delegate ) - .run( eq("assert unrecoverable"), anyMap(), any( StreamCollector.class ) ); + .run( eq("assert unrecoverable"), anyMap(), any( Collector.class ) ); // When try { - conn.run( "assert unrecoverable", new HashMap( ), StreamCollector.NO_OP ); + conn.run( "assert unrecoverable", new HashMap( ), Collector.NO_OP ); fail( "Should've rethrown exception" ); } catch ( Neo4jException e ) @@ -172,12 +170,12 @@ private void assertUnrecoverable( Neo4jException exception ) @SuppressWarnings( "unchecked" ) private void assertRecoverable( Neo4jException exception ) { - doThrow( exception ).when( delegate ).run( eq("assert recoverable"), anyMap(), any( StreamCollector.class ) ); + doThrow( exception ).when( delegate ).run( eq("assert recoverable"), anyMap(), any( Collector.class ) ); // When try { - conn.run( "assert recoverable", new HashMap( ), StreamCollector.NO_OP ); + conn.run( "assert recoverable", new HashMap( ), Collector.NO_OP ); fail( "Should've rethrown exception" ); } catch ( Neo4jException e ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java index d26b319439..c0884408e3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java @@ -112,6 +112,12 @@ public Transaction beginTransaction( String bookmark ) return realSession.beginTransaction( bookmark ); } + @Override + public String lastBookmark() + { + return realSession.lastBookmark(); + } + @Override public void reset() { From 3d5502d586fe01183cbb1941e954684ba5c84f55 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Thu, 1 Sep 2016 14:19:52 +0100 Subject: [PATCH 3/5] Fixed server version --- .../src/test/java/org/neo4j/driver/v1/util/ServerVersion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java b/driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java index 01fb05879e..db9a7449a6 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java @@ -39,7 +39,7 @@ private ServerVersion( int major, int minor, int patch ) this.patch = patch; } public static final ServerVersion v3_1_0 = new ServerVersion(3, 1, 0); - public static final ServerVersion v3_0_0 = new ServerVersion(3, 1, 0); + public static final ServerVersion v3_0_0 = new ServerVersion(3, 0, 0); public static ServerVersion version( String server ) { From f17f44709f1a8af985821dcfd49b61b8c1f6b0a5 Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Mon, 5 Sep 2016 10:21:21 +0100 Subject: [PATCH 4/5] Added bookmark it --- .../driver/v1/integration/BookmarkIT.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java new file mode 100644 index 0000000000..bb7616a317 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java @@ -0,0 +1,48 @@ +package org.neo4j.driver.v1.integration; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.util.ServerVersion; +import org.neo4j.driver.v1.util.TestNeo4jSession; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assume.assumeTrue; + +import static org.neo4j.driver.v1.util.ServerVersion.v3_1_0; + +public class BookmarkIT +{ + @Rule + public ExpectedException exception = ExpectedException.none(); + @Rule + public TestNeo4jSession session = new TestNeo4jSession(); + + @Before + public void assumeBookmarkSupport() + { + System.out.println(session.server()); + assumeTrue( ServerVersion.version( session.server() ).greaterThanOrEqual( v3_1_0 ) ); + } + + @Test + public void shouldReceiveBookmarkOnSuccessfulCommit() throws Throwable + { + // Given + assertNull( session.lastBookmark() ); + + // When + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "CREATE (a:Person)" ); + tx.success(); + } + + // Then + assertNotNull( session.lastBookmark() ); + } +} From 9a44a8148860940f89f9c687b0de62c4ac692e8c Mon Sep 17 00:00:00 2001 From: Nigel Small Date: Tue, 6 Sep 2016 10:23:38 +0100 Subject: [PATCH 5/5] Added license headers --- .../java/org/neo4j/driver/v1/Session.java | 10 ++++----- .../driver/v1/integration/BookmarkIT.java | 21 ++++++++++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index 47d930fc33..a487f15ffc 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -1,15 +1,15 @@ /** * Copyright (c) 2002-2016 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] - *

+ * * This file is part of Neo4j. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java index bb7616a317..335884a1cb 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java @@ -1,3 +1,22 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.neo4j.driver.v1.integration; import org.junit.Before; @@ -25,7 +44,7 @@ public class BookmarkIT @Before public void assumeBookmarkSupport() { - System.out.println(session.server()); + System.out.println( session.server() ); assumeTrue( ServerVersion.version( session.server() ).greaterThanOrEqual( v3_1_0 ) ); }