Skip to content

Commit 7c99f03

Browse files
committed
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()`, `DoAsync()`, `DoTyped()`, `Begin()`, `Commit()`, `Rollback()` methods, `Begin()` - start transaction via iproto stream; `Commit()` - commit transaction; `Rollback()` - rollback transaction. Closes #101
1 parent 61d0739 commit 7c99f03

File tree

9 files changed

+816
-22
lines changed

9 files changed

+816
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Public API with request object types (#126)
1616
- Support decimal type in msgpack (#96)
1717
- Support datetime type in msgpack (#118)
18+
- Streams and interactive transactions support (#101)
1819

1920
### Changed
2021

connection.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
const requestsMap = 128
22+
const defaultStreamId = 0
2223
const (
2324
connDisconnected = 0
2425
connConnected = 1
@@ -139,6 +140,8 @@ type Connection struct {
139140
state uint32
140141
dec *msgpack.Decoder
141142
lenbuf [PacketLengthBytes]byte
143+
144+
lastStreamId uint32
142145
}
143146

144147
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -468,15 +471,17 @@ func (conn *Connection) dial() (err error) {
468471
return
469472
}
470473

471-
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
474+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
475+
req Request, streamId uint32, res SchemaResolver) (err error) {
472476
hl := h.Len()
473477
h.Write([]byte{
474478
0xce, 0, 0, 0, 0, // Length.
475-
0x82, // 2 element map.
479+
0x83, // 3 element map.
476480
KeyCode, byte(req.Code()), // Request code.
477481
KeySync, 0xce,
478482
byte(reqid >> 24), byte(reqid >> 16),
479483
byte(reqid >> 8), byte(reqid),
484+
KeyStreamId, byte(streamId),
480485
})
481486

482487
if err = req.Body(res, enc); err != nil {
@@ -495,7 +500,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
495500
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
496501
var packet smallWBuf
497502
req := newAuthRequest(conn.opts.User, string(scramble))
498-
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
503+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, defaultStreamId, conn.Schema)
499504

500505
if err != nil {
501506
return errors.New("auth: pack error " + err.Error())
@@ -785,16 +790,16 @@ func (conn *Connection) newFuture() (fut *Future) {
785790
return
786791
}
787792

788-
func (conn *Connection) send(req Request) *Future {
793+
func (conn *Connection) send(req Request, streamId uint32) *Future {
789794
fut := conn.newFuture()
790795
if fut.ready == nil {
791796
return fut
792797
}
793-
conn.putFuture(fut, req)
798+
conn.putFuture(fut, req, streamId)
794799
return fut
795800
}
796801

797-
func (conn *Connection) putFuture(fut *Future, req Request) {
802+
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint32) {
798803
shardn := fut.requestId & (conn.opts.Concurrency - 1)
799804
shard := &conn.shard[shardn]
800805
shard.bufmut.Lock()
@@ -811,7 +816,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
811816
}
812817
blen := shard.buf.Len()
813818
reqid := fut.requestId
814-
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
819+
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
815820
shard.buf.Trunc(blen)
816821
shard.bufmut.Unlock()
817822
if f := conn.fetchFuture(reqid); f == fut {
@@ -1011,7 +1016,7 @@ func (conn *Connection) DoTyped(req Request, result interface{}) error {
10111016
// An error is returned if the request was formed incorrectly, or failed to
10121017
// create the future.
10131018
func (conn *Connection) DoAsync(req Request) *Future {
1014-
return conn.send(req)
1019+
return conn.send(req, defaultStreamId)
10151020
}
10161021

10171022
// ConfiguredTimeout returns a timeout from connection config.
@@ -1027,3 +1032,16 @@ func (conn *Connection) OverrideSchema(s *Schema) {
10271032
conn.Schema = s
10281033
}
10291034
}
1035+
1036+
// NewStream creates new Stream object for connection.
1037+
//
1038+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1039+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1040+
// Since 1.7.0
1041+
func (conn *Connection) NewStream() *Stream {
1042+
conn.lastStreamId += 1
1043+
return &Stream{
1044+
Id: conn.lastStreamId,
1045+
Conn: conn,
1046+
}
1047+
}

connection_pool/connection_pool.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,20 @@ func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *t
554554
return conn.DoAsync(req)
555555
}
556556

557+
// NewStream creates new Stream object for connection selected
558+
// by userMode from connPool.
559+
//
560+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
561+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
562+
// Since 1.7.0
563+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
564+
conn, err := connPool.getNextConnection(userMode)
565+
if err != nil {
566+
return nil, err
567+
}
568+
return conn.NewStream(), nil
569+
}
570+
557571
//
558572
// private
559573
//

connection_pool/connection_pool_test.go

Lines changed: 217 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ var defaultTimeoutRetry = 500 * time.Millisecond
3939

4040
var instances []test_helpers.TarantoolInstance
4141

42+
var tarantoolVersionIsLess bool
43+
4244
func TestConnError_IncorrectParams(t *testing.T) {
4345
connPool, err := connection_pool.Connect([]string{}, tarantool.Opts{})
4446
require.Nilf(t, connPool, "conn is not nil with incorrect param")
@@ -1291,6 +1293,203 @@ func TestDo(t *testing.T) {
12911293
require.NotNilf(t, resp, "response is nil after Ping")
12921294
}
12931295

1296+
func TestStream_Commit(t *testing.T) {
1297+
var req tarantool.Request
1298+
var resp *tarantool.Response
1299+
var err error
1300+
1301+
// Tarantool supports streams and interactive transactions since version 2.10.0
1302+
if tarantoolVersionIsLess {
1303+
t.Skip()
1304+
}
1305+
1306+
roles := []bool{true, true, false, true, true}
1307+
1308+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1309+
require.Nilf(t, err, "fail to set roles for cluster")
1310+
1311+
connPool, err := connection_pool.Connect(servers, connOpts)
1312+
require.Nilf(t, err, "failed to connect")
1313+
require.NotNilf(t, connPool, "conn is nil after Connect")
1314+
defer connPool.Close()
1315+
1316+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1317+
require.Nilf(t, err, "failed to create stream")
1318+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1319+
1320+
// Begin transaction
1321+
resp, err = stream.Begin()
1322+
require.Nilf(t, err, "failed to Begin")
1323+
require.NotNilf(t, resp, "response is nil after Begin")
1324+
require.Equalf(t, uint32(0), resp.Code, "failed to Begin: wrong code returned")
1325+
1326+
// Insert in stream
1327+
req = tarantool.NewInsertRequest(spaceName).
1328+
Tuple([]interface{}{"commit_key", "commit_value"})
1329+
resp, err = stream.Do(req)
1330+
require.Nilf(t, err, "failed to Insert")
1331+
require.NotNilf(t, resp, "response is nil after Insert")
1332+
require.Equalf(t, uint32(0), resp.Code, "failed to Insert: wrong code returned")
1333+
1334+
// Connect to servers[2] to check if tuple
1335+
// was inserted outside of stream on RW instance
1336+
// before transaction commit
1337+
conn, err := tarantool.Connect(servers[2], connOpts)
1338+
require.Nilf(t, err, "failed to connect %s", servers[2])
1339+
require.NotNilf(t, conn, "conn is nil after Connect")
1340+
1341+
// Select not related to the transaction
1342+
// while transaction is not committed
1343+
// result of select is empty
1344+
req = tarantool.NewSelectRequest(spaceNo).
1345+
Index(indexNo).
1346+
Offset(0).
1347+
Limit(1).
1348+
Iterator(tarantool.IterEq).
1349+
Key([]interface{}{"commit_key"})
1350+
resp, err = conn.Do(req)
1351+
require.Nilf(t, err, "failed to Select")
1352+
require.NotNilf(t, resp, "response is nil after Select")
1353+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1354+
1355+
// Select in stream
1356+
resp, err = stream.Do(req)
1357+
require.Nilf(t, err, "failed to Select")
1358+
require.NotNilf(t, resp, "response is nil after Select")
1359+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1360+
1361+
tpl, ok := resp.Data[0].([]interface{})
1362+
require.Truef(t, ok, "unexpected body of Select")
1363+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1364+
1365+
key, ok := tpl[0].(string)
1366+
require.Truef(t, ok, "unexpected body of Select (0)")
1367+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1368+
1369+
value, ok := tpl[1].(string)
1370+
require.Truef(t, ok, "unexpected body of Select (1)")
1371+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1372+
1373+
// Commit transaction
1374+
resp, err = stream.Commit()
1375+
require.Nilf(t, err, "failed to Commit")
1376+
require.NotNilf(t, resp, "response is nil after Commit")
1377+
require.Equalf(t, uint32(0), resp.Code, "failed to Commit: wrong code returned")
1378+
1379+
// Select outside of transaction
1380+
resp, err = conn.Do(req)
1381+
require.Nilf(t, err, "failed to Select")
1382+
require.NotNilf(t, resp, "response is nil after Select")
1383+
require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select")
1384+
1385+
tpl, ok = resp.Data[0].([]interface{})
1386+
require.Truef(t, ok, "unexpected body of Select")
1387+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1388+
1389+
key, ok = tpl[0].(string)
1390+
require.Truef(t, ok, "unexpected body of Select (0)")
1391+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1392+
1393+
value, ok = tpl[1].(string)
1394+
require.Truef(t, ok, "unexpected body of Select (1)")
1395+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1396+
}
1397+
1398+
func TestStream_Rollback(t *testing.T) {
1399+
var req tarantool.Request
1400+
var resp *tarantool.Response
1401+
var err error
1402+
1403+
// Tarantool supports streams and interactive transactions since version 2.10.0
1404+
if tarantoolVersionIsLess {
1405+
t.Skip()
1406+
}
1407+
1408+
roles := []bool{true, true, false, true, true}
1409+
1410+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1411+
require.Nilf(t, err, "fail to set roles for cluster")
1412+
1413+
connPool, err := connection_pool.Connect(servers, connOpts)
1414+
require.Nilf(t, err, "failed to connect")
1415+
require.NotNilf(t, connPool, "conn is nil after Connect")
1416+
defer connPool.Close()
1417+
1418+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1419+
require.Nilf(t, err, "failed to create stream")
1420+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1421+
1422+
// Begin transaction
1423+
resp, err = stream.Begin()
1424+
require.Nilf(t, err, "failed to Begin")
1425+
require.NotNilf(t, resp, "response is nil after Begin")
1426+
require.Equalf(t, uint32(0), resp.Code, "failed to Begin: wrong code returned")
1427+
1428+
// Insert in stream
1429+
req = tarantool.NewInsertRequest(spaceName).
1430+
Tuple([]interface{}{"rollback_key", "rollback_value"})
1431+
resp, err = stream.Do(req)
1432+
require.Nilf(t, err, "failed to Insert")
1433+
require.NotNilf(t, resp, "response is nil after Insert")
1434+
require.Equalf(t, uint32(0), resp.Code, "failed to Insert: wrong code returned")
1435+
1436+
// Connect to servers[2] to check if tuple
1437+
// was not inserted outside of stream on RW instance
1438+
conn, err := tarantool.Connect(servers[2], connOpts)
1439+
require.Nilf(t, err, "failed to connect %s", servers[2])
1440+
require.NotNilf(t, conn, "conn is nil after Connect")
1441+
1442+
// Select not related to the transaction
1443+
// while transaction is not committed
1444+
// result of select is empty
1445+
req = tarantool.NewSelectRequest(spaceNo).
1446+
Index(indexNo).
1447+
Offset(0).
1448+
Limit(1).
1449+
Iterator(tarantool.IterEq).
1450+
Key([]interface{}{"rollback_key"})
1451+
resp, err = conn.Do(req)
1452+
require.Nilf(t, err, "failed to Select")
1453+
require.NotNilf(t, resp, "response is nil after Select")
1454+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1455+
1456+
// Select in stream
1457+
resp, err = stream.Do(req)
1458+
require.Nilf(t, err, "failed to Select")
1459+
require.NotNilf(t, resp, "response is nil after Select")
1460+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1461+
1462+
tpl, ok := resp.Data[0].([]interface{})
1463+
require.Truef(t, ok, "unexpected body of Select")
1464+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1465+
1466+
key, ok := tpl[0].(string)
1467+
require.Truef(t, ok, "unexpected body of Select (0)")
1468+
require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)")
1469+
1470+
value, ok := tpl[1].(string)
1471+
require.Truef(t, ok, "unexpected body of Select (1)")
1472+
require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)")
1473+
1474+
// Rollback transaction
1475+
resp, err = stream.Rollback()
1476+
require.Nilf(t, err, "failed to Rollback")
1477+
require.NotNilf(t, resp, "response is nil after Rollback")
1478+
require.Equalf(t, uint32(0), resp.Code, "failed to Rollback: wrong code returned")
1479+
1480+
// Select outside of transaction
1481+
resp, err = conn.Do(req)
1482+
require.Nilf(t, err, "failed to Select")
1483+
require.NotNilf(t, resp, "response is nil after Select")
1484+
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")
1485+
1486+
// Select inside of stream after rollback
1487+
resp, err = stream.Do(req)
1488+
require.Nilf(t, err, "failed to Select")
1489+
require.NotNilf(t, resp, "response is nil after Select")
1490+
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")
1491+
}
1492+
12941493
// runTestMain is a body of TestMain function
12951494
// (see https://pkg.go.dev/testing#hdr-Main).
12961495
// Using defer + os.Exit is not works so TestMain body
@@ -1307,13 +1506,25 @@ func runTestMain(m *testing.M) int {
13071506
"work_dir5"}
13081507
var err error
13091508

1509+
memtxUseMvccEngine := true
1510+
1511+
// Tarantool supports streams and interactive transactions since version 2.10.0
1512+
tarantoolVersionIsLess, err = test_helpers.IsTarantoolVersionLess(2, 10, 0)
1513+
if err != nil {
1514+
log.Fatalf("Could not check the Tarantool version")
1515+
}
1516+
if tarantoolVersionIsLess {
1517+
memtxUseMvccEngine = false
1518+
}
1519+
13101520
instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{
1311-
InitScript: initScript,
1312-
User: connOpts.User,
1313-
Pass: connOpts.Pass,
1314-
WaitStart: waitStart,
1315-
ConnectRetry: connectRetry,
1316-
RetryTimeout: retryTimeout,
1521+
InitScript: initScript,
1522+
User: connOpts.User,
1523+
Pass: connOpts.Pass,
1524+
WaitStart: waitStart,
1525+
ConnectRetry: connectRetry,
1526+
RetryTimeout: retryTimeout,
1527+
MemtxUseMvccEngine: memtxUseMvccEngine,
13171528
})
13181529

13191530
if err != nil {

const.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ const (
1212
UpsertRequestCode = 9
1313
Call17RequestCode = 10 /* call in >= 1.7 format */
1414
ExecuteRequestCode = 11
15+
BeginRequestCode = 14
16+
CommitRequestCode = 15
17+
RollbackRequestCode = 16
1518
PingRequestCode = 64
1619
SubscribeRequestCode = 66
1720

1821
KeyCode = 0x00
1922
KeySync = 0x01
23+
KeyStreamId = 0x0a
2024
KeySpaceNo = 0x10
2125
KeyIndexNo = 0x11
2226
KeyLimit = 0x12

0 commit comments

Comments
 (0)