Skip to content

Commit 4b3482a

Browse files
committed
Support custom sharding key in replace, insert, upsert
Part of #166 Reviewed-by: Alexander Turenko <[email protected]> Reviewed-by: Oleg Babin <[email protected]>
1 parent fc7bc57 commit 4b3482a

File tree

9 files changed

+647
-4
lines changed

9 files changed

+647
-4
lines changed

crud.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ local select = require('crud.select')
1212
local truncate = require('crud.truncate')
1313
local len = require('crud.len')
1414
local borders = require('crud.borders')
15+
local sharding_key = require('crud.common.sharding_key')
1516
local utils = require('crud.common.utils')
1617

1718
local crud = {}
@@ -113,6 +114,7 @@ function crud.init_storage()
113114
truncate.init()
114115
len.init()
115116
borders.init()
117+
sharding_key.init()
116118
end
117119

118120
function crud.init_router()

crud/common/const.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ local const = {}
22

33
const.RELOAD_RETRIES_NUM = 1
44
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
5+
const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds
56

67
return const

crud/common/sharding.lua

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7+
local sharding_key_module = require('crud.common.sharding_key')
78

89
local sharding = {}
910

@@ -20,7 +21,16 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
2021
return specified_bucket_id
2122
end
2223

23-
local key = utils.extract_key(tuple, space.index[0].parts)
24+
local sharding_index_parts = space.index[0].parts
25+
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
26+
if err ~= nil then
27+
return nil, err
28+
end
29+
if sharding_key_as_index_obj ~= nil then
30+
sharding_index_parts = sharding_key_as_index_obj.parts
31+
end
32+
local key = utils.extract_key(tuple, sharding_index_parts)
33+
2434
return sharding.key_get_bucket_id(key)
2535
end
2636

@@ -43,11 +53,15 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
4353
end
4454
end
4555

46-
if tuple[bucket_id_fieldno] == nil then
47-
tuple[bucket_id_fieldno] = sharding.tuple_get_bucket_id(tuple, space)
56+
local bucket_id = tuple[bucket_id_fieldno]
57+
if bucket_id == nil then
58+
bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
59+
if err ~= nil then
60+
return nil, err
61+
end
62+
tuple[bucket_id_fieldno] = bucket_id
4863
end
4964

50-
local bucket_id = tuple[bucket_id_fieldno]
5165
return bucket_id
5266
end
5367

