Skip to content

Commit a4e76c7

Browse files
committed
Use DDL sharding key in get, update and delete operations
Part of #166
1 parent b26166a commit a4e76c7

File tree

4 files changed

+185
-0
lines changed

4 files changed

+185
-0
lines changed

crud/common/utils.lua

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,34 @@ function utils.get_fieldno_by_name(space_format, field_name)
393393
return field_idx
394394
end
395395

396+
--- Get a map with fieldno of passed field names.
397+
--
398+
-- @function get_keys_fieldno
399+
--
400+
-- @param table space_format
401+
-- A space format
402+
--
403+
-- @param keys
404+
-- A table with field names.
405+
--
406+
-- @return[1] table
407+
-- @treturn[2] nil
408+
-- @treturn[2] table Error description
409+
--
410+
function utils.get_keys_fieldno_map(space_format, field_names)
411+
dev_checks('table', 'table')
412+
local t = {}
413+
for _, field_name in ipairs(field_names) do
414+
local fieldno, err = utils.get_fieldno_by_name(space_format, field_name)
415+
if fieldno == nil then
416+
return {}, err
417+
end
418+
t[fieldno] = true
419+
end
420+
421+
return t
422+
end
423+
396424
local uuid_t = ffi.typeof('struct tt_uuid')
397425
function utils.is_uuid(value)
398426
return ffi.istype(uuid_t, value)
@@ -542,4 +570,24 @@ function utils.flatten_obj_reload(space_name, obj)
542570
return schema.wrap_func_reload(flatten_obj, space_name, obj)
543571
end
544572

573+
function utils.has_value(t, value)
574+
for _, v in ipairs(t) do
575+
if value == v then
576+
return true
577+
end
578+
end
579+
580+
return false
581+
end
582+
583+
function utils.get_index_fieldno_map(index_obj)
584+
local t = {}
585+
for i, part in index_obj.parts do
586+
local fieldno = part[i].fieldno
587+
t[fieldno] = true
588+
end
589+
590+
return t
591+
end
592+
545593
return utils

crud/delete.lua

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,50 @@ local function call_delete_on_router(space_name, key, opts)
5555
key = key:totable()
5656
end
5757

