Skip to content

Commit 7228c75

Browse files
committed
Support custom sharding key in delete, update and get
Part of #166 Reviewed-by: Alexander Turenko <[email protected]> Reviewed-by: Oleg Babin <[email protected]>
1 parent 5abe1c1 commit 7228c75

File tree

9 files changed

+487
-3
lines changed

9 files changed

+487
-3
lines changed

crud/common/sharding_key.lua

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding_key_cache')
88
local utils = require('crud.common.utils')
99

10+
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
1011
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
1112
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})
1213

@@ -151,12 +152,86 @@ function sharding_key_module.fetch_on_router(space_name, timeout)
151152
"Fetching sharding key for space '%s' is failed", space_name)
152153
end
153154

155+
-- Make sure sharding key definition is a part of primary key.
156+
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
157+
dev_checks('string', 'table', 'table')
158+
159+
if cache.is_part_of_pk[space_name] ~= nil then
160+
return cache.is_part_of_pk[space_name]
161+
end
162+
163+
local is_part_of_pk = true
164+
local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
165+
for _, part in ipairs(sharding_key_as_index_obj.parts) do
166+
if pk_fieldno_map[part.fieldno] == nil then
167+
is_part_of_pk = false
168+
break
169+
end
170+
end
171+
cache.is_part_of_pk[space_name] = is_part_of_pk
172+
173+
return is_part_of_pk
174+
end
175+
176+
-- Build an array with sharding key values. Function extracts those values from
177+
-- primary key that are part of sharding key (passed as index object).
178+
local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
179+
dev_checks('table', 'table', 'table')
180+
181+
-- TODO: extract_from_index() calculates primary_index_parts on each
182+
-- request. It is better to cache it's value.
183+
-- https://github.com/tarantool/crud/issues/243
184+
local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
185+
186+
local sharding_key = {}
187+
for _, part in ipairs(sharding_key_as_index_obj.parts) do
188+
-- part_number cannot be nil because earlier we checked that tuple
189+
-- field names defined in sharding key definition are part of primary
190+
-- key.
191+
local part_number = primary_index_fieldno_map[part.fieldno]
192+
assert(part_number ~= nil)
193+
local field_value = primary_key[part_number]
194+
table.insert(sharding_key, field_value)
195+
end
196+
197+
return sharding_key
198+
end
199+
200+
-- Extract sharding key from pk.
201+
-- Returns a table with sharding key or pair of nil and error.
202+
function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout)
203+
dev_checks('string', 'table', '?', '?number')
204+
205+
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout)
206+
if err ~= nil then
207+
return nil, err
208+
end
209+
if sharding_key_as_index_obj == nil then
210+
return primary_key
211+
end
212+
213+
local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
214+
if res == false then
215+
return nil, ShardingKeyError:new(
216+
"Sharding key for space %q is missed in primary index, specify bucket_id",
217+
space_name
218+
)
219+
end
220+
if type(primary_key) ~= 'table' then
221+
primary_key = {primary_key}
222+
end
223+
224+
return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
225+
end
226+
154227
function sharding_key_module.init()
155228
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
156229
end
157230

158231
sharding_key_module.internal = {
159232
as_index_object = as_index_object,
233+
extract_from_index = extract_from_index,
234+
is_part_of_pk = is_part_of_pk,
160235
}
161236

162237
return sharding_key_module

crud/common/sharding_key_cache.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,15 @@ local sharding_key_cache = {}
44

55
sharding_key_cache.sharding_key_as_index_obj_map = nil
66
sharding_key_cache.fetch_lock = fiber.channel(1)
7+
sharding_key_cache.is_part_of_pk = {}
8+
9+
function sharding_key_cache.drop_caches()
10+
sharding_key_cache.sharding_key_as_index_obj_map = nil
11+
if sharding_key_cache.fetch_lock ~= nil then
12+
sharding_key_cache.fetch_lock:close()
13+
end
14+
sharding_key_cache.fetch_lock = fiber.channel(1)
15+
sharding_key_cache.is_part_of_pk = {}
16+
end
717

818
return sharding_key_cache

crud/common/utils.lua

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,20 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
421421
return bucket_id_index.parts[1].fieldno
422422
end
423423

424+
-- Build a map with field number as a keys and part number
425+
-- as a values using index parts as a source.
426+
function utils.get_index_fieldno_map(index_parts)
427+
dev_checks('table')
428+
429+
local fieldno_map = {}
430+
for i, part in ipairs(index_parts) do
431+
local fieldno = part.fieldno
432+
fieldno_map[fieldno] = i
433+
end
434+
435+
return fieldno_map
436+
end
437+
424438
-- Build a map with field names as a keys and fieldno's
425439
-- as a values using space format as a source.
426440
function utils.get_format_fieldno_map(space_format)

crud/delete.lua

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -55,7 +56,17 @@ local function call_delete_on_router(space_name, key, opts)
5556
key = key:totable()
5657
end
5758

58-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
59+
local sharding_key = key
60+
if opts.bucket_id == nil then
61+
local err
62+
local primary_index_parts = space.index[0].parts
63+
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
64+
if err ~= nil then
65+
return nil, err
66+
end
67+
end
68+
69+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
5970
local call_opts = {
6071
mode = 'write',
6172
timeout = opts.timeout,

crud/get.lua

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -58,7 +59,17 @@ local function call_get_on_router(space_name, key, opts)
5859
key = key:totable()
5960
end
6061

61-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
62+
local sharding_key = key
63+
if opts.bucket_id == nil then
64+
local err
65+
local primary_index_parts = space.index[0].parts
66+
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
67+
if err ~= nil then
68+
return nil, err
69+
end
70+
end
71+
72+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
6273
local call_opts = {
6374
mode = opts.mode or 'read',
6475
prefer_replica = opts.prefer_replica,

crud/update.lua

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
8+
local sharding_key_module = require('crud.common.sharding_key')
89
local dev_checks = require('crud.common.dev_checks')
910
local schema = require('crud.common.schema')
1011

@@ -83,6 +84,16 @@ local function call_update_on_router(space_name, key, user_operations, opts)
8384
key = key:totable()
8485
end
8586

87+
local sharding_key = key
88+
if opts.bucket_id == nil then
89+
local err
90+
local primary_index_parts = space.index[0].parts
91+
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
92+
if err ~= nil then
93+
return nil, err
94+
end
95+
end
96+
8697
local operations = user_operations
8798
if not utils.tarantool_supports_fieldpaths() then
8899
operations, err = utils.convert_operations(user_operations, space_format)
@@ -91,7 +102,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
91102
end
92103
end
93104

94-
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
105+
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
95106
local call_opts = {
96107
mode = 'write',
97108
timeout = opts.timeout,

test/entrypoint/srv_ddl.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,18 @@ package.preload['customers-storage'] = function()
9595
table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index)
9696
table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index)
9797

98+
local customers_age_key_schema = table.deepcopy(customers_schema)
99+
customers_age_key_schema.sharding_key = {'age'}
100+
table.insert(customers_age_key_schema.indexes, primary_index)
101+
table.insert(customers_age_key_schema.indexes, bucket_id_index)
102+
98103
local schema = {
99104
spaces = {
100105
customers_name_key = customers_name_key_schema,
101106
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
102107
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
103108
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
109+
customers_age_key = customers_age_key_schema,
104110
}
105111
}
106112

0 commit comments

Comments
 (0)