Skip to content

Commit 18657b4

Browse files
WIP: compare hashes
1 parent 086431c commit 18657b4

File tree

14 files changed

+292
-75
lines changed

14 files changed

+292
-75
lines changed

crud/common/sharding/init.lua

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ local errors = require('errors')
33

44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})
6+
local ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})
67

78
local utils = require('crud.common.utils')
89
local dev_checks = require('crud.common.dev_checks')
910
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
11+
local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache')
1012

1113
local sharding = {}
1214

@@ -25,37 +27,49 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
2527
dev_checks('string', '?', '?number|cdata')
2628

2729
if specified_bucket_id ~= nil then
28-
return specified_bucket_id
30+
return { bucket_id = specified_bucket_id }
2931
end
3032

31-
local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
33+
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
3234
if err ~= nil then
3335
return nil, err
3436
end
3537

36-
if sharding_func ~= nil then
37-
return sharding_func(key)
38+
if sharding_func_data.value ~= nil then
39+
return {
40+
bucket_id = sharding_func_data.value(key),
41+
sharding_func_hash = sharding_func_data.hash,
42+
}
3843
end
3944

40-
return vshard.router.bucket_id_strcrc32(key)
45+
return { bucket_id = vshard.router.bucket_id_strcrc32(key) }
4146
end
4247

4348
function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
4449
if specified_bucket_id ~= nil then
45-
return specified_bucket_id
50+
return { bucket_id = specified_bucket_id }
4651
end
4752

4853
local sharding_index_parts = space.index[0].parts
49-
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
54+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
5055
if err ~= nil then
5156
return nil, err
5257
end
53-
if sharding_key_as_index_obj ~= nil then
54-
sharding_index_parts = sharding_key_as_index_obj.parts
58+
if sharding_key_data.value ~= nil then
59+
sharding_index_parts = sharding_key_data.value.parts
5560
end
5661
local key = utils.extract_key(tuple, sharding_index_parts)
5762

58-
return sharding.key_get_bucket_id(space.name, key)
63+
local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil)
64+
if err ~= nil then
65+
return nil, err
66+
end
67+
68+
return {
69+
bucket_id = bucket_id_data.bucket_id,
70+
sharding_func_hash = bucket_id_data.sharding_func_hash,
71+
sharding_key_hash = sharding_key_data.hash
72+
}
5973
end
6074

6175
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
@@ -77,16 +91,35 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
7791
end
7892
end
7993

80-
local bucket_id = tuple[bucket_id_fieldno]
81-
if bucket_id == nil then
82-
bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
94+
local sharding_data = { bucket_id = tuple[bucket_id_fieldno] }
95+
96+
if sharding_data.bucket_id == nil then
97+
sharding_data, err = sharding.tuple_get_bucket_id(tuple, space)
8398
if err ~= nil then
8499
return nil, err
85100
end
86-
tuple[bucket_id_fieldno] = bucket_id
101+
tuple[bucket_id_fieldno] = sharding_data.bucket_id
102+
end
103+
104+
return sharding_data
105+
end
106+
107+
function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_key_hash, skip_sharding_hash_check)
108+
if skip_sharding_hash_check == true then
109+
return true
110+
end
111+
112+
local storage_func_hash = storage_metadata_cache.get_sharding_func_hash(space_name)
113+
local storage_key_hash = storage_metadata_cache.get_sharding_key_hash(space_name)
114+
115+
if storage_func_hash ~= sharding_func_hash or storage_key_hash ~= sharding_key_hash then
116+
local err_msg = ('crud: Sharding hash mismatch for space %s. ' ..
117+
'Sharding data will be reloaded after receiving this error. ' ..
118+
'Please retry your request.'):format(space_name)
119+
return nil, ShardingHashMismatchError:new(err_msg)
87120
end
88121

89-
return bucket_id
122+
return true
90123
end
91124

92125
return sharding

crud/common/sharding/sharding_metadata.lua

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ end)
121121

122122
local function fetch_on_router(space_name, metadata_map_name, timeout)
123123
if cache[metadata_map_name] ~= nil then
124-
return cache[metadata_map_name][space_name]
124+
return {
125+
value = cache[metadata_map_name][space_name],
126+
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
127+
}
125128
end
126129

127130
local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT
@@ -131,7 +134,10 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
131134
end
132135

133136
if cache[metadata_map_name] ~= nil then
134-
return cache[metadata_map_name][space_name]
137+
return {
138+
value = cache[metadata_map_name][space_name],
139+
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
140+
}
135141
end
136142

