Skip to content

Commit f2caaf3

Browse files
committed
Implementation of batch upsert
Batch upsert is mostly used for operation with one bucket / one Tarantool node in a transaction. In this case batch upsert is more efficient then upserting tuple-by-tuple. Right now CRUD cannot provide batch upsert with full consistency. CRUD offers batch upsert with partial consistency. That means that full consistency can be provided only on single replicaset using `box` transactions. Part of #193
1 parent 43d35d8 commit f2caaf3

File tree

10 files changed

+2862
-7
lines changed

10 files changed

+2862
-7
lines changed

README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ It also provides the `crud-storage` and `crud-router` roles for
2222
- [Delete](#delete)
2323
- [Replace](#replace)
2424
- [Upsert](#upsert)
25+
- [Upsert many](#upsert-many)
2526
- [Select](#select)
2627
- [Select conditions](#select-conditions)
2728
- [Pairs](#pairs)
@@ -524,6 +525,100 @@ crud.upsert_object('customers',
524525
...
525526
```
526527

528+
### Upsert many
529+
530+
```lua
531+
-- Upsert batch of tuples
532+
local result, err = crud.upsert_many(space_name, tuples, operations, opts)
533+
-- Upsert batch of objects
534+
local result, err = crud.upsert_object_many(space_name, objects, operations, opts)
535+
```
536+
537+
where:
538+
539+
* `space_name` (`string`) - name of the space to insert an object
540+
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
541+
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/#box-space-update) if there is an existing tuple which matches the key fields of tuple
542+
* `opts`:
543+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
544+
* `fields` (`?table`) - field names for getting only a subset of fields
545+
* `stop_on_error` (`?boolean`) - stop on a first error and report error
546+
regarding the failed operation and error about what tuples were not
547+
performed, default is `false`
548+
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
549+
rollback on a storage, where the operation is failed, report error
550+
about what tuples were rollback, default is `false`
551+
552+
Returns metadata and array of empty arrays, array of errors.
553+
Error object can contain field `operation_data`.
554+
555+
This field can contain:
556+
* tuple for which the error occurred;
557+
* object with an incorrect format;
558+
* tuple the operation on which was performed but
559+
operation was rollback;
560+
* tuple the operation on which was not performed
561+
because operation was stopped by error.
562+
563+
Right now CRUD cannot provide batch upsert with full consistency.
564+
CRUD offers batch upsert with partial consistency. That means
565+
that full consistency can be provided only on single replicaset
566+
using `box` transactions.
567+
568+
**Example:**
569+
570+
```lua
571+
crud.upsert_many('customers', {
572+
{1, box.NULL, 'Elizabeth', 23},
573+
{2, box.NULL, 'Anastasia', 22},
574+
},{{'+', 'age', 1}, {'+', 'age', 2}})
575+
---
576+
- metadata:
577+
- {'name': 'id', 'type': 'unsigned'}
578+
- {'name': 'bucket_id', 'type': 'unsigned'}
579+
- {'name': 'name', 'type': 'string'}
580+
- {'name': 'age', 'type': 'number'}
581+
rows:
582+
- []
583+
- []
584+
...
585+
crud.upsert_object_many('customers', {
586+
{id = 3, name = 'Elizabeth', age = 24},
587+
{id = 10, name = 'Anastasia', age = 21},
588+
}, {{'+', 'age', 1}, {'+', 'age', 2}})
589+
---
590+
- metadata:
591+
- {'name': 'id', 'type': 'unsigned'}
592+
- {'name': 'bucket_id', 'type': 'unsigned'}
593+
- {'name': 'name', 'type': 'string'}
594+
- {'name': 'age', 'type': 'number'}
595+
rows:
596+
- []
597+
- []
598+
599+
-- Partial success
600+
local res, errs = crud.upsert_object_many('customers', {
601+
{id = 22, name = 'Alex', age = 34},
602+
{id = 3, name = 'Anastasia', age = 22},
603+
{id = 5, name = 'Sergey', age = 25},
604+
}, {{'+', 'age', 12}, {'=', 'age', 'invalid type'}, {'+', 'age', 10}})
605+
---
606+
res
607+
- metadata:
608+
- {'name': 'id', 'type': 'unsigned'}
609+
- {'name': 'bucket_id', 'type': 'unsigned'}
610+
- {'name': 'name', 'type': 'string'}
611+
- {'name': 'age', 'type': 'number'}
612+
rows:
613+
- [],
614+
- [],
615+
616+
#errs -- 1
617+
errs[1].class_name -- BatchUpsertError
618+
errs[1].err -- 'Tuple field 4 (age) type does not match one required by operation <...>'
619+
errs[1].tuple -- {3, 2804, 'Anastasia', 22}
620+
...
621+
```
527622

528623
### Select
529624

crud.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local replace = require('crud.replace')
99
local get = require('crud.get')
1010
local update = require('crud.update')
1111
local upsert = require('crud.upsert')
12+
local upsert_many = require('crud.upsert_many')
1213
local delete = require('crud.delete')
1314
local select = require('crud.select')
1415
local truncate = require('crud.truncate')
@@ -60,6 +61,14 @@ crud.update = stats.wrap(update.call, stats.op.UPDATE)
6061
-- @function upsert
6162
crud.upsert = stats.wrap(upsert.tuple, stats.op.UPSERT)
6263

64+
-- @refer upsert_many.tuples
65+
-- @function upsert_many
66+
crud.upsert_many = upsert_many.tuples
67+
68+
-- @refer upsert_many.objects
69+
-- @function upsert_object_many
70+
crud.upsert_object_many = upsert_many.objects
71+
6372
-- @refer upsert.object
6473
-- @function upsert
6574
crud.upsert_object = stats.wrap(upsert.object, stats.op.UPSERT)
@@ -138,6 +147,7 @@ function crud.init_storage()
138147
replace.init()
139148
update.init()
140149
upsert.init()
150+
upsert_many.init()
141151
delete.init()
142152
select.init()
143153
truncate.init()

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ function BatchInsertIterator:get()
7070
local replicaset = self.next_index
7171
local func_args = {
7272
self.space_name,
73-
self.next_batch,
73+
self.next_batch.tuples,
7474
self.opts,
7575
}
7676

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
local errors = require('errors')
2+
3+
local dev_checks = require('crud.common.dev_checks')
4+
local sharding = require('crud.common.sharding')
5+
6+
local BaseIterator = require('crud.common.map_call_cases.base_iter')
7+
8+
local SplitTuplesError = errors.new_class('SplitTuplesError')
9+
10+
local BatchUpsertIterator = {}
11+
-- inheritance from BaseIterator
12+
setmetatable(BatchUpsertIterator, {__index = BaseIterator})
13+
14+
--- Create new batch upsert iterator for map call
15+
--
16+
-- @function new
17+
--
18+
-- @tparam[opt] table opts
19+
-- Options of BatchUpsertIterator:new
20+
-- @tparam[opt] table opts.tuples
21+
-- Tuples to be upserted
22+
-- @tparam[opt] table opts.space
23+
-- Space to be upserted into
24+
-- @tparam[opt] table opts.operations
25+
-- Operations to be performed on tuples
26+
-- @tparam[opt] table opts.execute_on_storage_opts
27+
-- Additional opts for call on storage
28+
--
29+
-- @return[1] table iterator
30+
-- @treturn[2] nil
31+
-- @treturn[2] table of tables Error description
32+
function BatchUpsertIterator:new(opts)
33+
dev_checks('table', {
34+
tuples = 'table',
35+
space = 'table',
36+
operations = 'table',
37+
execute_on_storage_opts = 'table',
38+
})
39+
40+
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space, {
41+
operations = opts.operations,
42+
})
43+
if err ~= nil then
44+
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
45+
end
46+
47+
local next_replicaset, next_batch = next(sharding_data.batches)
48+
49+
local execute_on_storage_opts = opts.execute_on_storage_opts
50+
execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash
51+
execute_on_storage_opts.sharding_key_hash = sharding_data.sharding_key_hash
52+
execute_on_storage_opts.skip_sharding_hash_check = sharding_data.skip_sharding_hash_check
53+
54+
local iter = {
55+
space_name = opts.space.name,
56+
opts = execute_on_storage_opts,
57+
batches_by_replicasets = sharding_data.batches,
58+
next_index = next_replicaset,
59+
next_batch = next_batch,
60+
}
61+
62+
setmetatable(iter, self)
63+
self.__index = self
64+
65+
return iter
66+
end
67+
68+
--- Get function arguments and next replicaset
69+
--
70+
-- @function get
71+
--
72+
-- @return[1] table func_args
73+
-- @return[2] table replicaset
74+
function BatchUpsertIterator:get()
75+
local replicaset = self.next_index
76+
local func_args = {
77+
self.space_name,
78+
self.next_batch.tuples,
79+
self.next_batch.operations,
80+
self.opts,
81+
}
82+
83+
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
84+
85+
return func_args, replicaset
86+
end
87+
88+
return BatchUpsertIterator

crud/common/sharding/init.lua

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,12 @@ end
209209
-- @return[1] batches
210210
-- Map where key is a replicaset and value
211211
-- is table of tuples related to this replicaset
212-
function sharding.split_tuples_by_replicaset(tuples, space)
213-
dev_checks('table', 'table')
212+
function sharding.split_tuples_by_replicaset(tuples, space, opts)
213+
dev_checks('table', 'table', {
214+
operations = '?table',
215+
})
216+
217+
opts = opts or {}
214218

215219
local batches = {}
216220

@@ -219,7 +223,7 @@ function sharding.split_tuples_by_replicaset(tuples, space)
219223
local skip_sharding_hash_check
220224
local sharding_data
221225
local err
222-
for _, tuple in ipairs(tuples) do
226+
for i, tuple in ipairs(tuples) do
223227
sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space)
224228
if err ~= nil then
225229
return nil, BucketIDError:new("Failed to get bucket ID: %s", err)
@@ -244,9 +248,15 @@ function sharding.split_tuples_by_replicaset(tuples, space)
244248
sharding_data.bucket_id, err.err)
245249
end
246250

247-
local tuples_by_replicaset = batches[replicaset] or {}
248-
table.insert(tuples_by_replicaset, tuple)
249-
batches[replicaset] = tuples_by_replicaset
251+
local record_by_replicaset = batches[replicaset] or {tuples = {}}
252+
table.insert(record_by_replicaset.tuples, tuple)
253+
254+
if opts.operations ~= nil then
255+
record_by_replicaset.operations = record_by_replicaset.operations or {}
256+
table.insert(record_by_replicaset.operations, opts.operations[i])
257+
end
258+
259+
batches[replicaset] = record_by_replicaset
250260
end
251261

252262
return {

0 commit comments

Comments
 (0)