Skip to content

Commit bad0e60

Browse files
ddl: reload and retry on sharding info mismatch
If a sharding info mismatch happens, sharding info is reloaded on the router. After that, the request is retried with the new sharding info (except for pairs requests, which by their nature must be retried manually). There are no detectable performance drops introduced in this patch. Closes #212
1 parent e9818e9 commit bad0e60

21 files changed

+455
-133
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2323
* Fix processing storage error for tuple-merger implementation of
2424
select/pairs (#271).
2525
* Do not change input tuple object in requests.
26+
* Add automatic reload of DDL schema (#212).
2627

2728
## [0.10.0] - 01-12-21
2829

crud/borders.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ local checks = require('checks')
22
local errors = require('errors')
33
local vshard = require('vshard')
44

5+
local const = require('crud.common.const')
56
local dev_checks = require('crud.common.dev_checks')
67
local call = require('crud.common.call')
78
local utils = require('crud.common.utils')
@@ -76,7 +77,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
7677

7778
local space = utils.get_space(space_name, vshard.router.routeall())
7879
if space == nil then
79-
return nil, BorderError:new("Space %q doesn't exist", space_name), true
80+
return nil, BorderError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
8081
end
8182

8283
local index
@@ -87,7 +88,9 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
8788
end
8889

8990
if index == nil then
90-
return nil, BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), true
91+
return nil,
92+
BorderError:new("Index %q of space %q doesn't exist", index_name, space_name),
93+
const.NEED_SCHEMA_RELOAD
9194
end
9295

9396
local primary_index = space.index[0]
@@ -131,8 +134,14 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
131134
for _, storage_result in pairs(results) do
132135
local storage_result = storage_result[1]
133136
if storage_result.err ~= nil then
137+
local err_wrapped = BorderError:new("Failed to get %s: %s", border_name, storage_result.err)
138+
134139
local need_reload = schema.result_needs_reload(space, storage_result)
135-
return nil, BorderError:new("Failed to get %s: %s", border_name, storage_result.err), need_reload
140+
if need_reload then
141+
return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
142+
end
143+
144+
return nil, err_wrapped
136145
end
137146

138147
local tuple = storage_result.res

crud/common/call.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local errors = require('errors')
33

44
local dev_checks = require('crud.common.dev_checks')
55
local utils = require('crud.common.utils')
6+
local sharding_utils = require('crud.common.sharding.utils')
67
local fiber_clock = require('fiber').clock
78

89
local CallError = errors.new_class('CallError')
@@ -40,6 +41,11 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
4041
end
4142

4243
local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
44+
-- Do not rewrite ShardingHashMismatchError class.
45+
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
46+
return errors.wrap(err)
47+
end
48+
4349
if err.type == 'ClientError' and type(err.message) == 'string' then
4450
if err.message == string.format("Procedure '%s' is not defined", func_name) then
4551
if func_name:startswith('_crud.') then

crud/common/const.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,9 @@ local const = {}
33
const.RELOAD_RETRIES_NUM = 1
44
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
55
const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds
6+
const.SHARDING_RELOAD_RETRIES_NUM = 1
67

7-
return const
8+
const.NEED_SCHEMA_RELOAD = 0x0001000
9+
const.NEED_SHARDING_RELOAD = 0x0001001
10+
11+
return const

crud/common/schema.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ function schema.wrap_func_reload(func, ...)
8787
while true do
8888
res, err, need_reload = func(...)
8989

90-
if err == nil or not need_reload then
90+
if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
9191
break
9292
end
9393

crud/common/sharding/init.lua

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ local errors = require('errors')
33

44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})
6-
local ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})
76

7+
local const = require('crud.common.const')
88
local utils = require('crud.common.utils')
99
local dev_checks = require('crud.common.dev_checks')
1010
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1111
local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache')
12+
local sharding_utils = require('crud.common.sharding.utils')
1213

1314
local sharding = {}
1415

@@ -114,12 +115,72 @@ function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_k
114115

115116
if storage_func_hash ~= sharding_func_hash or storage_key_hash ~= sharding_key_hash then
116117
local err_msg = ('crud: Sharding hash mismatch for space %s. ' ..
117-
'Please refresh sharding data and retry your request.'
118+
'Sharding info will be refreshed after receiving this error. ' ..
119+
'Please retry your request.'
118120
):format(space_name)
119-
return nil, ShardingHashMismatchError:new(err_msg)
121+
return nil, sharding_utils.ShardingHashMismatchError:new(err_msg)
120122
end
121123

122124
return true
123125
end
124126

127+
-- Check whether an error belongs to the ShardingHashMismatchError class.
--
-- @param err  error object; only its `class_name` field is read
-- @return     true when `err.class_name` equals the name of
--             sharding_utils.ShardingHashMismatchError, false otherwise
function sharding.result_needs_sharding_reload(err)
    local err_class_name = err.class_name
    return err_class_name == sharding_utils.ShardingHashMismatchError.name
