Skip to content

Commit 775aa91

Browse files
internal: rework sharding schema reload
This patch is the groundwork for vshard groups and custom routers support. After this patch, sharding schema reload works per vshard router object. Test runs have shown that this patch do not affects the performance of crud requests. Part of #44
1 parent 0e785e1 commit 775aa91

File tree

13 files changed

+122
-65
lines changed

13 files changed

+122
-65
lines changed

crud/common/sharding/init.lua

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
3232
return { bucket_id = specified_bucket_id }
3333
end
3434

35-
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
35+
local vshard_router = vshard.router.static
36+
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
3637
if err ~= nil then
3738
return nil, err
3839
end
@@ -53,7 +54,8 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
5354
end
5455

5556
local sharding_index_parts = space.index[0].parts
56-
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
57+
local vshard_router = vshard.router.static
58+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name)
5759
if err ~= nil then
5860
return nil, err
5961
end

crud/common/sharding/router_metadata_cache.lua

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,47 @@ local router_metadata_cache = {}
55
router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
66
router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
77
router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map"
8-
router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
9-
router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
10-
router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {}
11-
router_metadata_cache.fetch_lock = fiber.channel(1)
12-
router_metadata_cache.is_part_of_pk = {}
8+
9+
local internal_storage = {}
10+
11+
function router_metadata_cache.get_instance(vshard_router)
12+
local name = vshard_router.name
13+
14+
if internal_storage[name] ~= nil then
15+
return internal_storage[name]
16+
end
17+
18+
internal_storage[name] = {
19+
[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil,
20+
[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil,
21+
[router_metadata_cache.META_HASH_MAP_NAME] = {},
22+
fetch_lock = fiber.channel(1),
23+
is_part_of_pk = {}
24+
}
25+
26+
return internal_storage[name]
27+
end
28+
29+
function router_metadata_cache.drop_instance(vshard_router)
30+
local name = vshard_router.name
31+
32+
if internal_storage[name] == nil then
33+
return
34+
end
35+
36+
if internal_storage[name].fetch_lock ~= nil then
37+
internal_storage[name].fetch_lock:close()
38+
end
39+
40+
internal_storage[name] = nil
41+
end
1342

1443
function router_metadata_cache.drop_caches()
15-
router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
16-
router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
17-
router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {}
18-
if router_metadata_cache.fetch_lock ~= nil then
19-
router_metadata_cache.fetch_lock:close()
44+
for name, _ in pairs(internal_storage) do
45+
router_metadata_cache.drop_instance(name)
2046
end
21-
router_metadata_cache.fetch_lock = fiber.channel(1)
22-
router_metadata_cache.is_part_of_pk = {}
47+
48+
internal_storage = {}
2349
end
2450

2551
return router_metadata_cache

crud/common/sharding/sharding_func.lua

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
local errors = require('errors')
22
local log = require('log')
3+
local vshard = require('vshard')
34

45
local dev_checks = require('crud.common.dev_checks')
5-
local cache = require('crud.common.sharding.router_metadata_cache')
6+
local router_cache = require('crud.common.sharding.router_metadata_cache')
67
local utils = require('crud.common.utils')
78

89
local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})
@@ -106,11 +107,13 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec
106107

107108
local result_err
108109

109-
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
110-
local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME]
110+
local vshard_router = vshard.router.static
111+
local cache = router_cache.get_instance(vshard_router)
112+
cache[router_cache.SHARDING_FUNC_MAP_NAME] = {}
113+
local func_cache = cache[router_cache.SHARDING_FUNC_MAP_NAME]
111114

112-
cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {}
113-
local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME]
115+
cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] = {}
116+
local func_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME]
114117

115118
for space_name, metadata in pairs(metadata_map) do
116119
if metadata.sharding_func_def ~= nil then

crud/common/sharding/sharding_key.lua

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
local errors = require('errors')
22
local log = require('log')
3+
local vshard = require('vshard')
34

45
local dev_checks = require('crud.common.dev_checks')
5-
local cache = require('crud.common.sharding.router_metadata_cache')
6+
local router_cache = require('crud.common.sharding.router_metadata_cache')
67
local utils = require('crud.common.utils')
78

89
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
@@ -30,8 +31,8 @@ local function as_index_object(space_name, space_format, sharding_key_def)
3031
end
3132

