Skip to content

Commit ec5519a

Browse files
committed
Use sharding keys to calculate bucket id (WIP)
CRUD can automatically calculate `bucket_id` based on the primary key, or `bucket_id` can be specified explicitly [1]. However, it is often required to calculate `bucket_id` using sharding keys created by a DDL schema. The DDL module exposes the space with sharding keys as a part of its public API [2], so anyone can set and get sharding keys there without adding the DDL module to dependencies. This patch allows the `bucket_id` value to be calculated automatically when sharding keys are specified using the DDL module or manually in the `_ddl_sharding_key` space. 1. #46 2. https://github.com/tarantool/ddl#api 3. #46 (comment) Closes #166
1 parent df7138e commit ec5519a

File tree

6 files changed

+350
-8
lines changed

6 files changed

+350
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1616

1717
### Added
1818

19+
* Support calculating `bucket_id` based on `ddl.sharding_key`.
1920
* Added jsonpath indexes support for queries
2021
* `tuple-merger` module updated to 0.0.2
2122

crud/common/sharding.lua

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ local utils = require('crud.common.utils')
77

88
local sharding = {}
99

10+
local sharding_key_cache = nil
11+
1012
function sharding.key_get_bucket_id(key, specified_bucket_id)
1113
if specified_bucket_id ~= nil then
1214
return specified_bucket_id
@@ -20,7 +22,17 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
2022
return specified_bucket_id
2123
end
2224

23-
local key = utils.extract_key(tuple, space.index[0].parts)
25+
local primary_index = space.index[0]
26+
local key
27+
local sharding_key = get_ddl_sharding_key(space.name)
28+
if sharding_key ~= nil then
29+
key = utils.extract_sharding_key(space, tuple, sharding_key)
30+
end
31+
32+
if key == nil then
33+
key = utils.extract_key(tuple, primary_index.parts)
34+
end
35+
2436
return sharding.key_get_bucket_id(key)
2537
end
2638

@@ -51,4 +63,22 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
5163
return bucket_id
5264
end
5365

66+
-- Get the DDL sharding key (a list of field names) of a space.
-- The contents of the `_ddl_sharding_key` space are read once and cached
-- in `sharding_key_cache` to speed up subsequent lookups.
--
-- @param space_name  name of the space to look up; when nil, the whole
--                    cache is returned
-- @return nil when the `_ddl_sharding_key` space does not exist;
--         otherwise the sharding key registered for `space_name`
--         (nil when there is none), or, when `space_name` is nil,
--         a table mapping space names to their sharding keys
--
-- NOTE(review): the cache is filled once and never invalidated here, so
-- later changes of `_ddl_sharding_key` are not picked up -- confirm that
-- this is acceptable.
function sharding.get_ddl_sharding_key(space_name)
    local ddl_space = box.space._ddl_sharding_key
    if ddl_space == nil then
        return nil
    end

    if sharding_key_cache == nil then
        -- select{} returns an array of tuples, which cannot be indexed
        -- by space name; build a name -> sharding key map instead.
        local cache = {}
        for _, tuple in ipairs(ddl_space:select{}) do
            -- presumably tuples are {space_name, sharding_key}, as
            -- created by the ddl module -- verify against its schema
            cache[tuple[1]] = tuple[2]
        end
        sharding_key_cache = cache
    end

    if space_name ~= nil then
        return sharding_key_cache[space_name]
    end

    return sharding_key_cache
end
83+
5484
return sharding

crud/common/utils.lua

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,18 @@ function utils.unflatten(tuple, space_format)
140140
return object
141141
end
142142

143+
-- Extract the sharding key value from a tuple.
--
-- @param space         space object; only `space:format()` is used
-- @param tuple         tuple (or plain array) indexed by field number
-- @param sharding_key  array of field names that form the sharding key
-- @return a dense array with the values of the sharding key fields, in the
--         order the names appear in `sharding_key`; names that are absent
--         from the space format contribute nothing
function utils.extract_sharding_key(space, tuple, sharding_key)
    -- Resolve names to field numbers once instead of rescanning the
    -- sharding key for every format field.
    local fieldno_by_name = {}
    for fieldno, field in ipairs(space:format()) do
        if field.name ~= nil then
            fieldno_by_name[field.name] = fieldno
        end
    end

    -- Append values one after another: storing them at their format
    -- positions would produce a table with holes, on which `#` and
    -- `ipairs` are unreliable.
    local key = {}
    for _, name in ipairs(sharding_key) do
        local fieldno = fieldno_by_name[name]
        if fieldno ~= nil then
            key[#key + 1] = tuple[fieldno]
        end
    end

    return key
end
154+
143155
function utils.extract_key(tuple, key_parts)
144156
local key = {}
145157
for i, part in ipairs(key_parts) do
@@ -375,6 +387,24 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
375387
return bucket_id_index.parts[1].fieldno
376388
end
377389

390+
-- NOTE: it's a more general version of get_bucket_id_fieldno(),
-- but it uses the space's format instead of an index.
--
-- @param space_format  array of field descriptions, each with a `name`
-- @param field_name    name of the field to find ('bucket_id' by default)
-- @return the 1-based position of the field in `space_format`,
--         or nil and a ShardingError when there is no such field
--
-- NOTE(review): `ShardingError` must be in scope in this module; its
-- definition is not visible in this hunk -- confirm.
function utils.get_fieldno_by_name(space_format, field_name)
    field_name = field_name or 'bucket_id'

    for fieldno, field_format in ipairs(space_format) do
        if field_format.name == field_name then
            return fieldno
        end
    end

    return nil, ShardingError:new('%q field is not found', field_name)
end
407+
378408
local uuid_t = ffi.typeof('struct tt_uuid')
379409
function utils.is_uuid(value)
380410
return ffi.istype(uuid_t, value)

crud/select/plan.lua

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local errors = require('errors')
33
local compare_conditions = require('crud.compare.conditions')
44
local utils = require('crud.common.utils')
55
local dev_checks = require('crud.common.dev_checks')
6+
local sharding = require('crud.common.sharding')
67

78
local compat = require('crud.common.compat')
89
local has_keydef = compat.exists('tuple.keydef', 'key_def')
@@ -16,6 +17,7 @@ local select_plan = {}
1617

1718
local IndexTypeError = errors.new_class('IndexTypeError', {capture_stack = false})
1819
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
20+
local DDLError = errors.new_class('DDLError', {capture_stack = false})
1921

2022
local function index_is_allowed(index)
2123
return index.type == 'TREE'
@@ -48,7 +50,7 @@ local function get_index_for_condition(space_indexes, space_format, condition)
4850
end
4951
end
5052

51-
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
53+
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index, space_format)
5254
if #scan_value < #sharding_index.parts then
5355
return nil
5456
end
@@ -62,6 +64,9 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar
6264
scan_value_fields_values[scan_index_part.fieldno] = scan_value[i]
6365
end
6466