end
130+
131+
-- Call `method(space_name, ...)` and, while it reports
-- const.NEED_SHARDING_RELOAD, reload the sharding cache for the space
-- and call it again.
--
-- The loop stops as soon as the method succeeds (err == nil), fails with
-- anything other than const.NEED_SHARDING_RELOAD, or the number of
-- performed reloads exceeds const.SHARDING_RELOAD_RETRIES_NUM.
--
-- @param method      function called as method(space_name, ...); expected
--                    to return res, err, need_reload
-- @param space_name  space name, passed both to the method and to the
--                    sharding cache reload
-- @return            res, err, need_reload of the last method call
function sharding.wrap_method(method, space_name, ...)
    local reloads_done = 0
    local res, err, need_reload

    repeat
        res, err, need_reload = method(space_name, ...)

        local retriable = err ~= nil and need_reload == const.NEED_SHARDING_RELOAD
        if not retriable then
            break
        end

        -- The cache is reloaded even after the last allowed attempt.
        sharding_metadata_module.reload_sharding_cache(space_name)
        reloads_done = reloads_done + 1
    until reloads_done > const.SHARDING_RELOAD_RETRIES_NUM

    return res, err, need_reload
end
153+
154+
-- Retrying wrapper for methods that raise errors instead of returning them.
-- It assumes the sharding info reload is performed inside the method and
-- expects ShardingHashMismatchError to be thrown on mismatch.
--
-- @param method      function called as method(space_name, ...) under pcall
-- @param space_name  space name, forwarded to the method
-- @return            the first two values returned by the method
--
-- Re-raises any error whose string form does not mention
-- ShardingHashMismatchError, and re-raises the mismatch error once the
-- number of failed attempts exceeds const.SHARDING_RELOAD_RETRIES_NUM.
function sharding.wrap_select_method(method, space_name, ...)
    local failed_attempts = 0

    while true do
        local ok, res, err = pcall(method, space_name, ...)
        if ok then
            return res, err
        end

        -- Error thrown from merger is casted to string,
        -- so the only way to identify it is string.find.
        local err_text = tostring(res)
        local is_hash_mismatch =
            err_text:find(sharding_utils.ShardingHashMismatchError.name) ~= nil

        -- No explicit reload here: it is performed inside the merger.
        failed_attempts = failed_attempts + 1

        if not is_hash_mismatch or failed_attempts > const.SHARDING_RELOAD_RETRIES_NUM then
            error(res)
        end
    end
end
185+
125186
return sharding

crud/common/sharding/sharding_metadata.lua

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
local fiber = require('fiber')
22
local errors = require('errors')
3+
local log = require('log')
34

45
local call = require('crud.common.call')
56
local const = require('crud.common.const')
@@ -85,10 +86,11 @@ end
8586
-- a sharding metadata by a single one, other fibers will wait while
8687
-- cache.fetch_lock become unlocked during timeout passed to
8788
-- _fetch_on_router().
89+
-- metadata_map_name == nil means forced reload.
8890
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
89-
dev_checks('number', 'string', 'string')
91+
dev_checks('number', 'string', '?string')
9092

91-
if cache[metadata_map_name] ~= nil then
93+
if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then
9294
return
9395
end
9496

@@ -186,6 +188,15 @@ function sharding_metadata_module.update_sharding_func_cache(space_name)
186188
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
187189
end
188190

191+
-- Drop cached sharding metadata and fetch it again on the router.
--
-- `nil` is passed as metadata_map_name, which the comment on
-- _fetch_on_router describes as a forced reload. A fetch failure is not
-- propagated to the caller: it is only logged as a warning.
--
-- @param space_name  space name passed to _fetch_on_router
function sharding_metadata_module.reload_sharding_cache(space_name)
    cache.drop_caches()

    local fetch_err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil)
    if fetch_err == nil then
        return
    end

    log.warn('Failed to reload sharding cache: %s', fetch_err)
end
199+
189200
function sharding_metadata_module.init()
190201
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
191202
end

crud/common/sharding/utils.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local digest = require('digest')
2+
local errors = require('errors')
23
local msgpack = require('msgpack')
34

45
local utils = {}
@@ -8,6 +9,8 @@ utils.SPACE_SHARDING_KEY_FIELDNO = 2
89
utils.SPACE_SHARDING_FUNC_NAME_FIELDNO = 2
910
utils.SPACE_SHARDING_FUNC_BODY_FIELDNO = 3
1011

12+
utils.ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})
13+
1114
function utils.extract_sharding_func_def(tuple)
1215
if not tuple then
1316
return nil