3233
-- Make sure sharding key definition is a part of primary key.
33-
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
34-
dev_checks('string', 'table', 'table')
34+
local function is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj)
35+
dev_checks('table', 'string', 'table', 'table')
3536

3637
if cache.is_part_of_pk[space_name] ~= nil then
3738
return cache.is_part_of_pk[space_name]
@@ -83,7 +84,9 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o
8384
return primary_key
8485
end
8586

86-
local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
87+
local vshard_router = vshard.router.static
88+
local cache = router_cache.get_instance(vshard_router)
89+
local res = is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj)
8790
if res == false then
8891
return nil, ShardingKeyError:new(
8992
"Sharding key for space %q is missed in primary index, specify bucket_id",
@@ -102,11 +105,13 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie
102105

103106
local result_err
104107

105-
cache[cache.SHARDING_KEY_MAP_NAME] = {}
106-
local key_cache = cache[cache.SHARDING_KEY_MAP_NAME]
108+
local vshard_router = vshard.router.static
109+
local cache = router_cache.get_instance(vshard_router)
110+
cache[router_cache.SHARDING_KEY_MAP_NAME] = {}
111+
local key_cache = cache[router_cache.SHARDING_KEY_MAP_NAME]
107112

108-
cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {}
109-
local key_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME]
113+
cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] = {}
114+
local key_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME]
110115

111116
for space_name, metadata in pairs(metadata_map) do
112117
if metadata.sharding_key_def ~= nil then

crud/common/sharding/sharding_metadata.lua

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
local fiber = require('fiber')
22
local errors = require('errors')
33
local log = require('log')
4+
local vshard = require('vshard')
45

56
local call = require('crud.common.call')
67
local const = require('crud.common.const')
78
local dev_checks = require('crud.common.dev_checks')
8-
local cache = require('crud.common.sharding.router_metadata_cache')
9+
local router_cache = require('crud.common.sharding.router_metadata_cache')
910
local storage_cache = require('crud.common.sharding.storage_metadata_cache')
1011
local sharding_func = require('crud.common.sharding.sharding_func')
1112
local sharding_key = require('crud.common.sharding.sharding_key')
@@ -22,8 +23,11 @@ local sharding_metadata_module = {}
2223
local function locked(f)
2324
dev_checks('function')
2425

25-
return function(timeout, ...)
26+
return function(timeout, vshard_router, ...)
2627
local timeout_deadline = fiber.clock() + timeout
28+
29+
local cache = router_cache.get_instance(vshard_router)
30+
2731
local ok = cache.fetch_lock:put(true, timeout)
2832
-- channel:put() returns false in two cases: when timeout is exceeded
2933
-- or channel has been closed. However error message describes only
@@ -34,7 +38,7 @@ local function locked(f)
3438
"Timeout for fetching sharding metadata is exceeded")
3539
end
3640
local timeout = timeout_deadline - fiber.clock()
37-
local status, err = pcall(f, timeout, ...)
41+
local status, err = pcall(f, timeout, vshard_router, ...)
3842
cache.fetch_lock:get()
3943
if not status or err ~= nil then
4044
return err
@@ -95,8 +99,10 @@ end
9599
-- cache.fetch_lock become unlocked during timeout passed to
96100
-- _fetch_on_router().
97101
-- metadata_map_name == nil means forced reload.
98-
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
99-
dev_checks('number', 'string', '?string')
102+
local _fetch_on_router = locked(function(timeout, vshard_router, space_name, metadata_map_name)
103+
dev_checks('number', 'table', 'string', '?string')
104+
105+
local cache = router_cache.get_instance(vshard_router)
100106

101107
if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then
102108
return
@@ -109,11 +115,11 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
109115
return err
110116
end
111117
if metadata_map == nil then
112-
cache[cache.SHARDING_KEY_MAP_NAME] = {}
113-
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
114-
cache[cache.META_HASH_MAP_NAME] = {
115-
[cache.SHARDING_KEY_MAP_NAME] = {},
116-
[cache.SHARDING_FUNC_MAP_NAME] = {},
118+
cache[router_cache.SHARDING_KEY_MAP_NAME] = {}
119+
cache[router_cache.SHARDING_FUNC_MAP_NAME] = {}
120+
cache[router_cache.META_HASH_MAP_NAME] = {
121+
[router_cache.SHARDING_KEY_MAP_NAME] = {},
122+
[router_cache.SHARDING_FUNC_MAP_NAME] = {},
117123
}
118124
return
119125
end
@@ -129,24 +135,26 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
129135
end
130136
end)
131137

132-
local function fetch_on_router(space_name, metadata_map_name, timeout)
138+
local function fetch_on_router(vshard_router, space_name, metadata_map_name, timeout)
139+
local cache = router_cache.get_instance(vshard_router)
140+
133141
if cache[metadata_map_name] ~= nil then
134142
return {
135143
value = cache[metadata_map_name][space_name],
136-
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
144+
hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
137145
}
138146
end
139147

140148
local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT
141-
local err = _fetch_on_router(timeout, space_name, metadata_map_name)
149+
local err = _fetch_on_router(timeout, vshard_router, space_name, metadata_map_name)
142150
if err ~= nil then
143151
return nil, err
144152
end
145153

146154
if cache[metadata_map_name] ~= nil then
147155
return {
148156
value = cache[metadata_map_name][space_name],
149-
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
157+
hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
150158
}
151159
end
152160

@@ -163,10 +171,10 @@ end
163171
-- that nil without error is a successfull return value.
164172
-- - nil and error, when something goes wrong on fetching attempt.
165173
--
166-
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
167-
dev_checks('string', '?number')
174+
function sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name, timeout)
175+
dev_checks('table', 'string', '?number')
168176