67+
-- getting sharding_key specified in DDL schema
68+
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
69+
6570
-- check that sharding key is included in the scan index fields
6671
local sharding_key = {}
6772
for _, sharding_key_part in ipairs(sharding_index.parts) do
@@ -79,7 +84,20 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar
7984
return nil
8085
end
8186

82-
table.insert(sharding_key, field_value)
87+
-- check if a field is a part of DDL sharding key
88+
local is_sharding_key_found = false
89+
if ddl_sharding_key ~= nil then
90+
for i, field_name in ipairs(ddl_sharding_key) do
91+
if fieldno == utils.get_fieldno_by_name(space_format, field_name) then
92+
is_sharding_key_found = true
93+
break
94+
end
95+
end
96+
end
97+
98+
if ddl_sharding_key == nil or is_sharding_key_found == true then
99+
table.insert(sharding_key, field_value)
100+
end
83101
end
84102

85103
return sharding_key
@@ -226,12 +244,10 @@ function select_plan.new(space, conditions, opts)
226244
end
227245
end
228246

229-
local sharding_index = primary_index -- XXX: only sharding by primary key is supported
230-
231-
-- get sharding key value
232-
local sharding_key
247+
-- getting sharding_key used in primary_index
248+
local sharding_index = primary_index
233249
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
234-
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
250+
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index, space_format)
235251
end
236252

237253
if sharding_key ~= nil and opts.force_map_call ~= true then

test/entrypoint/srv_ddl.lua

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/usr/bin/env tarantool
2+
3+
require('strict').on()
4+
_G.is_initialized = function() return false end
5+
6+
local log = require('log')
7+
local errors = require('errors')
8+
local cartridge = require('cartridge')
9+
local ddl = require('ddl')
10+
11+
-- Loader for the `customers-storage` module: returns a role table whose
-- `init` applies the schema of the `customers` space through ddl.
package.preload['customers-storage'] = function()
    -- Build the schema and apply it unless the instance is read-only.
    local function init_role()
        local customers_format = {
            {name = 'id', type = 'unsigned', is_nullable = false},
            {name = 'bucket_id', type = 'unsigned', is_nullable = false},
            {name = 'name', type = 'string', is_nullable = false},
            {name = 'age', type = 'number', is_nullable = false},
        }

        local primary_index = {
            name = 'primary',
            type = 'TREE',
            unique = true,
            parts = {
                {path = 'id', type = 'unsigned', is_nullable = false},
            },
        }

        local bucket_id_index = {
            name = 'bucket_id',
            type = 'TREE',
            unique = false,
            parts = {
                {path = 'bucket_id', type = 'unsigned', is_nullable = false},
            },
        }

        local spaces = {
            customers = {
                is_local = true,
                -- storage engine comes from the ENGINE environment variable
                engine = os.getenv('ENGINE') or 'memtx',
                temporary = false,
                sharding_key = {'name', 'age'},
                format = customers_format,
                indexes = {primary_index, bucket_id_index},
            },
        }

        if box.cfg.read_only then
            return
        end

        local applied, apply_err = ddl.set_schema({spaces = spaces})
        if not applied then
            error(apply_err)
        end
    end

    return {
        role_name = 'customers-storage',
        init = init_role,
    }
end
63+
64+
-- Options passed to cartridge.cfg: instance addresses, bucket count and
-- the roles to register.
local cfg_options = {
    advertise_uri = 'localhost:3301',
    http_port = 8081,
    bucket_count = 3000,
    roles = {
        'customers-storage',
        'cartridge.roles.crud-router',
        'cartridge.roles.crud-storage',
    },
}

-- Run cartridge.cfg under errors.pcall so that a failure can be logged
-- before the process exits with a non-zero status.
local configured, cfg_err = errors.pcall('CartridgeCfgError', cartridge.cfg, cfg_options)

if not configured then
    log.error('%s', cfg_err)
    os.exit(1)
end

-- Replace the stub installed at startup with the real health check.
_G.is_initialized = cartridge.is_healthy

0 commit comments

Comments
 (0)