58+
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
59+
if ddl_sharding_key ~= nil then
60+
local sharding_key_fieldno_map, err = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
61+
if next(sharding_key_fieldno_map) == nil then
62+
return nil, err
63+
end
64+
65+
-- Make sure fields used in sharding key are present in primary index
66+
-- TODO: how to invalidate sharding_key_missed_in_index?
67+
if sharding_key_missed_in_index == nil then
68+
local primary_index_fieldno = utils.get_index_fieldno_map(primary_index)
69+
for fieldno in pairs(sharding_key_fieldno_map) do
70+
if primary_index_fieldno[fieldno] == false then
71+
sharding_key_missed_in_index = true
72+
break
73+
end
74+
end
75+
end
76+
if sharding_key_missed_in_index == true and opts.bucket_id == nil then
77+
return nil, GetError:new("Sharding key is missed in primary index, specify bucket_id"), true
78+
end
79+
80+
-- Make sure fields used in primary key are present in sharding key
81+
local updated_key = {}
82+
if type(key) == 'number' then
83+
local fieldno = primary_index.parts[key].fieldno
84+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
85+
else
86+
for i, k in pairs(key) do
87+
local fieldno = primary_index.parts[i].fieldno
88+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
89+
if primary_and_sharding_key_not_matched == nil then
90+
break
91+
end
92+
table.insert(updated_key, k)
93+
end
94+
end
95+
if primary_and_sharding_key_not_matched == nil then
96+
return nil, GetError:new("Primary key is not a part of sharding key"), true
97+
end
98+
99+
key = updated_key
100+
end
101+
58102
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
59103
local call_opts = {
60104
mode = 'write',

crud/get.lua

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ local GetError = errors.new_class('Get', {capture_stack = false})
1212

1313
local get = {}
1414

15+
local sharding_key_missed_in_index
16+
local primary_and_sharding_key_not_matched
17+
1518
local GET_FUNC_NAME = '_crud.get_on_storage'
1619

1720
local function get_on_storage(space_name, key, field_names)
@@ -53,11 +56,57 @@ local function call_get_on_router(space_name, key, opts)
5356
if space == nil then
5457
return nil, GetError:new("Space %q doesn't exist", space_name), true
5558
end
59+
local primary_index = space.index[0]
60+
local space_format = space:format()
5661

5762
if box.tuple.is(key) then
5863
key = key:totable()
5964
end
6065

66+
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
67+
if ddl_sharding_key ~= nil then
68+
local sharding_key_fieldno_map, err = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
69+
if next(sharding_key_fieldno_map) == nil then
70+
return nil, err
71+
end
72+
73+
-- Make sure fields used in sharding key are present in primary index
74+
-- TODO: how to invalidate sharding_key_missed_in_index?
75+
if sharding_key_missed_in_index == nil then
76+
local primary_index_fieldno = utils.get_index_fieldno_map(primary_index)
77+
for fieldno in pairs(sharding_key_fieldno_map) do
78+
if primary_index_fieldno[fieldno] == false then
79+
sharding_key_missed_in_index = true
80+
break
81+
end
82+
end
83+
end
84+
if sharding_key_missed_in_index == true and opts.bucket_id == nil then
85+
return nil, GetError:new("Sharding key is missed in primary index, specify bucket_id"), true
86+
end
87+
88+
-- Make sure fields used in primary key are present in sharding key
89+
local updated_key = {}
90+
if type(key) == 'number' then
91+
local fieldno = primary_index.parts[key].fieldno
92+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
93+
else
94+
for i, k in pairs(key) do
95+
local fieldno = primary_index.parts[i].fieldno
96+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
97+
if primary_and_sharding_key_not_matched == nil then
98+
break
99+
end
100+
table.insert(updated_key, k)
101+
end
102+
end
103+
if primary_and_sharding_key_not_matched == nil then
104+
return nil, GetError:new("Primary key is not a part of sharding key"), true
105+
end
106+
107+
key = updated_key
108+
end
109+
61110
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
62111
local call_opts = {
63112
mode = opts.mode or 'read',

crud/update.lua

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,50 @@ local function call_update_on_router(space_name, key, user_operations, opts)
8383
key = key:totable()
8484
end
8585

86+
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
87+
if ddl_sharding_key ~= nil then
88+
local sharding_key_fieldno_map, err = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
89+
if next(sharding_key_fieldno_map) == nil then
90+
return nil, err
91+
end
92+
93+
-- Make sure fields used in sharding key are present in primary index
94+
-- TODO: how to invalidate sharding_key_missed_in_index?
95+
if sharding_key_missed_in_index == nil then
96+
local primary_index_fieldno = utils.get_index_fieldno_map(primary_index)
97+
for fieldno in pairs(sharding_key_fieldno_map) do
98+
if primary_index_fieldno[fieldno] == false then
99+
sharding_key_missed_in_index = true
100+
break
101+
end
102+
end
103+
end
104+
if sharding_key_missed_in_index == true and opts.bucket_id == nil then
105+
return nil, GetError:new("Sharding key is missed in primary index, specify bucket_id"), true
106+
end
107+
108+
-- Make sure fields used in primary key are present in sharding key
109+
local updated_key = {}
110+
if type(key) == 'number' then
111+
local fieldno = primary_index.parts[key].fieldno
112+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
113+
else
114+
for i, k in pairs(key) do
115+
local fieldno = primary_index.parts[i].fieldno
116+
primary_and_sharding_key_not_matched = sharding_key_fieldno_map[fieldno]
117+
if primary_and_sharding_key_not_matched == nil then
118+
break
119+
end
120+
table.insert(updated_key, k)
121+
end
122+
end
123+
if primary_and_sharding_key_not_matched == nil then
124+
return nil, GetError:new("Primary key is not a part of sharding key"), true
125+
end
126+
127+
key = updated_key
128+
end
129+
86130
local operations = user_operations
87131
if not utils.tarantool_supports_fieldpaths() then
88132
operations, err = utils.convert_operations(user_operations, space_format)

0 commit comments

Comments
 (0)