169-
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
177+
return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_KEY_MAP_NAME, timeout)
170178
end
171179

172180
-- Get sharding func for a certain space.
@@ -178,28 +186,31 @@ end
178186
-- that nil without error is a successfull return value.
179187
-- - nil and error, when something goes wrong on fetching attempt.
180188
--
181-
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
182-
dev_checks('string', '?number')
189+
function sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name, timeout)
190+
dev_checks('table', 'string', '?number')
183191

184-
return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
192+
return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_FUNC_MAP_NAME, timeout)
185193
end
186194

187195
function sharding_metadata_module.update_sharding_key_cache(space_name)
188-
cache.drop_caches()
196+
local vshard_router = vshard.router.static
197+
router_cache.drop_instance(vshard_router.name)
189198

190-
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
199+
return sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
191200
end
192201

193202
function sharding_metadata_module.update_sharding_func_cache(space_name)
194-
cache.drop_caches()
203+
local vshard_router = vshard.router.static
204+
router_cache.drop_instance(vshard_router.name)
195205

196-
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
206+
return sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
197207
end
198208

199209
function sharding_metadata_module.reload_sharding_cache(space_name)
200-
cache.drop_caches()
210+
local vshard_router = vshard.router.static
211+
router_cache.drop_instance(vshard_router.name)
201212

202-
local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil)
213+
local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, vshard_router, space_name, nil)
203214
if err ~= nil then
204215
log.warn('Failed to reload sharding cache: %s', err)
205216
end

crud/count.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
151151

152152
-- We don't need sharding info if bucket_id specified.
153153
if opts.bucket_id == nil then
154-
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
154+
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
155155
if err ~= nil then
156156
return nil, err
157157
end

crud/delete.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ local function call_delete_on_router(space_name, key, opts)
8181
if opts.bucket_id == nil then
8282
local primary_index_parts = space.index[0].parts
8383

84-
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
84+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
8585
if err ~= nil then
8686
return nil, err
8787
end

crud/get.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ local function call_get_on_router(space_name, key, opts)
8484
if opts.bucket_id == nil then
8585
local primary_index_parts = space.index[0].parts
8686

87-
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
87+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
8888
if err ~= nil then
8989
return nil, err
9090
end

crud/select/compat/select.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
6161

6262
-- We don't need sharding info if bucket_id specified.
6363
if opts.bucket_id == nil then
64-
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
64+
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
6565
if err ~= nil then
6666
return nil, err
6767
end

crud/select/compat/select_old.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
123123
local sharding_key_as_index_obj = nil
124124
-- We don't need sharding info if bucket_id specified.
125125
if opts.bucket_id == nil then
126-
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
126+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
127127
if err ~= nil then
128128
return nil, err
129129
end

0 commit comments

Comments
 (0)