Skip to content

Commit fec0de8

Browse files
committed
Support custom sharding key in delete, update and get
Part of #166
1 parent e4c08e8 commit fec0de8

File tree

9 files changed

+581
-3
lines changed

9 files changed

+581
-3
lines changed

crud/common/sharding_key.lua

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding_key_cache')
88
local utils = require('crud.common.utils')
99

10+
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
1011
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
1112
local GetShardingKeyFieldnosError = errors.new_class('GetShardingKeyFieldnosError', {capture_stack = false})
1213

@@ -115,6 +116,87 @@ function sharding_key_module.fetch_on_router(space_name)
115116
return cache.sharding_key_def
116117
end
117118

119+
-- Make sure sharding key definition is a part of primary key.
120+
function sharding_key_module.is_part_of_pk(space_name, space_format,
121+
primary_index, sharding_key_def)
122+
dev_checks('string', 'table', 'table', 'table')
123+
124+
if cache.as_index_object[space_name] == nil then
125+
cache.as_index_object[space_name] =
126+
sharding_key_module.as_index_object(space_format, space_name,
127+
sharding_key_def)
128+
end
129+
local sharding_key_as_index_obj = cache.as_index_object[space_name]
130+
131+
if cache.is_part_of_pk[space_name] ~= nil then
132+
return cache.is_part_of_pk[space_name]
133+
end
134+
135+
local is_part_of_pk = true
136+
local pk_fieldno_map = utils.get_index_fieldno_map(primary_index.parts)
137+
for _, part in ipairs(sharding_key_as_index_obj.parts) do
138+
if pk_fieldno_map[part.fieldno] == nil then
139+
is_part_of_pk = false
140+
break
141+
end
142+
end
143+
cache.is_part_of_pk[space_name] = is_part_of_pk
144+
145+
return is_part_of_pk
146+
end
147+
148+
-- Build an array with sharding key values.
149+
function sharding_key_module.extract_from_index(sharding_key_def,
150+
index_parts,
151+
sharding_key_fieldno_map)
152+
dev_checks('table', 'table', 'table')
153+
154+
local sharding_key = {}
155+
for i, k in ipairs(sharding_key_def) do
156+
local fieldno = index_parts[i].fieldno
157+
if sharding_key_fieldno_map[fieldno] == true then
158+
table.insert(sharding_key, k)
159+
end
160+
end
161+
162+
return sharding_key
163+
end
164+
165+
-- Extract sharding key from pk.
166+
-- Returns a table with sharding key or pair of nil and error.
167+
function sharding_key_module.extract_from_pk(space_obj, sharding_key)
168+
dev_checks('table', '?')
169+
170+
local space_name = space_obj.name
171+
local sharding_key_def, err = sharding_key_module.fetch_on_router(space_name)
172+
if err ~= nil then
173+
return nil, err
174+
end
175+
if sharding_key_def == nil then
176+
return sharding_key
177+
end
178+
179+
local primary_index = space_obj.index[0]
180+
local space_format = space_obj:format()
181+
182+
local is_part_of_pk = sharding_key_module.is_part_of_pk(space_name, space_format,
183+
primary_index, sharding_key_def)
184+
if is_part_of_pk == false then
185+
return nil, ShardingKeyError:new(
186+
"Sharding key for space %q is missed in primary index, specify bucket_id",
187+
space_name
188+
)
189+
end
190+
if type(sharding_key) ~= 'table' then
191+
sharding_key = {sharding_key}
192+
end
193+
local sharding_key_as_index_obj = cache.as_index_object[space_name]
194+
local sharding_key_fieldno_map = utils.get_index_fieldno_map(sharding_key_as_index_obj.parts)
195+
196+
return sharding_key_module.extract_from_index(sharding_key, primary_index.parts,
197+
sharding_key_fieldno_map)
198+
end
199+
118200
function sharding_key_module.init()
119201
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
120202
end

crud/common/sharding_key_cache.lua

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
local sharding_key_cache = {}
22

