Skip to content

Commit c75e7fa

Browse files
committed
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()`, `DoAsync()`, `DoTyped()`, `Begin()`, `Commit()`, `Rollback()` methods, `Begin()` - start transaction via iproto stream; `Commit()` - commit transaction; `Rollback()` - rollback transaction. Closes #101
1 parent daae235 commit c75e7fa

10 files changed

+785
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ CI and documentation.
3636
- queue-utube handling (#85)
3737
- Master discovery (#113)
3838
- SQL support (#62)
39+
- Streams and interactive transactions support (#101)
3940

4041
### Changed
4142

connection.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ type Connection struct {
139139
state uint32
140140
dec *msgpack.Decoder
141141
lenbuf [PacketLengthBytes]byte
142+
143+
lastStreamId uint32
142144
}
143145

144146
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -472,11 +474,12 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
472474
hl := h.Len()
473475
h.Write([]byte{
474476
0xce, 0, 0, 0, 0, // Length.
475-
0x82, // 2 element map.
477+
0x83, // 3 element map.
476478
KeyCode, byte(req.Code()), // Request code.
477479
KeySync, 0xce,
478480
byte(reqid >> 24), byte(reqid >> 16),
479481
byte(reqid >> 8), byte(reqid),
482+
KeyStreamId, byte(req.StreamId()),
480483
})
481484

482485
if err = req.Body(res, enc); err != nil {
@@ -1027,3 +1030,15 @@ func (conn *Connection) OverrideSchema(s *Schema) {
10271030
conn.Schema = s
10281031
}
10291032
}
1033+
1034+
// NewStream creates new Stream object for connection.
1035+
//
1036+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1037+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1038+
func (conn *Connection) NewStream() *Stream {
1039+
conn.lastStreamId += 1
1040+
return &Stream{
1041+
Id: conn.lastStreamId,
1042+
Conn: conn,
1043+
}
1044+
}

connection_pool/connection_pool.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,19 @@ func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *t
512512
return conn.DoAsync(req)
513513
}
514514

515+
// NewStream creates new Stream object for connection selected
516+
// by userMode from connPool.
517+
//
518+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
519+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
520+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
521+
conn, err := connPool.getNextConnection(userMode)
522+
if err != nil {
523+
return nil, err
524+
}
525+
return conn.NewStream(), nil
526+
}
527+
515528
//
516529
// private
517530
//

connection_pool/connection_pool_test.go

Lines changed: 227 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,226 @@ func TestDo(t *testing.T) {
12911291
require.NotNilf(t, resp, "response is nil after Ping")
12921292
}
12931293

1294+
func TestStream_Commit(t *testing.T) {
1295+
var req tarantool.Request
1296+
var resp *tarantool.Response
1297+
1298+
// Tarantool supports streams and interactive transactions since version 2.10.0
1299+
isLess, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
1300+
if err != nil {
1301+
t.Fatal("Could not check the Tarantool version")
1302+
}
1303+
if isLess {
1304+
t.Skip()
1305+
}
1306+
1307+
roles := []bool{true, true, false, true, true}
1308+
1309+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1310+
require.Nilf(t, err, "fail to set roles for cluster")
1311+
1312+
connPool, err := connection_pool.Connect(servers, connOpts)
1313+
require.Nilf(t, err, "failed to connect")
1314+
require.NotNilf(t, connPool, "conn is nil after Connect")
1315+
defer connPool.Close()
1316+
1317+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1318+
require.Nilf(t, err, "failed to create stream")
1319+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1320+
1321+
// Begin transaction
1322+
resp, err = stream.Begin()
1323+
require.Nilf(t, err, "failed to Begin")
1324+
require.NotNilf(t, resp, "response is nil after Begin")
1325+
require.Equalf(t, uint32(0), resp.Code, "failed to Begin: wrong code returned")
1326+
1327+
// Insert in stream
1328+
req = tarantool.NewInsertRequest(spaceName).
1329+
Tuple([]interface{}{"commit_key", "commit_value"})
1330+
resp, err = stream.Do(req)
1331+
require.Nilf(t, err, "failed to Insert")
1332+
require.NotNilf(t, resp, "response is nil after Insert")
1333+
require.Equalf(t, uint32(0), resp.Code, "failed to Insert: wrong code returned")
1334+
1335+
// Connect to servers[2] to check if tuple
1336+
// was inserted on RW instance
1337+
conn, err := tarantool.Connect(servers[2], connOpts)
1338+
require.Nilf(t, err, "failed to connect %s", servers[2])
1339+
require.NotNilf(t, conn, "conn is nil after Connect")
1340+
1341+
// Select not related to the transaction
1342+
// while transaction is not commited
1343+
// result of select is empty
1344+
req = tarantool.NewSelectRequest(spaceNo).
1345+
Index(indexNo).
1346+
Offset(0).
1347+
Limit(1).
1348+
Iterator(tarantool.IterEq).
1349+
Key([]interface{}{"commit_key"})
1350+
resp, err = conn.Do(req)
1351+
require.Nilf(t, err, "failed to Select")
1352+
require.NotNilf(t, resp, "response is nil after Select")
1353+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1354+
1355+
// Select in stream
1356+
req = tarantool.NewSelectRequest(spaceNo).
1357+
Index(indexNo).
1358+
Offset(0).
1359+
Limit(1).
1360+
Iterator(tarantool.IterEq).
1361+
Key([]interface{}{"commit_key"})
1362+
resp, err = stream.Do(req)
1363+
require.Nilf(t, err, "failed to Select")
1364+
require.NotNilf(t, resp, "response is nil after Select")
1365+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1366+
1367+
tpl, ok := resp.Data[0].([]interface{})
1368+
require.Truef(t, ok, "unexpected body of Select")
1369+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1370+
1371+
key, ok := tpl[0].(string)
1372+
require.Truef(t, ok, "unexpected body of Select (0)")
1373+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1374+
1375+
value, ok := tpl[1].(string)
1376+
require.Truef(t, ok, "unexpected body of Select (1)")
1377+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1378+
1379+
// Commit transaction
1380+
resp, err = stream.Commit()
1381+
require.Nilf(t, err, "failed to Commit")
1382+
require.NotNilf(t, resp, "response is nil after Commit")
1383+
require.Equalf(t, uint32(0), resp.Code, "failed to Commit: wrong code returned")
1384+
1385+
// Select outside of transaction
1386+
req = tarantool.NewSelectRequest(spaceNo).
1387+
Index(indexNo).
1388+
Offset(0).
1389+
Limit(1).
1390+
Iterator(tarantool.IterEq).
1391+
Key([]interface{}{"commit_key"})
1392+
resp, err = conn.Do(req)
1393+
require.Nilf(t, err, "failed to Select")
1394+
require.NotNilf(t, resp, "response is nil after Select")
1395+
require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select")
1396+
1397+
tpl, ok = resp.Data[0].([]interface{})
1398+
require.Truef(t, ok, "unexpected body of Select")
1399+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1400+
1401+
key, ok = tpl[0].(string)
1402+
require.Truef(t, ok, "unexpected body of Select (0)")
1403+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1404+
1405+
value, ok = tpl[1].(string)
1406+
require.Truef(t, ok, "unexpected body of Select (1)")
1407+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1408+
}
1409+
1410+
func TestStream_Rollback(t *testing.T) {
1411+
var req tarantool.Request
1412+
var resp *tarantool.Response
1413+
1414+
// Tarantool supports streams and interactive transactions since version 2.10.0
1415+
isLess, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
1416+
if err != nil {
1417+
t.Fatal("Could not check the Tarantool version")
1418+
}
1419+
if isLess {
1420+
t.Skip()
1421+
}
1422+
1423+
roles := []bool{true, true, false, true, true}
1424+
1425+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1426+
require.Nilf(t, err, "fail to set roles for cluster")
1427+
1428+
connPool, err := connection_pool.Connect(servers, connOpts)
1429+
require.Nilf(t, err, "failed to connect")
1430+
require.NotNilf(t, connPool, "conn is nil after Connect")
1431+
defer connPool.Close()
1432+
1433+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1434+
require.Nilf(t, err, "failed to create stream")
1435+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1436+
1437+
// Begin transaction
1438+
resp, err = stream.Begin()
1439+
require.Nilf(t, err, "failed to Begin")
1440+
require.NotNilf(t, resp, "response is nil after Begin")
1441+
require.Equalf(t, uint32(0), resp.Code, "failed to Begin: wrong code returned")
1442+
1443+
// Insert in stream
1444+
req = tarantool.NewInsertRequest(spaceName).
1445+
Tuple([]interface{}{"rollback_key", "rollback_value"})
1446+
resp, err = stream.Do(req)
1447+
require.Nilf(t, err, "failed to Insert")
1448+
require.NotNilf(t, resp, "response is nil after Insert")
1449+
require.Equalf(t, uint32(0), resp.Code, "failed to Insert: wrong code returned")
1450+
1451+
// Connect to servers[2] to check if tuple
1452+
// was inserted on RW instance
1453+
conn, err := tarantool.Connect(servers[2], connOpts)
1454+
require.Nilf(t, err, "failed to connect %s", servers[2])
1455+
require.NotNilf(t, conn, "conn is nil after Connect")
1456+
1457+
// Select not related to the transaction
1458+
// while transaction is not commited
1459+
// result of select is empty
1460+
req = tarantool.NewSelectRequest(spaceNo).
1461+
Index(indexNo).
1462+
Offset(0).
1463+
Limit(1).
1464+
Iterator(tarantool.IterEq).
1465+
Key([]interface{}{"rollback_key"})
1466+
resp, err = conn.Do(req)
1467+
require.Nilf(t, err, "failed to Select")
1468+
require.NotNilf(t, resp, "response is nil after Select")
1469+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1470+
1471+
// Select in stream
1472+
req = tarantool.NewSelectRequest(spaceNo).
1473+
Index(indexNo).
1474+
Offset(0).
1475+
Limit(1).
1476+
Iterator(tarantool.IterEq).
1477+
Key([]interface{}{"rollback_key"})
1478+
resp, err = stream.Do(req)
1479+
require.Nilf(t, err, "failed to Select")
1480+
require.NotNilf(t, resp, "response is nil after Select")
1481+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1482+
1483+
tpl, ok := resp.Data[0].([]interface{})
1484+
require.Truef(t, ok, "unexpected body of Select")
1485+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1486+
1487+
key, ok := tpl[0].(string)
1488+
require.Truef(t, ok, "unexpected body of Select (0)")
1489+
require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)")
1490+
1491+
value, ok := tpl[1].(string)
1492+
require.Truef(t, ok, "unexpected body of Select (1)")
1493+
require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)")
1494+
1495+
// Rollback transaction
1496+
resp, err = stream.Rollback()
1497+
require.Nilf(t, err, "failed to Rollback")
1498+
require.NotNilf(t, resp, "response is nil after Rollback")
1499+
require.Equalf(t, uint32(0), resp.Code, "failed to Rollback: wrong code returned")
1500+
1501+
// Select outside of transaction
1502+
req = tarantool.NewSelectRequest(spaceNo).
1503+
Index(indexNo).
1504+
Offset(0).
1505+
Limit(1).
1506+
Iterator(tarantool.IterEq).
1507+
Key([]interface{}{"rollback_key"})
1508+
resp, err = conn.Do(req)
1509+
require.Nilf(t, err, "failed to Select")
1510+
require.NotNilf(t, resp, "response is nil after Select")
1511+
require.Equalf(t, 0, len(resp.Data), "response Body len != 1 after Select")
1512+
}
1513+
12941514
// runTestMain is a body of TestMain function
12951515
// (see https://pkg.go.dev/testing#hdr-Main).
12961516
// Using defer + os.Exit is not works so TestMain body
@@ -1308,12 +1528,13 @@ func runTestMain(m *testing.M) int {
13081528
var err error
13091529

13101530
instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{
1311-
InitScript: initScript,
1312-
User: connOpts.User,
1313-
Pass: connOpts.Pass,
1314-
WaitStart: waitStart,
1315-
ConnectRetry: connectRetry,
1316-
RetryTimeout: retryTimeout,
1531+
InitScript: initScript,
1532+
User: connOpts.User,
1533+
Pass: connOpts.Pass,
1534+
WaitStart: waitStart,
1535+
ConnectRetry: connectRetry,
1536+
RetryTimeout: retryTimeout,
1537+
MemtxUseMvccEngine: true,
13171538
})
13181539

13191540
if err != nil {

const.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ const (
1212
UpsertRequestCode = 9
1313
Call17RequestCode = 10
1414
ExecuteRequestCode = 11
15+
BeginRequestCode = 14
16+
CommitRequestCode = 15
17+
RollbackRequestCode = 16
1518
PingRequestCode = 64
1619
SubscribeRequestCode = 66
1720

1821
KeyCode = 0x00
1922
KeySync = 0x01
23+
KeyStreamId = 0x0a
2024
KeySpaceNo = 0x10
2125
KeyIndexNo = 0x11
2226
KeyLimit = 0x12

0 commit comments

Comments
 (0)