Skip to content

Commit 3ca66dd

Browse files
Extract sharding key from conditions
PR #181 introduced support of DDL sharding keys. But if the sharding key didn't have a separate index in the schema, a select with equal conditions for all required sharding key fields still led to map-reduce instead of a single storage call. This patch introduces improved support of sharding key extraction and fixes the issue. Closes #213
1 parent cc2e0e8 commit 3ca66dd

File tree

7 files changed

+185
-55
lines changed

7 files changed

+185
-55
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2020

2121
* Use tuple-merger backed select implementation on tarantool 2.10+ (it gives
2222
less pressure on Lua GC).
23+
* DDL sharding key can now be extracted from select conditions even if
24+
there is no separate index.
2325

2426
## [0.9.0] - 20-10-21
2527

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ Current limitations for using custom sharding key:
101101
updated on storages, see
102102
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
103103
to do it manually with `require('crud.sharding_key').update_cache()`.
104-
- CRUD select may lead map reduce in some cases, see
105-
[#213](https://github.com/tarantool/crud/issues/213).
106104
- No support of JSON path for sharding key, see
107105
[#219](https://github.com/tarantool/crud/issues/219).
108106
- `primary_index_fieldno_map` is not cached, see

crud/select/plan.lua

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,49 +48,75 @@ local function get_index_for_condition(space_indexes, space_format, condition)
4848
end
4949
end
5050

51-
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
52-
if #scan_value < #sharding_index.parts then
51+
local function extract_sharding_key_from_conditions(conditions, sharding_index, space_indexes, fieldno_map)
52+
if sharding_index == nil then
5353
return nil
5454
end
5555

56-
if scan_index.id == sharding_index.id then
57-
return scan_value
58-
end
56+
-- If name is both valid index name and field name,
57+
-- it is interpreted as index name.
58+
local filled_fields = {}
59+
for _, condition in ipairs(conditions) do
60+
if condition.operator ~= compare_conditions.operators.EQ then
61+
goto continue
62+
end
5963

60-
local scan_value_fields_values = {}
61-
for i, scan_index_part in ipairs(scan_index.parts) do
62-
scan_value_fields_values[scan_index_part.fieldno] = scan_value[i]
63-
end
64+
local index = space_indexes[condition.operand]
65+
if index ~= nil then
66+
for i, part in ipairs(index.parts) do
67+
filled_fields[part.fieldno] = condition.values[i]
68+
end
6469

65-
-- check that sharding key is included in the scan index fields
66-
local sharding_key = {}
67-
for _, sharding_key_part in ipairs(sharding_index.parts) do
68-
local fieldno = sharding_key_part.fieldno
70+
goto continue
71+
end
6972

70-
-- sharding key isn't included in scan key
71-
if scan_value_fields_values[fieldno] == nil then
72-
return nil
73+
local fieldno = fieldno_map[condition.operand]
74+
if fieldno == nil then
75+
goto continue
7376
end
77+
filled_fields[fieldno] = condition.values[1]
7478

75-
local field_value = scan_value_fields_values[fieldno]
79+
::continue::
80+
end
7681

77-
-- sharding key contains nil values
78-
if field_value == nil then
82+
local sharding_key = {}
83+
for i, v in ipairs(sharding_index.parts) do
84+
if filled_fields[v.fieldno] == nil then
7985
return nil
8086
end
8187

82-
table.insert(sharding_key, field_value)
88+
sharding_key[i] = filled_fields[v.fieldno]
8389
end
8490

8591
return sharding_key
8692
end
8793

94+
local function get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index)
95+
if scan_value == nil then
96+
return nil
97+
end
98+
99+
if scan_iter ~= box.index.EQ and scan_iter ~= box.index.REQ then
100+
return nil
101+
end
102+
103+
if #scan_value < #sharding_index.parts then
104+
return nil
105+
end
106+
107+
if scan_index.id == sharding_index.id then
108+
return scan_value
109+
end
110+
111+
return nil
112+
end
113+
88114
-- We need to construct after_tuple by field_names
89115
-- because if `fields` option is specified we have after_tuple with partial fields
90116
-- and these fields are ordered by field_names + primary key + scan key
91117
-- this order can be differ from order in space format
92118
-- so we need to cast after_tuple to space format for scrolling tuples on storage
93-
local function construct_after_tuple_by_fields(space_format, field_names, tuple)
119+
local function construct_after_tuple_by_fields(fieldno_map, field_names, tuple)
94120
if tuple == nil then
95121
return nil
96122
end
@@ -99,15 +125,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple)
99125
return tuple
100126
end
101127

