Skip to content

Commit 37c7f4b

Browse files
internal: use single router object
This patch is the groundwork for vshard groups and custom routers support. After this patch, vshard router object is retrieved only in the single point of a request. (Except for name resolving in statistics.) Test runs have shown that this patch do not affects the performance of crud requests. Part of #44
1 parent 775aa91 commit 37c7f4b

32 files changed

+306
-221
lines changed

crud/borders.lua

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,14 @@ else
6767
end
6868
end
6969

70-
local function call_get_border_on_router(border_name, space_name, index_name, opts)
71-
checks('string', 'string', '?string|number', {
70+
local function call_get_border_on_router(vshard_router, border_name, space_name, index_name, opts)
71+
checks('table', 'string', 'string', '?string|number', {
7272
timeout = '?number',
7373
fields = '?table',
7474
})
7575

7676
opts = opts or {}
7777

78-
local vshard_router = vshard.router.static
7978
local replicasets = vshard_router:routeall()
8079
local space = utils.get_space(space_name, replicasets)
8180
if space == nil then
@@ -105,7 +104,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
105104
replicasets = replicasets,
106105
timeout = opts.timeout,
107106
}
108-
local results, err = call.map(
107+
local results, err = call.map(vshard_router,
109108
STAT_FUNC_NAME,
110109
{border_name, space_name, index.id, field_names},
111110
call_opts
@@ -161,8 +160,10 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
161160
end
162161

163162
local function get_border(border_name, space_name, index_name, opts)
164-
return schema.wrap_func_reload(
165-
call_get_border_on_router, border_name, space_name, index_name, opts
163+
local vshard_router = vshard.router.static
164+
165+
return schema.wrap_func_reload(vshard_router, call_get_border_on_router,
166+
border_name, space_name, index_name, opts
166167
)
167168
end
168169

crud/common/call.lua

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
local vshard = require('vshard')
21
local errors = require('errors')
32

43
local dev_checks = require('crud.common.dev_checks')
@@ -41,14 +40,13 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
4140
return 'callbre'
4241
end
4342

44-
local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
43+
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id)
4544
-- Do not rewrite ShardingHashMismatchError class.
4645
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
4746
return errors.wrap(err)
4847
end
4948

5049
if replicaset_uuid == nil then
51-
local vshard_router = vshard.router.static
5250
local replicaset, _ = vshard_router:route(bucket_id)
5351
if replicaset == nil then
5452
return CallError:new(
@@ -67,8 +65,8 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
6765
))
6866
end
6967

70-
function call.map(func_name, func_args, opts)
71-
dev_checks('string', '?table', {
68+
function call.map(vshard_router, func_name, func_args, opts)
69+
dev_checks('table', 'string', '?table', {
7270
mode = 'string',
7371
prefer_replica = '?boolean',
7472
balance = '?boolean',
@@ -88,7 +86,11 @@ function call.map(func_name, func_args, opts)
8886

8987
local iter = opts.iter
9088
if iter == nil then
91-
iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets})
89+
iter, err = BaseIterator:new({
90+
func_args = func_args,
91+
replicasets = opts.replicasets,
92+
vshard_router = vshard_router,
93+
})
9294
if err ~= nil then
9395
return nil, err
9496
end
@@ -136,8 +138,8 @@ function call.map(func_name, func_args, opts)
136138
return postprocessor:get()
137139
end
138140

139-
function call.single(bucket_id, func_name, func_args, opts)
140-
dev_checks('number', 'string', '?table', {
141+
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
142+
dev_checks('table', 'number', 'string', '?table', {
141143
mode = 'string',
142144
prefer_replica = '?boolean',
143145
balance = '?boolean',
@@ -151,13 +153,12 @@ function call.single(bucket_id, func_name, func_args, opts)
151153

152154
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
153155

154-
local vshard_router = vshard.router.static
155156
local res, err = vshard_router[vshard_call_name](vshard_router, bucket_id, func_name, func_args, {
156157
timeout = timeout,
157158
})
158159

159160
if err ~= nil then
160-
return nil, wrap_vshard_err(err, func_name, nil, bucket_id)
161+
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
161162
end
162163

163164
if res == box.NULL then
@@ -167,25 +168,24 @@ function call.single(bucket_id, func_name, func_args, opts)
167168
return res
168169
end
169170

170-
function call.any(func_name, func_args, opts)
171-
dev_checks('string', '?table', {
171+
function call.any(vshard_router, func_name, func_args, opts)
172+
dev_checks('table', 'string', '?table', {
172173
timeout = '?number',
173174
})
174175

175176
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
176177

177-
local vshard_router = vshard.router.static
178178
local replicasets, err = vshard_router:routeall()
179179
if replicasets == nil then
180-
return nil, CallError:new("Failed to get all replicasets: %s", err.err)
180+
return nil, CallError:new("Failed to get router replicasets: %s", err.err)
181181
end
182182
local replicaset = select(2, next(replicasets))
183183

184184
local res, err = replicaset:call(func_name, func_args, {
185185
timeout = timeout,
186186
})
187187
if err ~= nil then
188-
return nil, wrap_vshard_err(err, func_name, replicaset.uuid)
188+
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid)
189189
end
190190

191191
if res == box.NULL then

crud/common/map_call_cases/base_iter.lua

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
local errors = require('errors')
2-
local vshard = require('vshard')
32

43
local dev_checks = require('crud.common.dev_checks')
54
local GetReplicasetsError = errors.new_class('GetReplicasetsError')
@@ -24,14 +23,14 @@ function BaseIterator:new(opts)
2423
dev_checks('table', {
2524
func_args = '?table',
2625
replicasets = '?table',
26+
vshard_router = 'table',
2727
})
2828

2929
local replicasets, err
3030
if opts.replicasets ~= nil then
3131
replicasets = opts.replicasets
3232
else
33-
local vshard_router = vshard.router.static
34-
replicasets, err = vshard_router:routeall()
33+
replicasets, err = opts.vshard_router:routeall()
3534
if err ~= nil then
3635
return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err)
3736
end

crud/common/map_call_cases/base_postprocessor.lua

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ local BasePostprocessor = {}
77
-- @function new
88
--
99
-- @return[1] table postprocessor
10-
function BasePostprocessor:new()
10+
function BasePostprocessor:new(vshard_router)
1111
local postprocessor = {
1212
results = {},
1313
early_exit = false,
14-
errs = nil
14+
errs = nil,
15+
vshard_router = vshard_router,
1516
}
1617

1718
setmetatable(postprocessor, self)
@@ -58,7 +59,7 @@ function BasePostprocessor:collect(result_info, err_info)
5859

5960
if err ~= nil then
6061
self.results = nil
61-
self.errs = err_info.err_wrapper(err, unpack(err_info.wrapper_args))
62+
self.errs = err_info.err_wrapper(self.vshard_router, err, unpack(err_info.wrapper_args))
6263
self.early_exit = true
6364

6465
return self.early_exit

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ function BatchInsertIterator:new(opts)
3232
tuples = 'table',
3333
space = 'table',
3434
execute_on_storage_opts = 'table',
35+
vshard_router = 'table',
3536
})
3637

37-
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space)
38+
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.vshard_router, opts.tuples, opts.space)
3839
if err ~= nil then
3940
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
4041
end

crud/common/map_call_cases/batch_postprocessor.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ function BatchPostprocessor:collect(result_info, err_info)
5151
err_to_wrap = err.err
5252
end
5353

54-
local err_obj = err_info.err_wrapper(err_to_wrap, unpack(err_info.wrapper_args))
54+
local err_obj = err_info.err_wrapper(self.vshard_router, err_to_wrap, unpack(err_info.wrapper_args))
5555
err_obj.operation_data = err.operation_data
5656
err_obj.space_schema_hash = err.space_schema_hash
5757

crud/common/map_call_cases/batch_upsert_iter.lua

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ function BatchUpsertIterator:new(opts)
3535
space = 'table',
3636
operations = 'table',
3737
execute_on_storage_opts = 'table',
38+
vshard_router = 'table',
3839
})
3940

40-
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space, {
41-
operations = opts.operations,
42-
})
41+
local sharding_data, err = sharding.split_tuples_by_replicaset(
42+
opts.vshard_router,
43+
opts.tuples,
44+
opts.space,
45+
{operations = opts.operations})
46+
4347
if err ~= nil then
4448
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
4549
end

crud/common/schema.lua

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
local fiber = require('fiber')
22
local msgpack = require('msgpack')
33
local digest = require('digest')
4-
local vshard = require('vshard')
54
local errors = require('errors')
65
local log = require('log')
76

@@ -86,18 +85,17 @@ end
8685
-- This wrapper is used for functions that can fail if router uses outdated
8786
-- space schema. In case of such errors these functions returns `need_reload`
8887
-- for schema-dependent errors.
89-
function schema.wrap_func_reload(func, ...)
88+
function schema.wrap_func_reload(vshard_router, func, ...)
9089
local i = 0
9190

9291
local res, err, need_reload
9392
while true do
94-
res, err, need_reload = func(...)
93+
res, err, need_reload = func(vshard_router, ...)
9594

9695
if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
9796
break
9897
end
9998

100-
local vshard_router = vshard.router.static
10199
local ok, reload_schema_err = reload_schema(vshard_router)
102100
if not ok then
103101
log.warn("Failed to reload schema: %s", reload_schema_err)

crud/common/sharding/init.lua

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
local vshard = require('vshard')
21
local errors = require('errors')
32

43
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
@@ -13,8 +12,7 @@ local sharding_utils = require('crud.common.sharding.utils')
1312

1413
local sharding = {}
1514

16-
function sharding.get_replicasets_by_bucket_id(bucket_id)
17-
local vshard_router = vshard.router.static
15+
function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id)
1816
local replicaset, err = vshard_router:route(bucket_id)
1917
if replicaset == nil then
2018
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
@@ -25,14 +23,13 @@ function sharding.get_replicasets_by_bucket_id(bucket_id)
2523
}
2624
end
2725