137143
return nil, FetchShardingMetadataError:new(

crud/count.lua

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ local function count_on_storage(space_name, index_id, conditions, opts)
2727
tarantool_iter = 'number',
2828
yield_every = '?number',
2929
scan_condition_num = '?number',
30+
sharding_func_hash = '?number',
31+
sharding_key_hash = '?number',
32+
skip_sharding_hash_check = '?boolean',
3033
})
3134

3235
opts = opts or {}
@@ -38,6 +41,14 @@ local function count_on_storage(space_name, index_id, conditions, opts)
3841
return nil, CountError:new("Index with ID %s doesn't exist", index_id)
3942
end
4043

44+
local _, err = sharding.check_sharding_hash(space_name,
45+
opts.sharding_func_hash,
46+
opts.sharding_key_hash,
47+
opts.skip_sharding_hash_check)
48+
if err ~= nil then
49+
return nil, err
50+
end
51+
4152
local value = opts.scan_value
4253

4354
local filter_func, err = filters.gen_func(space, conditions, {
@@ -114,14 +125,14 @@ local function call_count_on_router(space_name, user_conditions, opts)
114125
return nil, CountError:new("Space %q doesn't exist", space_name), true
115126
end
116127

117-
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
128+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
118129
if err ~= nil then
119130
return nil, err
120131
end
121132

122133
-- plan count
123134
local plan, err = count_plan.new(space, conditions, {
124-
sharding_key_as_index_obj = sharding_key_as_index_obj,
135+
sharding_key_as_index_obj = sharding_key_data.value,
125136
})
126137
if err ~= nil then
127138
return nil, CountError:new("Failed to plan count: %s", err), true
@@ -159,21 +170,28 @@ local function call_count_on_router(space_name, user_conditions, opts)
159170
-- eye to resharding. However, AFAIU, the optimization
160171
-- does not make the result less consistent (sounds
161172
-- weird, huh?).
173+
local sharding_func_hash = nil
174+
local skip_sharding_hash_check = nil
175+
162176
local perform_map_reduce = opts.force_map_call == true or
163177
(opts.bucket_id == nil and plan.sharding_key == nil)
164178
if not perform_map_reduce then
165-
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
179+
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
166180
if err ~= nil then
167181
return nil, err
168182
end
169183

170-
assert(bucket_id ~= nil)
184+
assert(bucket_id_data.bucket_id ~= nil)
185+
186+
sharding_func_hash = bucket_id_data.sharding_func_hash
171187

172188
local err
173-
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id)
189+
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id)
174190
if err ~= nil then
175191
return nil, err, true
176192
end
193+
else
194+
skip_sharding_hash_check = true
177195
end
178196

179197
local yield_every = opts.yield_every or DEFAULT_YIELD_EVERY
@@ -191,6 +209,9 @@ local function call_count_on_router(space_name, user_conditions, opts)
191209
tarantool_iter = plan.tarantool_iter,
192210
yield_every = yield_every,
193211
scan_condition_num = plan.scan_condition_num,
212+
sharding_func_hash = sharding_func_hash,
213+
sharding_key_hash = sharding_key_data.hash,
214+
skip_sharding_hash_check = skip_sharding_hash_check,
194215
}
195216