crud/common/sharding_key.lua

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
local fiber = require('fiber')
2+
local errors = require('errors')
3+
4+
local call = require('crud.common.call')
5+
local const = require('crud.common.const')
6+
local dev_checks = require('crud.common.dev_checks')
7+
local cache = require('crud.common.sharding_key_cache')
8+
local utils = require('crud.common.utils')
9+
10+
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
11+
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})
12+
13+
local FETCH_FUNC_NAME = '_crud.fetch_on_storage'
14+
15+
local sharding_key_module = {}
16+
17+
-- Function decorator that is used to prevent _fetch_on_router() from being
18+
-- called concurrently by different fibers.
19+
local function locked(f)
20+
dev_checks('function')
21+
22+
return function(timeout, ...)
23+
local timeout_deadline = fiber.clock() + timeout
24+
local ok = cache.fetch_lock:put(true, timeout)
25+
-- channel:put() returns false in two cases: when timeout is exceeded
26+
-- or channel has been closed. However error message describes only
27+
-- first reason, I'm not sure we need to disclose to users such details
28+
-- like problems with synchronization objects.
29+
if not ok then
30+
return FetchShardingKeyError:new(
31+
"Timeout for fetching sharding key is exceeded")
32+
end
33+
local timeout = timeout_deadline - fiber.clock()
34+
local status, err = pcall(f, timeout, ...)
35+
cache.fetch_lock:get()
36+
if not status or err ~= nil then
37+
return err
38+
end
39+
end
40+
end
41+
42+
-- Build a structure similar to index, but it is not a real index object,
43+
-- it contains only parts key with fieldno's.
44+
local function as_index_object(space_name, space_format, sharding_key_def)
45+
dev_checks('string', 'table', 'table')
46+
47+
local fieldnos = {}
48+
local fieldno_map = utils.get_format_fieldno_map(space_format)
49+
for _, field_name in ipairs(sharding_key_def) do
50+
local fieldno = fieldno_map[field_name]
51+
if fieldno == nil then
52+
return nil, WrongShardingConfigurationError:new(
53+
"No such field (%s) in a space format (%s)", field_name, space_name)
54+
end
55+
table.insert(fieldnos, {fieldno = fieldno})
56+
end
57+
58+
return {parts = fieldnos}
59+
end
60+
61+
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
62+
-- not available on storage.
63+
function sharding_key_module.fetch_on_storage()
64+
local sharding_key_space = box.space._ddl_sharding_key
65+
if sharding_key_space == nil then
66+
return nil
67+
end
68+
69+
local SPACE_NAME_FIELDNO = 1
70+
local SPACE_SHARDING_KEY_FIELDNO = 2
71+
local metadata_map = {}
72+
for _, tuple in sharding_key_space:pairs() do
73+
local space_name = tuple[SPACE_NAME_FIELDNO]
74+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
75+
local space_format = box.space[space_name]:format()
76+
metadata_map[space_name] = {
77+
sharding_key_def = sharding_key_def,
78+
space_format = space_format,
79+
}
80+
end
81+
82+
return metadata_map
83+
end
84+
85+
-- Under high load we may get a case when more than one fiber will fetch
86+
-- metadata from storages. It is not good from performance point of view.
87+
-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
88+
-- a sharding metadata by a single one, other fibers will wait while
89+
-- cache.fetch_lock become unlocked during timeout passed to
90+
-- _fetch_on_router().
91+
local _fetch_on_router = locked(function(timeout)
92+
dev_checks('number')
93+
94+
if cache.sharding_key_as_index_obj_map ~= nil then
95+
return
96+
end
97+
98+
local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, {
99+
timeout = timeout
100+
})
101+
if err ~= nil then
102+
return err
103+
end
104+
if metadata_map == nil then
105+
cache.sharding_key_as_index_obj_map = {}
106+
return
107+
end
108+
109+
cache.sharding_key_as_index_obj_map = {}
110+
for space_name, metadata in pairs(metadata_map) do
111+
local sharding_key_as_index_obj, err = as_index_object(space_name,
112+
metadata.space_format,
113+
metadata.sharding_key_def)
114+
if err ~= nil then
115+
return err
116+
end
117+
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
118+
end
119+
end)
120+
121+
-- Get sharding index for a certain space.
122+
--
123+
-- Return:
124+
-- - sharding key as index object, when sharding key definition found on
125+
-- storage.
126+
-- - nil, when sharding key definition was not found on storage. Pay attention
127+
-- that nil without error is a successfull return value.
128+
-- - nil and error, when something goes wrong on fetching attempt.
129+
--
130+
function sharding_key_module.fetch_on_router(space_name, timeout)
131+
dev_checks('string', '?number')
132+
133+
if cache.sharding_key_as_index_obj_map ~= nil then
134+
return cache.sharding_key_as_index_obj_map[space_name]
135+
end
136+
137+
local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
138+
local err = _fetch_on_router(timeout)
139+
if err ~= nil then
140+
if cache.sharding_key_as_index_obj_map ~= nil then
141+
return cache.sharding_key_as_index_obj_map[space_name]
142+
end
143+
return nil, err
144+
end
145+
146+
if cache.sharding_key_as_index_obj_map ~= nil then
147+
return cache.sharding_key_as_index_obj_map[space_name]
148+
end
149+
150+
return nil, FetchShardingKeyError:new(
151+
"Fetching sharding key for space '%s' is failed", space_name)
152+
end
153+
154+
function sharding_key_module.init()
155+
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
156+
end
157+
158+
sharding_key_module.internal = {
159+
as_index_object = as_index_object,
160+
}
161+
162+
return sharding_key_module