28-
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
29-
dev_checks('string', '?', '?number|cdata')
26+
function sharding.key_get_bucket_id(vshard_router, space_name, key, specified_bucket_id)
27+
dev_checks('table', 'string', '?', '?number|cdata')
3028

3129
if specified_bucket_id ~= nil then
3230
return { bucket_id = specified_bucket_id }
3331
end
3432

35-
local vshard_router = vshard.router.static
3633
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
3734
if err ~= nil then
3835
return nil, err
@@ -48,13 +45,12 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
4845
return { bucket_id = vshard_router:bucket_id_strcrc32(key) }
4946
end
5047

51-
function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
48+
function sharding.tuple_get_bucket_id(vshard_router, tuple, space, specified_bucket_id)
5249
if specified_bucket_id ~= nil then
5350
return { bucket_id = specified_bucket_id }
5451
end
5552

5653
local sharding_index_parts = space.index[0].parts
57-
local vshard_router = vshard.router.static
5854
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name)
5955
if err ~= nil then
6056
return nil, err
@@ -64,7 +60,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
6460
end
6561
local key = utils.extract_key(tuple, sharding_index_parts)
6662

67-
local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil)
63+
local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space.name, key, nil)
6864
if err ~= nil then
6965
return nil, err
7066
end
@@ -76,7 +72,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
7672
}
7773
end
7874