102-
local positions = {}
103128
local transformed_tuple = {}
104129

105-
for i, field in ipairs(space_format) do
106-
positions[field.name] = i
107-
end
108-
109130
for i, field_name in ipairs(field_names) do
110-
local fieldno = positions[field_name]
131+
local fieldno = fieldno_map[field_name]
111132
if fieldno == nil then
112133
return nil, FilterFieldsError:new(
113134
'Space format doesn\'t contain field named %q', field_name
@@ -145,6 +166,8 @@ function select_plan.new(space, conditions, opts)
145166
local scan_value
146167
local scan_condition_num
147168

169+
local fieldno_map = utils.get_format_fieldno_map(space_format)
170+
148171
-- search index to iterate over
149172
for i, condition in ipairs(conditions) do
150173
scan_index = get_index_for_condition(space_indexes, space_format, condition)
@@ -176,9 +199,7 @@ function select_plan.new(space, conditions, opts)
176199

177200
-- handle opts.first
178201
local total_tuples_count
179-
local scan_after_tuple, err = construct_after_tuple_by_fields(
180-
space_format, field_names, opts.after_tuple
181-
)
202+
local scan_after_tuple, err = construct_after_tuple_by_fields(fieldno_map, field_names, opts.after_tuple)
182203
if err ~= nil then
183204
return nil, err
184205
end
@@ -230,9 +251,10 @@ function select_plan.new(space, conditions, opts)
230251
local sharding_index = opts.sharding_key_as_index_obj or primary_index
231252

232253
-- get sharding key value
233-
local sharding_key
234-
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
235-
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
254+
local sharding_key = get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index)
255+
256+
if sharding_key == nil then
257+
sharding_key = extract_sharding_key_from_conditions(conditions, sharding_index, space_indexes, fieldno_map)
236258
end
237259

238260
local plan = {

test/entrypoint/srv_ddl.lua

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ package.preload['customers-storage'] = function()
6161
{path = 'name', is_nullable = false, type = 'string'},
6262
},
6363
}
64+
local age_index = {
65+
name = 'age',
66+
type = 'TREE',
67+
unique = false,
68+
parts = {
69+
{path = 'age', is_nullable = false, type = 'number'},
70+
},
71+
}
6472
local secondary_index = {
6573
name = 'secondary',
6674
type = 'TREE',
@@ -71,6 +79,17 @@ package.preload['customers-storage'] = function()
7179
},
7280
}
7381

82+
local three_fields_index = {
83+
name = 'three_fields',
84+
type = 'TREE',
85+
unique = false,
86+
parts = {
87+
{path = 'age', is_nullable = false, type = 'number'},
88+
{path = 'name', is_nullable = false, type = 'string'},
89+
{path = 'id', is_nullable = false, type = 'unsigned'},
90+
},
91+
}
92+
7493
local customers_name_key_schema = table.deepcopy(customers_schema)
7594
customers_name_key_schema.sharding_key = {'name'}
7695
table.insert(customers_name_key_schema.indexes, primary_index)
@@ -100,13 +119,27 @@ package.preload['customers-storage'] = function()
100119
table.insert(customers_age_key_schema.indexes, primary_index)
101120
table.insert(customers_age_key_schema.indexes, bucket_id_index)
102121

