Skip to content

Commit e31b97e

Browse files
committed
Use sharding keys to calculate bucket id (WIP)
CRUD can calculate `bucket_id` automatically from the primary key, or the caller can specify `bucket_id` explicitly [1]. However, it is often necessary to calculate `bucket_id` from the sharding keys defined in a DDL schema. The DDL module exposes the space holding the sharding keys as part of its public API [2], so anyone can set and get sharding keys there without adding the DDL module as a dependency. This patch calculates the `bucket_id` value automatically when sharding keys are used in a sharded space. 1. #46 2. https://github.com/tarantool/ddl#api 3. #46 (comment) Closes #166
1 parent 416d043 commit e31b97e

File tree

6 files changed

+328
-5
lines changed

6 files changed

+328
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1616

1717
### Added
1818

19+
* Support calculating `bucket_id` based on `ddl.sharding_key`.
1920
* Added jsonpath indexes support for queries
2021
* `tuple-merger` module updated to 0.0.2
2122

crud/common/sharding.lua

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ local utils = require('crud.common.utils')
77

88
local sharding = {}
99

10+
local sharding_key = nil
11+
1012
function sharding.key_get_bucket_id(key, specified_bucket_id)
1113
if specified_bucket_id ~= nil then
1214
return specified_bucket_id
@@ -20,7 +22,18 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
2022
return specified_bucket_id
2123
end
2224

23-
local key = utils.extract_key(tuple, space.index[0].parts)
25+
local primary_index = space.index[0]
26+
local key
27+
local sharding_key = get_sharding_key()
28+
if sharding_key ~= nil then
29+
sharding_key = sharding_key[space_name]
30+
key = utils.extract_sharding_key(space, tuple, sharding_key)
31+
end
32+
33+
if key == nil then
34+
key = utils.extract_key(tuple, primary_index.parts)
35+
end
36+
2437
return sharding.key_get_bucket_id(key)
2538
end
2639

@@ -51,4 +64,18 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
5164
return bucket_id
5265
end
5366

67+
-- Return the DDL sharding keys of all spaces as a map
-- {space_name -> list of sharding key field names}, or nil when the
-- `_ddl_sharding_key` space does not exist on this instance.
--
-- The result is cached in the module-level `sharding_key` variable to
-- speed up subsequent calls.
-- NOTE(review): the cache is never invalidated in this file, so later
-- changes of the DDL schema are not picked up -- confirm this is intended.
-- NOTE(review): this local is declared below sharding.tuple_get_bucket_id,
-- which calls it; declare it above its first use (or forward-declare it)
-- so that the call resolves to this function and not to a global.
local function get_sharding_key()
    local ddl_space = box.space._ddl_sharding_key
    if ddl_space == nil then
        return nil
    end

    if sharding_key == nil then
        -- select{} yields a plain list of tuples, while the caller looks a
        -- sharding key up by space name, so re-key the records by name.
        -- Assumes the tuple layout {space_name, sharding_key} described in
        -- the ddl module API -- TODO confirm.
        local keys_by_space = {}
        for _, record in ipairs(ddl_space:select{}) do
            keys_by_space[record[1]] = record[2]
        end
        sharding_key = keys_by_space
    end

    return sharding_key
end
80+
5481
return sharding

crud/common/utils.lua

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,18 @@ function utils.unflatten(tuple, space_format)
140140
return object
141141
end
142142

