Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions neo4j/bookmarks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package neo4j

/*
Bookmarks is a holder for server-side bookmarks which are used for causally chaining sessions.
See also CombineBookmarks.
Note:
Will be changed from being a type alias to being a struct in 6.0. Please use BookmarksFromRawValues for construction
from raw values and BookmarksToRawValues for accessing the raw values.
*/
type Bookmarks = []string

/*
CombineBookmarks is a helper method to combine []Bookmarks into a single Bookmarks instance.
Let s1, s2, s3 be Session interfaces. You can easily causally chain the sessions like so:
s4 := driver.NewSession(neo4j.SessionConfig{
Bookmarks: neo4j.CombineBookmarks(s1.LastBookmarks(), s2.LastBookmarks(), s3.LastBookmarks()),
})
The server will then make sure to execute all transactions in s4 after any that were already executed in s1, s2, or s3
at the time of calling LastBookmarks.
*/
func CombineBookmarks(bookmarks... Bookmarks) Bookmarks {
var lenSum int
for _, b := range bookmarks {
lenSum += len(b)
}
res := make([]string, lenSum)
var i int
for _, b := range bookmarks {
i += copy(res[i:], b)
}
return res
}

/*
BookmarksToRawValues exposes the raw server-side bookmarks.
You should not need to use this method unless you want to serialize bookmarks.
See Session.LastBookmarks and CombineBookmarks for alternatives.
*/
func BookmarksToRawValues(bookmarks Bookmarks) []string {
return bookmarks
}

/*
BookmarksFromRawValues creates Bookmarks from raw server-side bookmarks.
You should not need to use this method unless you want to de-serialize bookmarks.
See Session.LastBookmarks and CombineBookmarks for alternatives.
*/
func BookmarksFromRawValues(values... string) Bookmarks {
return values
}
32 changes: 32 additions & 0 deletions neo4j/bookmarks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package neo4j

import (
"testing"
"testing/quick"
)