196217
local results, err = call.map(COUNT_FUNC_NAME, {

crud/delete.lua

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,29 @@ local delete = {}
1616

1717
local DELETE_FUNC_NAME = '_crud.delete_on_storage'
1818

19-
local function delete_on_storage(space_name, key, field_names)
20-
dev_checks('string', '?', '?table')
19+
local function delete_on_storage(space_name, key, field_names, opts)
20+
dev_checks('string', '?', '?table', {
21+
sharding_key_hash = '?number',
22+
sharding_func_hash = '?number',
23+
skip_sharding_hash_check = '?boolean',
24+
})
25+
26+
opts = opts or {}
2127

2228
local space = box.space[space_name]
2329
if space == nil then
2430
return nil, DeleteError:new("Space %q doesn't exist", space_name)
2531
end
2632

33+
local _, err = sharding.check_sharding_hash(space_name,
34+
opts.sharding_func_hash,
35+
opts.sharding_key_hash,
36+
opts.skip_sharding_hash_check)
37+
38+
if err ~= nil then
39+
return nil, err
40+
end
41+
2742
-- add_space_schema_hash is false because
2843
-- reloading space format on router can't avoid delete error on storage
2944
return schema.wrap_box_space_func_result(space, 'delete', {key}, {
@@ -57,35 +72,49 @@ local function call_delete_on_router(space_name, key, opts)
5772
key = key:totable()
5873
end
5974

75+
local sharding_key_hash = nil
76+
local skip_sharding_hash_check = nil
77+
6078
local sharding_key = key
6179
if opts.bucket_id == nil then
6280
local primary_index_parts = space.index[0].parts
6381

64-
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
82+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
6583
if err ~= nil then
6684
return nil, err
6785
end
6886

6987
sharding_key, err = sharding_key_module.extract_from_pk(space_name,
70-
sharding_key_as_index_obj,
88+
sharding_key_data.value,
7189
primary_index_parts, key)
7290
if err ~= nil then
7391
return nil, err
7492
end
93+
94+
sharding_key_hash = sharding_key_data.hash
95+
else
96+
skip_sharding_hash_check = true
7597
end
7698

77-
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
99+
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
78100
if err ~= nil then
79101
return nil, err
80102
end
81103

104+
local delete_on_storage_opts = {
105+
sharding_func_hash = bucket_id_data.sharding_func_hash,
106+
sharding_key_hash = sharding_key_hash,
107+
skip_sharding_hash_check = skip_sharding_hash_check,
108+
}
109+
82110
local call_opts = {
83111
mode = 'write',
84112
timeout = opts.timeout,
85113
}
114+
86115
local storage_result, err = call.single(
87-
bucket_id, DELETE_FUNC_NAME,
88-
{space_name, key, opts.fields},
116+
bucket_id_data.bucket_id, DELETE_FUNC_NAME,
117+
{space_name, key, opts.fields, delete_on_storage_opts},
89118
call_opts
90119
)
91120

crud/get.lua

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,29 @@ local get = {}
1616

1717
local GET_FUNC_NAME = '_crud.get_on_storage'
1818

19-
local function get_on_storage(space_name, key, field_names)
20-
dev_checks('string', '?', '?table')
19+
local function get_on_storage(space_name, key, field_names, opts)
20+
dev_checks('string', '?', '?table', {
21+
sharding_key_hash = '?number',
22+
sharding_func_hash = '?number',
23+
skip_sharding_hash_check = '?boolean',
24+
})
25+
26+
opts = opts or {}
2127

2228
local space = box.space[space_name]
2329
if space == nil then
2430
return nil, GetError:new("Space %q doesn't exist", space_name)
2531
end
2632

33+
local _, err = sharding.check_sharding_hash(space_name,
34+
opts.sharding_func_hash,
35+
opts.sharding_key_hash,
36+
opts.skip_sharding_hash_check)
37+
38+
if err ~= nil then
39+
return nil, err
40+
end
41+
2742
-- add_space_schema_hash is false because
2843
-- reloading space format on router can't avoid get error on storage
2944
return schema.wrap_box_space_func_result(space, 'get', {key}, {
@@ -61,36 +76,50 @@ local function call_get_on_router(space_name, key, opts)
6176
end
6277

6378
local sharding_key = key
79+
local sharding_key_hash = nil
80+
local skip_sharding_hash_check = nil
81+
6482
if opts.bucket_id == nil then
6583
local primary_index_parts = space.index[0].parts
6684

67-
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
85+
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
6886
if err ~= nil then
6987
return nil, err
7088
end
7189

7290
sharding_key, err = sharding_key_module.extract_from_pk(space_name,
73-
sharding_key_as_index_obj,
91+
sharding_key_data.value,
7492
primary_index_parts, key)
7593
if err ~= nil then
7694
return nil, err
7795
end
96+
97+
sharding_key_hash = sharding_key_data.hash
98+
else
99+
skip_sharding_hash_check = true
78100
end
79101

80-
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
102+
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
81103
if err ~= nil then
82104
return nil, err
83105
end
84106

107+
local get_on_storage_opts = {
108+
sharding_func_hash = bucket_id_data.sharding_func_hash,
109+
sharding_key_hash = sharding_key_hash,
110+
skip_sharding_hash_check = skip_sharding_hash_check,
111+
}
112+
85113
local call_opts = {
86114
mode = opts.mode or 'read',
87115
prefer_replica = opts.prefer_replica,
88116
balance = opts.balance,
89117
timeout = opts.timeout,
90118
}
119+
91120
local storage_result, err = call.single(
92-
bucket_id, GET_FUNC_NAME,
93-
{space_name, key, opts.fields},
121+
bucket_id_data.bucket_id, GET_FUNC_NAME,
122+
{space_name, key, opts.fields, get_on_storage_opts},
94123
call_opts
95124
)
96125

0 commit comments

Comments
 (0)