33
sharding_key_cache.sharding_key_def = nil
4+
sharding_key_cache.is_part_of_pk = {}
5+
sharding_key_cache.as_index_object = {}
6+
7+
function sharding_key_cache.drop_caches()
8+
sharding_key_cache.sharding_key_def = nil
9+
sharding_key_cache.is_part_of_pk = {}
10+
sharding_key_cache.as_index_object = {}
11+
end
412

513
return sharding_key_cache

crud/common/utils.lua

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,18 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
375375
return bucket_id_index.parts[1].fieldno
376376
end
377377

378+
function utils.get_index_fieldno_map(index_parts)
379+
dev_checks('table')
380+
381+
local fieldno_map = {}
382+
for _, part in ipairs(index_parts) do
383+
local fieldno = part.fieldno
384+
fieldno_map[fieldno] = true
385+
end
386+
387+
return fieldno_map
388+
end
389+
378390
-- Build a map with field names as a keys and fieldno's
379391
-- as a values using space format as a source.
380392
function utils.get_format_fieldno_map(space_format)

crud/delete.lua

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -55,7 +56,16 @@ local function call_delete_on_router(space_name, key, opts)
5556
key = key:totable()
5657
end
5758

58-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
59+
local sharding_key = key
60+
if opts.bucket_id == nil then
61+
local err
62+
sharding_key, err = sharding_key_module.extract_from_pk(space, key)
63+
if err ~= nil then
64+
return nil, err
65+
end
66+
end
67+
68+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
5969
local call_opts = {
6070
mode = 'write',
6171
timeout = opts.timeout,

crud/get.lua

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -58,7 +59,16 @@ local function call_get_on_router(space_name, key, opts)
5859
key = key:totable()
5960
end
6061

61-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
62+
local sharding_key = key
63+
if opts.bucket_id == nil then
64+
local err
65+
sharding_key, err = sharding_key_module.extract_from_pk(space, key)
66+
if err ~= nil then
67+
return nil, err
68+
end
69+
end
70+
71+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
6272
local call_opts = {
6373
mode = opts.mode or 'read',
6474
prefer_replica = opts.prefer_replica,

crud/update.lua

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -83,6 +84,15 @@ local function call_update_on_router(space_name, key, user_operations, opts)
8384
key = key:totable()
8485
end
8586

87+
local sharding_key = key
88+
if opts.bucket_id == nil then
89+
local err
90+
sharding_key, err = sharding_key_module.extract_from_pk(space, key)
91+
if err ~= nil then
92+
return nil, err
93+
end
94+
end
95+
8696
local operations = user_operations
8797
if not utils.tarantool_supports_fieldpaths() then
8898
operations, err = utils.convert_operations(user_operations, space_format)
@@ -91,7 +101,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
91101
end
92102
end
93103

94-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
104+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
95105
local call_opts = {
96106
mode = 'write',
97107
timeout = opts.timeout,

test/entrypoint/srv_ddl.lua

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ package.preload['customers-storage'] = function()
4545
sharding_key = {'name'},
4646
}
4747

48+
local customers_age_key_schema = table.deepcopy(customers_name_key_schema)
49+
customers_age_key_schema.sharding_key = {'age'}
4850
local name_index = {
4951
name = 'name',
5052
type = 'TREE',
@@ -91,6 +93,7 @@ package.preload['customers-storage'] = function()
9193
customers_name_key = customers_name_key_schema,
9294
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
9395
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
96+
customers_age_key = customers_age_key_schema,
9497
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
9598
}
9699
}
@@ -101,6 +104,11 @@ package.preload['customers-storage'] = function()
101104
error(err)
102105
end
103106
end
107+
108+
rawset(_G, 'set_sharding_key', function(space_name, sharding_key_def)
109+
local fieldno_sharding_key = 2
110+
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
111+
end)
104112
end,
105113
}
106114
end

0 commit comments

Comments
 (0)