func TestCombineBookmarks(t *testing.T) {
f := func(slices []Bookmarks) bool {
concatenation := CombineBookmarks(slices...)
totalLen := 0
for _, s := range slices {
totalLen += len(s)
}
if totalLen != len(concatenation) {
return false
}
i := 0
for _, slice := range slices {
for _, str := range slice {
if str != concatenation[i] {
return false
}
i += 1
}
}
return true
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
2 changes: 1 addition & 1 deletion neo4j/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestDriverSessionCreation(t *testing.T) {
name string
testing string
mode AccessMode
bookmarks []string
bookmarks Bookmarks
}{
{"Write", "bolt://localhost:7687", AccessModeWrite, []string(nil)},
{"Read", "bolt://localhost:7687", AccessModeRead, []string(nil)},
Expand Down
25 changes: 23 additions & 2 deletions neo4j/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ type TransactionWork func(tx Transaction) (interface{}, error)
// Session represents a logical connection (which is not tied to a physical connection)
// to the server
type Session interface {
// LastBookmarks returns the bookmark received following the last successfully completed transaction.
// If no bookmark was received or if this transaction was rolled back, the initial set of bookmarks will be
// returned.
LastBookmarks() Bookmarks
// LastBookmark returns the bookmark received following the last successfully completed transaction.
// If no bookmark was received or if this transaction was rolled back, the bookmark value will not be changed.
// Deprecated: since version 5.0. Will be removed in 6.0. Use LastBookmarks instead.
// Warning: this method can lead to unexpected behaviour if the session has not yet successfully completed a
// transaction.
LastBookmark() string
// BeginTransaction starts a new explicit transaction on this session
BeginTransaction(configurers ...func(*TransactionConfig)) (Transaction, error)
Expand All @@ -55,15 +62,15 @@ type Session interface {
// SessionConfig is used to configure a new session, its zero value uses safe defaults.
type SessionConfig struct {
// AccessMode used when using Session.Run and explicit transactions. Used to route query to
// to read or write servers when running in a cluster. Session.ReadTransaction and Session.WriteTransaction
// read or write servers when running in a cluster. Session.ReadTransaction and Session.WriteTransaction
// does not rely on this mode.
AccessMode AccessMode
// Bookmarks are the initial bookmarks used to ensure that the executing server is at least up
// to date to the point represented by the latest of the provided bookmarks. After running commands
// on the session the bookmark can be retrieved with Session.LastBookmark. All commands executing
// within the same session will automatically use the bookmark from the previous command in the
// session.
Bookmarks []string
Bookmarks Bookmarks
// DatabaseName contains the name of the database that the commands in the session will execute on.
DatabaseName string
// FetchSize defines how many records to pull from server in each batch.
Expand Down Expand Up @@ -169,6 +176,16 @@ func newSession(config *Config, sessConfig SessionConfig, router sessionRouter,
}
}

func (s *session) LastBookmarks() Bookmarks {
// Pick up bookmark from pending auto-commit if there is a bookmark on it
if s.txAuto != nil {
s.retrieveBookmarks(s.txAuto.conn)
}

// Report bookmarks from previously closed connection or from initial set
return s.bookmarks
}

func (s *session) LastBookmark() string {
// Pick up bookmark from pending auto-commit if there is a bookmark on it
if s.txAuto != nil {
Expand Down Expand Up @@ -505,6 +522,10 @@ func (s *sessionWithError) LastBookmark() string {
return ""
}

func (s *sessionWithError) LastBookmarks() Bookmarks {
return []string{}
}

func (s *sessionWithError) BeginTransaction(configurers ...func(*TransactionConfig)) (Transaction, error) {
return nil, s.err
}
Expand Down
30 changes: 15 additions & 15 deletions neo4j/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSession(st *testing.T) {
return &router, &pool, sess
}

createSessionWithBookmarks := func(bookmarks []string) (*RouterFake, *PoolFake, *session) {
createSessionWithBookmarks := func(bookmarks Bookmarks) (*RouterFake, *PoolFake, *session) {
sessConfig := SessionConfig{AccessMode: AccessModeRead, Bookmarks: bookmarks, BoltLogger: &boltLogger}
return createSessionFromConfig(sessConfig)
}
Expand Down Expand Up @@ -165,14 +165,14 @@ func TestSession(st *testing.T) {
})

st.Run("Bookmarking", func(bt *testing.T) {
bt.Run("Initial bookmark is used for LastBookmark", func(t *testing.T) {
_, _, sess := createSessionWithBookmarks([]string{"b1", "b2"})
AssertStringEqual(t, sess.LastBookmark(), "b2")
bt.Run("Initial bookmarks are returned from LastBookmarks", func(t *testing.T) {
_, _, sess := createSessionWithBookmarks(BookmarksFromRawValues("b1", "b2"))
AssertDeepEquals(t, sess.LastBookmarks(), BookmarksFromRawValues("b1", "b2"))
})

bt.Run("Initial bookmarks are used and cleaned up before usage", func(t *testing.T) {
dirtyBookmarks := []string{"", "b1", "", "b2", ""}
cleanBookmarks := []string{"b1", "b2"}
dirtyBookmarks := BookmarksFromRawValues("", "b1", "", "b2", "")
cleanBookmarks := BookmarksFromRawValues("b1", "b2")
_, pool, sess := createSessionWithBookmarks(dirtyBookmarks)
err := errors.New("make all fail")
conn := &ConnFake{Alive: true, RunErr: err, TxBeginErr: err}
Expand All @@ -194,9 +194,9 @@ func TestSession(st *testing.T) {
}
})

bt.Run("LastBookmark is empty when no initial bookmark", func(t *testing.T) {
bt.Run("LastBookmarks is empty when no initial bookmark", func(t *testing.T) {
_, _, sess := createSession()
AssertStringEqual(t, sess.LastBookmark(), "")
AssertLen(t, sess.LastBookmarks(), 0)
})
})

Expand Down Expand Up @@ -256,17 +256,17 @@ func TestSession(st *testing.T) {

sess.Run("cypher", nil)
AssertIntEqual(t, bufferCalls, 0)
AssertStringEqual(t, sess.LastBookmark(), "")
AssertLen(t, sess.LastBookmarks(), 0)
// Should call Buffer on connection to ensure that first Run is buffered and
// it's bookmark retrieved
sess.Run("cypher", nil)
AssertStringEqual(t, sess.LastBookmark(), "1")
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{"1"})
result, _ := sess.Run("cypher", nil)
AssertStringEqual(t, sess.LastBookmark(), "2")
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{"2"})
// And finally consuming the last result should give a new bookmark
AssertIntEqual(t, consumeCalls, 0)
result.Consume()
AssertStringEqual(t, sess.LastBookmark(), "1")
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{"1"})
})

bt.Run("Pending and invoke tx function", func(t *testing.T) {
Expand All @@ -291,7 +291,7 @@ func TestSession(st *testing.T) {
if !reflect.DeepEqual([]string{"1"}, rtx.Bookmarks) {
t.Errorf("Using unclean or no bookmarks: %+v", rtx)
}
AssertStringEqual(t, sess.LastBookmark(), "1")
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{"1"})
AssertIntEqual(t, bufferCalls, 1)
})

Expand All @@ -315,7 +315,7 @@ func TestSession(st *testing.T) {
if !reflect.DeepEqual([]string{"1"}, rtx.Bookmarks) {
t.Errorf("Using unclean or no bookmarks: %+v", rtx)
}
AssertStringEqual(t, sess.LastBookmark(), "1")
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{"1"})
AssertIntEqual(t, bufferCalls, 1)
})

Expand Down Expand Up @@ -477,7 +477,7 @@ func TestSession(st *testing.T) {
// Begin and commit a transaction on the session
tx, _ := sess.BeginTransaction()
tx.Commit()
AssertStringEqual(t, sess.LastBookmark(), bookmark)
AssertDeepEquals(t, BookmarksToRawValues(sess.LastBookmarks()), []string{bookmark})
// The bookmark should be used in next transaction
tx, _ = sess.BeginTransaction()
AssertLen(t, conn.RecordedTxs, 2)
Expand Down
Loading