122+
local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema)
123+
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'}
124+
table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index)
125+
table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index)
126+
table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index)
127+
128+
local customers_name_age_key_three_fields_index_schema = table.deepcopy(customers_schema)
129+
customers_name_age_key_three_fields_index_schema.sharding_key = {'name', 'age'}
130+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, primary_index_id)
131+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
132+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)
133+
103134
local schema = {
104135
spaces = {
105136
customers_name_key = customers_name_key_schema,
106137
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
107138
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
108139
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
109140
customers_age_key = customers_age_key_schema,
141+
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
142+
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
110143
}
111144
}
112145

test/helpers/storage_stat.lua

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,16 @@ function storage_stat.diff(a, b)
9595
return diff
9696
end
9797

98+
-- Accepts collect (or diff) return value and returns
99+
-- total number of select requests across all storages.
100+
function storage_stat.total(stats)
101+
local total = 0
102+
103+
for _, stat in pairs(stats) do
104+
total = total + (stat.select_requests or 0)
105+
end
106+
107+
return total
108+
end
109+
98110
return storage_stat

test/integration/ddl_sharding_key_test.lua

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pgroup.before_each(function(g)
5151
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index')
5252
helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key')
5353
helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key')
54+
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_different_indexes')
55+
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_three_fields_index')
5456
end)
5557

5658
pgroup.test_insert_object = function(g)
@@ -279,13 +281,7 @@ pgroup.test_select = function(g)
279281
t.assert_equals(result.rows[1], tuple)
280282
end
281283

282-
-- TODO: After enabling support of sharding keys that are not equal to primary
283-
-- keys, we should handle it differently: it is not enough to look just on scan
284-
-- value, we should traverse all conditions. Now missed cases lead to
285-
-- map-reduce. Will be resolved in #213.
286-
pgroup.test_select_wont_lead_map_reduce = function(g)
287-
local space_name = 'customers_name_key_uniq_index'
288-
284+
local prepare_data_name_sharding_key = function(g, space_name)
289285
local conn_s1 = g.cluster:server('s1-master').net_box
290286
local conn_s2 = g.cluster:server('s2-master').net_box
291287