crud/common/sharding_key_cache.lua

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
local fiber = require('fiber')
2+
3+
local sharding_key_cache = {}
4+
5+
sharding_key_cache.sharding_key_as_index_obj_map = nil
6+
sharding_key_cache.fetch_lock = fiber.channel(1)
7+
8+
return sharding_key_cache

crud/common/utils.lua

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,19 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
421421
return bucket_id_index.parts[1].fieldno
422422
end
423423

424+
-- Build a map with field names as a keys and fieldno's
425+
-- as a values using space format as a source.
426+
function utils.get_format_fieldno_map(space_format)
427+
dev_checks('table')
428+
429+
local fieldno_map = {}
430+
for fieldno, field_format in ipairs(space_format) do
431+
fieldno_map[field_format.name] = fieldno
432+
end
433+
434+
return fieldno_map
435+
end
436+
424437
local uuid_t = ffi.typeof('struct tt_uuid')
425438
function utils.is_uuid(value)
426439
return ffi.istype(uuid_t, value)

test/entrypoint/srv_ddl.lua

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#!/usr/bin/env tarantool
2+
3+
require('strict').on()
4+
_G.is_initialized = function() return false end
5+
6+
local log = require('log')
7+
local errors = require('errors')
8+
local cartridge = require('cartridge')
9+
local ddl = require('ddl')
10+
11+
package.preload['customers-storage'] = function()
12+
return {
13+
role_name = 'customers-storage',
14+
init = function()
15+
local engine = os.getenv('ENGINE') or 'memtx'
16+
local customers_schema = {
17+
engine = engine,
18+
is_local = true,
19+
temporary = false,
20+
format = {
21+
{name = 'id', is_nullable = false, type = 'unsigned'},
22+
{name = 'bucket_id', is_nullable = false, type = 'unsigned'},
23+
{name = 'name', is_nullable = false, type = 'string'},
24+
{name = 'age', is_nullable = false, type = 'number'},
25+
},
26+
indexes = {
27+
-- This table is intentionally blank.
28+
},
29+
}
30+
31+
local primary_index = {
32+
name = 'id',
33+
type = 'TREE',
34+
unique = true,
35+
parts = {
36+
{path = 'id', is_nullable = false, type = 'unsigned'},
37+
{path = 'name', is_nullable = false, type = 'string'},
38+
},
39+
}
40+
local bucket_id_index = {
41+
name = 'bucket_id',
42+
type = 'TREE',
43+
unique = false,
44+
parts = {
45+
{path = 'bucket_id', is_nullable = false, type = 'unsigned'},
46+
}
47+
}
48+
49+
local customers_name_key_schema = table.deepcopy(customers_schema)
50+
customers_name_key_schema.sharding_key = {'name'}
51+
table.insert(customers_name_key_schema.indexes, primary_index)
52+
table.insert(customers_name_key_schema.indexes, bucket_id_index)
53+
54+
local schema = {
55+
spaces = {
56+
customers_name_key = customers_name_key_schema,
57+
}
58+
}
59+
60+
if not box.info.ro then
61+
local ok, err = ddl.set_schema(schema)
62+
if not ok then
63+
error(err)
64+
end
65+
end
66+
67+
rawset(_G, 'set_sharding_key', function(space_name, sharding_key_def)
68+
local fieldno_sharding_key = 2
69+
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
70+
end)
71+
end,
72+
}
73+
end
74+
75+
local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
76+
advertise_uri = 'localhost:3301',
77+
http_port = 8081,
78+
bucket_count = 3000,
79+
roles = {
80+
'customers-storage',
81+
'cartridge.roles.crud-router',
82+
'cartridge.roles.crud-storage',
83+
},
84+
})
85+
86+
if not ok then
87+
log.error('%s', err)
88+
os.exit(1)
89+
end
90+
91+
_G.is_initialized = cartridge.is_healthy

0 commit comments

Comments
 (0)