79-
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
75+
function sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, specified_bucket_id)
8076
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
8177
if err ~= nil then
8278
return nil, BucketIDError:new("Failed to get bucket ID fieldno: %s", err)
@@ -98,7 +94,7 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
9894
local sharding_data = { bucket_id = tuple[bucket_id_fieldno] }
9995

10096
if sharding_data.bucket_id == nil then
101-
sharding_data, err = sharding.tuple_get_bucket_id(tuple, space)
97+
sharding_data, err = sharding.tuple_get_bucket_id(vshard_router, tuple, space)
10298
if err ~= nil then
10399
return nil, err
104100
end
@@ -144,18 +140,18 @@ function sharding.batching_result_needs_sharding_reload(errs, tuples_count)
144140
return sharding_errs_count == tuples_count
145141
end
146142

147-
function sharding.wrap_method(method, space_name, ...)
143+
function sharding.wrap_method(vshard_router, method, space_name, ...)
148144
local i = 0
149145

150146
local res, err, need_reload
151147
while true do
152-
res, err, need_reload = method(space_name, ...)
148+
res, err, need_reload = method(vshard_router, space_name, ...)
153149

154150
if err == nil or need_reload ~= const.NEED_SHARDING_RELOAD then
155151
break
156152
end
157153

158-
sharding_metadata_module.reload_sharding_cache(space_name)
154+
sharding_metadata_module.reload_sharding_cache(vshard_router, space_name)
159155

160156
i = i + 1
161157

@@ -169,12 +165,12 @@ end
169165

170166
-- This wrapper assumes reload is performed inside the method and
171167
-- expect ShardingHashMismatchError error to be thrown.
172-
function sharding.wrap_select_method(method, space_name, ...)
168+
function sharding.wrap_select_method(vshard_router, method, space_name, ...)
173169
local i = 0
174170

175171
local ok, res, err
176172
while true do
177-
ok, res, err = pcall(method, space_name, ...)
173+
ok, res, err = pcall(method, vshard_router, space_name, ...)
178174

179175
if ok == true then
180176
break
@@ -212,8 +208,8 @@ end
212208
-- @return[1] batches
213209
-- Map where key is a replicaset and value
214210
-- is table of tuples related to this replicaset
215-
function sharding.split_tuples_by_replicaset(tuples, space, opts)
216-
dev_checks('table', 'table', {
211+
function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
212+
dev_checks('table', 'table', 'table', {
217213
operations = '?table',
218214
})
219215

@@ -227,7 +223,7 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts)
227223
local sharding_data
228224
local err
229225
for i, tuple in ipairs(tuples) do
230-
sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space)
226+
sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space)
231227
if err ~= nil then
232228
return nil, BucketIDError:new("Failed to get bucket ID: %s", err)
233229
end
@@ -244,7 +240,6 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts)
244240
skip_sharding_hash_check = false
245241
end
246242

247-
local vshard_router = vshard.router.static
248243
local replicaset, err = vshard_router:route(sharding_data.bucket_id)
249244
if replicaset == nil then
250245
return nil, GetReplicasetsError:new(

0 commit comments

Comments
 (0)