crud/common/utils.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44
local fun = require('fun')
55
local bit = require('bit')
66

7+
local const = require('crud.common.const')
78
local schema = require('crud.common.schema')
89
local dev_checks = require('crud.common.dev_checks')
910

@@ -631,12 +632,12 @@ end
631632
local function flatten_obj(space_name, obj)
632633
local space_format, err = utils.get_space_format(space_name, vshard.router.routeall())
633634
if err ~= nil then
634-
return nil, FlattenError:new("Failed to get space format: %s", err), true
635+
return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD
635636
end
636637

637638
local tuple, err = utils.flatten(obj, space_format)
638639
if err ~= nil then
639-
return nil, FlattenError:new("Object is specified in bad format: %s", err), true
640+
return nil, FlattenError:new("Object is specified in bad format: %s", err), const.NEED_SCHEMA_RELOAD
640641
end
641642

642643
return tuple

crud/count.lua

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local vshard = require('vshard')
44
local fiber = require('fiber')
55

66
local call = require('crud.common.call')
7+
local const = require('crud.common.const')
78
local utils = require('crud.common.utils')
89
local sharding = require('crud.common.sharding')
910
local filters = require('crud.compare.filters')
@@ -122,7 +123,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
122123

123124
local space = utils.get_space(space_name, replicasets)
124125
if space == nil then
125-
return nil, CountError:new("Space %q doesn't exist", space_name), true
126+
return nil, CountError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
126127
end
127128

128129
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
@@ -135,7 +136,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
135136
sharding_key_as_index_obj = sharding_key_data.value,
136137
})
137138
if err ~= nil then
138-
return nil, CountError:new("Failed to plan count: %s", err), true
139+
return nil, CountError:new("Failed to plan count: %s", err), const.NEED_SCHEMA_RELOAD
139140
end
140141

141142
-- set replicasets to count from
@@ -188,7 +189,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
188189
local err
189190
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id)
190191
if err ~= nil then
191-
return nil, err, true
192+
return nil, err, const.NEED_SCHEMA_RELOAD
192193
end
193194
else
194195
skip_sharding_hash_check = true
@@ -219,7 +220,13 @@ local function call_count_on_router(space_name, user_conditions, opts)
219220
}, call_opts)
220221

221222
if err ~= nil then
222-
return nil, CountError:new("Failed to call count on storage-side: %s", err)
223+
local err_wrapped = CountError:new("Failed to call count on storage-side: %s", err)
224+
225+
if sharding.result_needs_sharding_reload(err) then
226+
return nil, err_wrapped, const.NEED_SHARDING_RELOAD
227+
end
228+
229+
return nil, err_wrapped
223230
end
224231

225232
if results.err ~= nil then
@@ -289,7 +296,8 @@ function count.call(space_name, user_conditions, opts)
289296
mode = '?string',
290297
})
291298

292-
return schema.wrap_func_reload(call_count_on_router, space_name, user_conditions, opts)
299+
return schema.wrap_func_reload(sharding.wrap_method,
300+
call_count_on_router, space_name, user_conditions, opts)
293301
end
294302

295303
return count

crud/delete.lua

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local errors = require('errors')
33
local vshard = require('vshard')
44

55
local call = require('crud.common.call')
6+
local const = require('crud.common.const')
67
local utils = require('crud.common.utils')
78
local sharding = require('crud.common.sharding')
89
local sharding_key_module = require('crud.common.sharding.sharding_key')
@@ -65,7 +66,7 @@ local function call_delete_on_router(space_name, key, opts)
6566

6667
local space = utils.get_space(space_name, vshard.router.routeall())
6768
if space == nil then
68-
return nil, DeleteError:new("Space %q doesn't exist", space_name), true
69+
return nil, DeleteError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
6970
end
7071

7172
if box.tuple.is(key) then
@@ -119,7 +120,13 @@ local function call_delete_on_router(space_name, key, opts)
119120
)
120121

121122
if err ~= nil then
122-
return nil, DeleteError:new("Failed to call delete on storage-side: %s", err)
123+
local err_wrapped = DeleteError:new("Failed to call delete on storage-side: %s")
124+
125+
if sharding.result_needs_sharding_reload(err) then
126+
return nil, err_wrapped, const.NEED_SHARDING_RELOAD
127+
end
128+
129+
return nil, err_wrapped
123130
end
124131

125132
if storage_result.err ~= nil then
@@ -159,7 +166,8 @@ function delete.call(space_name, key, opts)
159166
fields = '?table',
160167
})
161168

162-
return schema.wrap_func_reload(call_delete_on_router, space_name, key, opts)
169+
return schema.wrap_func_reload(sharding.wrap_method,
170+
call_delete_on_router, space_name, key, opts)
163171
end
164172

165173
return delete

0 commit comments

Comments
 (0)