Skip to content

Commit f9ca802

Browse files
EvgenyMekhanikalyapunov
authored andcommitted
net.box: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Closes #5860 @TarantoolBot document Title: add interactive transaction support in net.box Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Now there are multiple ways to begin, commit and rollback transaction from `net.box`: using appropriate stream methods, using 'call` or 'eval' methods or using `execute` method with sql transaction syntax. User can mix these methods, for example, start transaction using `stream:begin()`, and commit transaction using `stream:call('box.commit')` or stream:execute('COMMIT'). Simple example of using interactive transactions via iproto from net.box: ```lua stream = conn:new_stream() space = stream.space.test space_not_from_stream = conn.space.test stream:begin() space:replace({1}) -- return previously inserted tuple, because request -- belongs to transaction. space:select({}) -- empty select, because select doesn't belongs to -- transaction space_not_from_stream:select({}) stream:call('box.commit') -- now transaction was commited, so all requests -- returns tuple. ``` Different examples of using streams you can find in gh-5860-implement-streams-in-iproto.test.lua
1 parent 48c8dc1 commit f9ca802

6 files changed

+4356
-3
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
## feature/core
2+
3+
* Streams and interactive transactions over streams are implemented
4+
in iproto. Stream is associated with it's ID, which is unique within
5+
one connection. All requests with same not zero stream ID belongs to
6+
the same stream. All requests in stream processed synchronously. The
7+
execution of the next request will not start until the previous one is
8+
completed. If request has zero stream ID it does not belong to stream
9+
and is processed in the old way.
10+
In `net.box`, stream is an object above connection that has the same
11+
methods, but allows to execute requests sequentially. ID is generated
12+
on the client side automatically. If user writes his own connector and
13+
wants to use streams, he must transmit stream_id over iproto protocol.
14+
The main purpose of streams is transactions via iproto. Each stream
15+
can start its own transaction, so they allows multiplexing several
16+
transactions over one connection. There are multiple ways to begin,
17+
commit and rollback transaction: using appropriate stream methods, using
18+
`call` or `eval` methods or using `execute` method with sql transaction
19+
syntax. User can mix these methods, for example, start transaction using
20+
`stream:begin()`, and commit transaction using `stream:call('box.commit')`
21+
or stream:execute('COMMIT').
22+
If any request fails during the transaction, it will not affect the other
23+
requests in the transaction. If disconnect occurs when there is some active
24+
transaction in stream, this transaction will be rollbacked, if it does not
25+
have time to commit before this moment.
26+

src/box/lua/net_box.c

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ enum netbox_method {
7575
NETBOX_MIN = 14,
7676
NETBOX_MAX = 15,
7777
NETBOX_COUNT = 16,
78-
NETBOX_INJECT = 17,
78+
NETBOX_BEGIN = 17,
79+
NETBOX_COMMIT = 18,
80+
NETBOX_ROLLBACK = 19,
81+
NETBOX_INJECT = 20,
7982
netbox_method_MAX
8083
};
8184

@@ -916,6 +919,44 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
916919
netbox_encode_prepare(L, idx, stream, sync, stream_id);
917920
}
918921

922+
static inline void
923+
netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
924+
struct mpstream *stream, uint64_t sync,
925+
uint64_t stream_id)
926+
{
927+
(void)L;
928+
(void) idx;
929+
assert(type == IPROTO_BEGIN ||
930+
type == IPROTO_COMMIT ||
931+
type == IPROTO_ROLLBACK);
932+
size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
933+
netbox_end_encode(stream, svp);
934+
}
935+
936+
static void
937+
netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
938+
uint64_t sync, uint64_t stream_id)
939+
{
940+
return netbox_encode_txn(L, IPROTO_BEGIN, idx, stream,
941+
sync, stream_id);
942+
}
943+
944+
static void
945+
netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
946+
uint64_t sync, uint64_t stream_id)
947+
{
948+
return netbox_encode_txn(L, IPROTO_COMMIT, idx, stream,
949+
sync, stream_id);
950+
}
951+
952+
static void
953+
netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
954+
uint64_t sync, uint64_t stream_id)
955+
{
956+
return netbox_encode_txn(L, IPROTO_ROLLBACK, idx, stream,
957+
sync, stream_id);
958+
}
959+
919960
static void
920961
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
921962
uint64_t sync, uint64_t stream_id)
@@ -959,6 +1000,9 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
9591000
[NETBOX_MIN] = netbox_encode_select,
9601001
[NETBOX_MAX] = netbox_encode_select,
9611002
[NETBOX_COUNT] = netbox_encode_call,
1003+
[NETBOX_BEGIN] = netbox_encode_begin,
1004+
[NETBOX_COMMIT] = netbox_encode_commit,
1005+
[NETBOX_ROLLBACK] = netbox_encode_rollback,
9621006
[NETBOX_INJECT] = netbox_encode_inject,
9631007
};
9641008
struct mpstream stream;
@@ -1330,6 +1374,9 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
13301374
[NETBOX_MIN] = netbox_decode_tuple,
13311375
[NETBOX_MAX] = netbox_decode_tuple,
13321376
[NETBOX_COUNT] = netbox_decode_value,
1377+
[NETBOX_BEGIN] = netbox_decode_nil,
1378+
[NETBOX_COMMIT] = netbox_decode_nil,
1379+
[NETBOX_ROLLBACK] = netbox_decode_nil,
13331380
[NETBOX_INJECT] = netbox_decode_table,
13341381
};
13351382
method_decoder[method](L, data, data_end, format);

src/box/lua/net_box.lua

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,11 @@ local M_GET = 13
5151
local M_MIN = 14
5252
local M_MAX = 15
5353
local M_COUNT = 16
54+
local M_BEGIN = 17
55+
local M_COMMIT = 18
56+
local M_ROLLBACK = 19
5457
-- Injects raw data into connection. Used by console and tests.
55-
local M_INJECT = 17
58+
local M_INJECT = 20
5659

5760
-- utility tables
5861
local is_final_state = {closed = 1, error = 1}
@@ -754,11 +757,38 @@ local function stream_new_stream(stream)
754757
return stream._conn:new_stream()
755758
end
756759

760+
local function stream_begin(stream, opts)
761+
check_remote_arg(stream, 'begin')
762+
local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
763+
if opts and opts.is_async then
764+
return res
765+
end
766+
end
767+
768+
local function stream_commit(stream, opts)
769+
check_remote_arg(stream, 'commit')
770+
local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
771+
if opts and opts.is_async then
772+
return res
773+
end
774+
end
775+
776+
local function stream_rollback(stream, opts)
777+
check_remote_arg(stream, 'rollback')
778+
local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
779+
if opts and opts.is_async then
780+
return res
781+
end
782+
end
783+
757784
function remote_methods:new_stream()
758785
check_remote_arg(self, 'new_stream')
759786
self._last_stream_id = self._last_stream_id + 1
760787
local stream = setmetatable({
761788
new_stream = stream_new_stream,
789+
begin = stream_begin,
790+
commit = stream_commit,
791+
rollback = stream_rollback,
762792
_stream_id = self._last_stream_id,
763793
space = setmetatable({
764794
_stream_space_cache = {},
@@ -1243,6 +1273,9 @@ local this_module = {
12431273
min = M_MIN,
12441274
max = M_MAX,
12451275
count = M_COUNT,
1276+
begin = M_BEGIN,
1277+
commit = M_COMMIT,
1278+
rollback = M_ROLLBACK,
12461279
inject = M_INJECT,
12471280
}
12481281
}

0 commit comments

Comments
 (0)