@@ -301,12 +297,85 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
301297
-- bucket_id is 1161, storage is s-2
302298
local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59})
303299
t.assert_not_equals(result, nil)
300+
end
301+
302+
local prepare_data_name_age_sharding_key = function(g, space_name)
303+
local conn_s1 = g.cluster:server('s1-master').net_box
304+
local conn_s2 = g.cluster:server('s2-master').net_box
305+
306+
-- bucket_id is 2310, storage is s-1
307+
local result = conn_s1.space[space_name]:insert({1, 2310, 'Viktor Pelevin', 58})
308+
t.assert_not_equals(result, nil)
309+
-- bucket_id is 63, storage is s-2
310+
local result = conn_s2.space[space_name]:insert({2, 63, 'Isaac Asimov', 72})
311+
t.assert_not_equals(result, nil)
312+
-- bucket_id is 2901, storage is s-1
313+
local result = conn_s1.space[space_name]:insert({3, 2901, 'Aleksandr Solzhenitsyn', 89})
314+
t.assert_not_equals(result, nil)
315+
-- bucket_id is 1365, storage is s-2
316+
local result = conn_s2.space[space_name]:insert({4, 1365, 'James Joyce', 59})
317+
t.assert_not_equals(result, nil)
318+
end
319+
320+
local cases = {
321+
select_for_indexed_sharding_key = {
322+
space_name = 'customers_name_key_uniq_index',
323+
prepare_data = prepare_data_name_sharding_key,
324+
conditions = {{'==', 'name', 'Viktor Pelevin'}},
325+
},
326+
select_for_sharding_key_as_index_part = {
327+
space_name = 'customers_name_key',
328+
prepare_data = prepare_data_name_sharding_key,
329+
conditions = {{'==', 'name', 'Viktor Pelevin'}},
330+
},
331+
select_for_sharding_key_as_several_indexes_parts = {
332+
space_name = 'customers_name_age_key_different_indexes',
333+
prepare_data = prepare_data_name_age_sharding_key,
334+
conditions = {{'==', 'name', 'Viktor Pelevin'}, {'==', 'age', 58}},
335+
},
336+
select_by_index_cond_for_sharding_key_as_several_indexes_parts = {
337+
space_name = 'customers_name_age_key_different_indexes',
338+
prepare_data = prepare_data_name_age_sharding_key,
339+
conditions = {{'==', 'id', {1, 'Viktor Pelevin'}}, {'==', 'age', 58}},
340+
},
341+
select_by_partial_index_cond_for_sharding_key_included = {
342+
space_name = 'customers_name_age_key_three_fields_index',
343+
prepare_data = prepare_data_name_age_sharding_key,
344+
conditions = {{'==', 'three_fields', {58, 'Viktor Pelevin', nil}}},
345+
},
346+
}
347+
348+
for name, case in pairs(cases) do
349+
pgroup[('test_%s_wont_lead_to_map_reduce'):format(name)] = function(g)
350+
case.prepare_data(g, case.space_name)
351+
352+
local stat_a = storage_stat.collect(g.cluster)
353+
354+
local result, err = g.cluster.main_server.net_box:call('crud.select', {
355+
case.space_name, case.conditions
356+
})
357+
t.assert_equals(err, nil)
358+
t.assert_not_equals(result, nil)
359+
t.assert_equals(#result.rows, 1)
360+
361+
local stat_b = storage_stat.collect(g.cluster)
362+
363+
-- Check a number of select() requests made by CRUD on cluster's storages
364+
-- after calling select() on a router. Make sure only a single storage has
365+
-- a single select() request. Otherwise we lead to map-reduce.
366+
local stats = storage_stat.diff(stat_b, stat_a)
367+
t.assert_equals(storage_stat.total(stats), 1, 'Select request was not a map reduce')
368+
end
369+
end
370+
371+
pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g)
372+
local space_name = 'customers_name_age_key_different_indexes'
373+
prepare_data_name_age_sharding_key(g, space_name)
304374

305375
local stat_a = storage_stat.collect(g.cluster)
306376

307-
-- Select a tuple with name 'Viktor Pelevin'.
308377
local result, err = g.cluster.main_server.net_box:call('crud.select', {
309-
space_name, {{'==', 'name', 'Viktor Pelevin'}}
378+
space_name, {{'==', 'age', 58}},
310379
})
311380
t.assert_equals(err, nil)
312381
t.assert_not_equals(result, nil)
@@ -315,16 +384,10 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
315384
local stat_b = storage_stat.collect(g.cluster)
316385

317386
-- Check a number of select() requests made by CRUD on cluster's storages
318-
-- after calling select() on a router. Make sure only a single storage has
319-
-- a single select() request. Otherwise we lead map-reduce.
320-
t.assert_equals(storage_stat.diff(stat_b, stat_a), {
321-
['s-1'] = {
322-
select_requests = 0,
323-
},
324-
['s-2'] = {
325-
select_requests = 1,
326-
},
327-
})
387+
-- after calling select() on a router. Make sure it was a map-reduce
388+
-- since we do not have sharding key values in conditions.
389+
local stats = storage_stat.diff(stat_b, stat_a)
390+
t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce')
328391
end
329392

330393
pgroup.test_select_secondary_idx = function(g)

test/unit/select_plan_test.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ g.test_is_scan_by_full_sharding_key_eq = function()
185185
t.assert_equals(err, nil)
186186

187187
t.assert_equals(plan.total_tuples_count, nil)
188-
t.assert_equals(plan.sharding_key, nil)
188+
t.assert_equals(plan.sharding_key, {15})
189189

190190
-- gt
191191
local plan, err = select_plan.new(box.space.customers, {

0 commit comments

Comments
 (0)