143+
--- Extract the sharding key value of a tuple.
--
-- @param space         space object; only `space:format()` is used, to map
--                      field names to field positions
-- @param tuple         flat tuple (array) to take the values from
-- @param sharding_key  list of field names that form the sharding key
--
-- @return list of tuple values in the order given by `sharding_key`, or nil
--         when `sharding_key` is nil/empty or names a field that is missing
--         from the space format
function utils.extract_sharding_key(space, tuple, sharding_key)
    if sharding_key == nil or #sharding_key == 0 then
        return nil
    end

    -- Resolve field names to positions once instead of rescanning the
    -- whole format for every sharding key part.
    local fieldno_by_name = {}
    for fieldno, field in ipairs(space:format()) do
        fieldno_by_name[field.name] = fieldno
    end

    -- Fill the key densely (1..#sharding_key). Indexing the result by the
    -- field position would leave holes in it and would follow the format
    -- order rather than the declared sharding key order.
    local key = {}
    for i, name in ipairs(sharding_key) do
        local fieldno = fieldno_by_name[name]
        if fieldno == nil then
            -- NOTE(review): an unknown field name yields nil here; decide
            -- whether the caller should raise an error instead -- TODO confirm.
            return nil
        end
        key[i] = tuple[fieldno]
    end

    return key
end
154+
143155
function utils.extract_key(tuple, key_parts)
144156
local key = {}
145157
for i, part in ipairs(key_parts) do

crud/select/plan.lua

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ local errors = require('errors')
33
local compare_conditions = require('crud.compare.conditions')
44
local utils = require('crud.common.utils')
55
local dev_checks = require('crud.common.dev_checks')
6+
-- ddl module is optional
7+
local ok, ddl = pcall(require, 'ddl')
8+
if ok == false then
9+
ddl = nil
10+
end
611

712
local compat = require('crud.common.compat')
813
local has_keydef = compat.exists('tuple.keydef', 'key_def')
@@ -16,6 +21,7 @@ local select_plan = {}
1621

1722
local IndexTypeError = errors.new_class('IndexTypeError', {capture_stack = false})
1823
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
24+
local DDLError = errors.new_class('DDLError', {capture_stack = false})
1925

2026
local function index_is_allowed(index)
2127
return index.type == 'TREE'
@@ -226,12 +232,24 @@ function select_plan.new(space, conditions, opts)
226232
end
227233
end
228234

229-
local sharding_index = primary_index -- XXX: only sharding by primary key is supported
235+
local sharding_index = primary_index
230236

231-
-- get sharding key value
237+
-- getting sharding_key specified in DDL schema
232238
local sharding_key
233-
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
234-
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
239+
if ddl ~= nil then
240+
local schema, err = ddl.get_schema(space_name)
241+
if schema == nil then
242+
return nil, DDLError:new('Failed to obtain a DDL schema')
243+
end
244+
sharding_key = schema.sharding_key
245+
end
246+
247+
-- getting sharding_key used in primary_index
248+
if sharding_key == nil then
249+
local sharding_index = primary_index
250+
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
251+
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
252+
end
235253
end
236254

237255
if sharding_key ~= nil and opts.force_map_call ~= true then

test/entrypoint/srv_ddl.lua

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/usr/bin/env tarantool
2+
3+
require('strict').on()
4+
_G.is_initialized = function() return false end
5+
6+
local log = require('log')
7+
local errors = require('errors')
8+
local cartridge = require('cartridge')
9+
local ddl = require('ddl')
10+
11+
package.preload['customers-storage'] = function()
12+
return {
13+
role_name = 'customers-storage',
14+
init = function()
15+
local engine = os.getenv('ENGINE') or 'memtx'
16+
local schema_customers = {
17+
is_local = true,
18+
engine = engine,
19+
temporary = false,
20+
sharding_key = {
21+
'name',
22+
'age',
23+
},
24+
format = {
25+
{name = 'id', type = 'unsigned', is_nullable = false},
26+
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
27+
{name = 'name', type = 'string', is_nullable = false},
28+
{name = 'age', type = 'number', is_nullable = false},
29+
},
30+
indexes = {
31+
{
32+
name = 'primary',
33+
type = 'TREE',
34+
unique = true,
35+
parts = {
36+
{path = 'id', type = 'unsigned', is_nullable = false},
37+
},
38+
},
39+
{
40+
name = 'bucket_id',
41+
type = 'TREE',
42+
unique = false,
43+
parts = {
44+
{path = 'bucket_id', type = 'unsigned', is_nullable = false},
45+
},
46+
},
47+
}
48+
}
49+
local schema = {
50+
spaces = {
51+
['customers'] = schema_customers,
52+
},
53+
}
54+
if not box.cfg.read_only then
55+
local ok, err = ddl.set_schema(schema)
56+
if not ok then
57+
error(err)
58+
end
59+
end
60+
end,
61+
}
62+
end
63+
64+
local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
65+
advertise_uri = 'localhost:3301',
66+
http_port = 8081,
67+
bucket_count = 3000,
68+
roles = {
69+
'customers-storage',
70+
'cartridge.roles.crud-router',
71+
'cartridge.roles.crud-storage',
72+
},
73+
})
74+
75+
if not ok then
76+
log.error('%s', err)
77+
os.exit(1)
78+
end
79+
80+
_G.is_initialized = cartridge.is_healthy
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fio = require('fio')
4+
local t = require('luatest')
5+
local crud = require('crud')
6+
7+
local helpers = require('test.helper')
8+
9+
local ok, ddl = pcall(require, 'ddl')
10+
if not ok then
11+
t.skip('Please, install ddl rock to run tests')
12+
end
13+
14+
local pgroup = helpers.pgroup.new('ddl_sharding_key', {
15+
engine = {'memtx', 'vinyl'},
16+
})
17+
18+
-- Start a vshard-enabled cluster from the `srv_ddl` entrypoint, once per
-- parameter group, and make sure a DDL schema can be read from it.
pgroup:set_before_all(function(g)
    g.cluster = helpers.Cluster:new({
        datadir = fio.tempdir(),
        server_command = helpers.entrypoint('srv_ddl'),
        use_vshard = true,
        replicasets = helpers.get_test_replicasets(),
        env = {
            -- storage engine under test, read by the entrypoint
            ['ENGINE'] = g.params.engine,
        },
    })
    g.cluster:start()

    local result, err = g.cluster.main_server.net_box:eval([[
        local ddl = require('ddl')

        local ok, err = ddl.get_schema()
        return ok, err
    ]])
    -- Check the error first: on failure `result` is nil and asserting its
    -- type would hide the actual error.
    t.assert_equals(err, nil)
    t.assert_equals(type(result), 'table')
end)
38+
39+
pgroup:set_after_all(function(g) helpers.stop_cluster(g.cluster) end)
40+
41+
pgroup:set_before_each(function(g)
42+
helpers.truncate_space_on_cluster(g.cluster, 'customers')
43+
end)
44+
45+
pgroup:add('test_key', function(g)
46+
local result, err = g.cluster.main_server.net_box:call('crud.insert_object',
47+
{'customers', {id = 33, name = 'Mayakovsky', age = 36}})
48+
t.assert_equals(err, nil)
49+
end)
50+
51+
pgroup:add('test_select', function(g)
52+
t.skip('not implemented')
53+
end)
54+
55+
pgroup:add('test_insert_get', function(g)
56+
-- insert
57+
local result, err = g.cluster.main_server.net_box:call('crud.insert',
58+
{'customers', {2, box.NULL, 'Ivan', 20}})
59+
60+
t.assert_equals(err, nil)
61+
t.assert_equals(result.metadata, {
62+
{name = 'id', type = 'unsigned', is_nullable = false},
63+
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
64+
{name = 'name', type = 'string', is_nullable = false},
65+
{name = 'age', type = 'number', is_nullable = false},
66+
})
67+
t.assert_equals(result.rows, {{2, 401, 'Ivan', 20}})
68+
69+
-- get
70+
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 2})
71+
72+
t.assert_equals(err, nil)
73+
t.assert(result ~= nil)
74+
t.assert_equals(result.rows, {{2, 401, 'Ivan', 20}})
75+
end)
76+
77+
pgroup:add('test_update', function(g)
78+
-- insert tuple
79+
local result, err = g.cluster.main_server.net_box:call(
80+
'crud.insert_object', {'customers', {id = 22, name = 'Leo', age = 72}})
81+
82+
t.assert_equals(err, nil)
83+
t.assert_equals(result.metadata, {
84+
{name = 'id', type = 'unsigned', is_nullable = false},
85+
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
86+
{name = 'name', type = 'string', is_nullable = false},
87+
{name = 'age', type = 'number', is_nullable = false},
88+
})
89+
local objects = crud.unflatten_rows(result.rows, result.metadata)
90+
t.assert_equals(objects, {{id = 22, name = 'Leo', age = 72, bucket_id = 655}})
91+
92+
-- update age and name fields
93+
local result, err = g.cluster.main_server.net_box:call('crud.update', {'customers', 22, {
94+
{'+', 'age', 10},
95+
{'=', 'name', 'Leo Tolstoy'},
96+
}})
97+
98+
t.assert_equals(err, nil)
99+
local objects = crud.unflatten_rows(result.rows, result.metadata)
100+
t.assert_equals(objects, {{id = 22, name = 'Leo Tolstoy', age = 82, bucket_id = 655}})
101+
102+
-- get
103+
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 22})
104+
105+
t.assert_equals(err, nil)
106+
local objects = crud.unflatten_rows(result.rows, result.metadata)
107+
t.assert_equals(objects, {{id = 22, name = 'Leo Tolstoy', age = 82, bucket_id = 655}})
108+
end)
109+
110+
pgroup:add('test_delete', function(g)
111+
-- insert tuple
112+
local result, err = g.cluster.main_server.net_box:call(
113+
'crud.insert_object', {'customers', {id = 33, name = 'Mayakovsky', age = 36}})
114+
115+
t.assert_equals(err, nil)
116+
t.assert_equals(result.metadata, {
117+
{name = 'id', type = 'unsigned', is_nullable = false},
118+
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
119+
{name = 'name', type = 'string', is_nullable = false},
120+
{name = 'age', type = 'number', is_nullable = false},
121+
})
122+
local objects = crud.unflatten_rows(result.rows, result.metadata)
123+
t.assert_equals(objects, {{id = 33, name = 'Mayakovsky', age = 36, bucket_id = 907}})
124+
125+
-- delete
126+
local result, err = g.cluster.main_server.net_box:call('crud.delete', {'customers', 33})
127+
128+
t.assert_equals(err, nil)
129+
if g.params.engine == 'memtx' then
130+
local objects = crud.unflatten_rows(result.rows, result.metadata)
131+
t.assert_equals(objects, {{id = 33, name = 'Mayakovsky', age = 36, bucket_id = 907}})
132+
else
133+
t.assert_equals(#result.rows, 0)
134+
end
135+
136+
-- get
137+
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 33})
138+
139+
t.assert_equals(err, nil)
140+
t.assert_equals(#result.rows, 0)
141+
end)
142+
143+
pgroup:add('test_replace', function(g)
144+
local result, err = g.cluster.main_server.net_box:call(
145+
'crud.replace', {'customers', {45, box.NULL, 'John Fedor', 99}})
146+
147+
t.assert_equals(err, nil)
148+
t.assert_equals(result.metadata, {
149+
{name = 'id', type = 'unsigned', is_nullable = false},
150+
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
151+
{name = 'name', type = 'string', is_nullable = false},
152+
{name = 'age', type = 'number', is_nullable = false},
153+
})
154+
t.assert_equals(result.rows, {{45, 392, 'John Fedor', 99}})
155+
156+
-- replace again
157+
local result, err = g.cluster.main_server.net_box:call(
158+
'crud.replace', {'customers', {45, box.NULL, 'John Fedor', 100}})
159+
160+
t.assert_equals(err, nil)
161+
t.assert_equals(result.rows, {{45, 392, 'John Fedor', 100}})
162+
end)
163+
164+
-- Upsert a tuple that does not exist yet (so it is inserted as is and the
-- update operations are not applied), then read it back.
pgroup:add('test_upsert', function(g)
    -- upsert tuple first time
    local result, err = g.cluster.main_server.net_box:call('crud.upsert',
        {'customers', {67, box.NULL, 'Saltykov-Shchedrin', 63}, {
            {'=', 'name', 'Mikhail Saltykov-Shchedrin'},
        }})

    -- Check the error before touching `result`: on failure `result` is nil
    -- and indexing it would hide the actual error.
    t.assert_equals(err, nil)
    t.assert_equals(#result.rows, 0)
    t.assert_equals(result.metadata, {
        {name = 'id', type = 'unsigned', is_nullable = false},
        {name = 'bucket_id', type = 'unsigned', is_nullable = false},
        {name = 'name', type = 'string', is_nullable = false},
        {name = 'age', type = 'number', is_nullable = false},
    })

    -- get
    local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 67})

    t.assert_equals(err, nil)
    t.assert_equals(result.rows, {{67, 1143, 'Saltykov-Shchedrin', 63}})
end)

0 commit comments

Comments
 (0)