Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* Functions ``stop()`` for the roles ``crud-storage`` and ``crud-router``.
* Option flag `force_map_call` for `select()`/`pairs()`
to disable the `bucket_id` computation from primary key.
* `crud.min` and `crud.max` functions to find the minimum and maximum values in the specified index.

## [0.6.0] - 2021-03-29

Expand Down
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,32 @@ end

See more examples of pairs queries [here.](https://github.com/tarantool/crud/blob/master/doc/pairs.md)

### Min and max

```lua
-- Find the minimum value in the specified index
local result, err = crud.min(space_name, 'age', opts)
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [1, 477, 'Elizabeth', 12]

-- Find the maximum value in the specified index
local result, err = crud.min(space_name, 'age', opts)
---
- metadata:
- {'name': 'id', 'type': 'unsigned'}
- {'name': 'bucket_id', 'type': 'unsigned'}
- {'name': 'name', 'type': 'string'}
- {'name': 'age', 'type': 'number'}
rows:
- [5, 1172, 'Jack', 35]
```

### Cut extra rows

You could use `crud.cut_rows` function to cut off scan key and primary key values that were merged to the select/pairs partial result (select/pairs with `fields` option).
Expand Down
10 changes: 10 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local upsert = require('crud.upsert')
local delete = require('crud.delete')
local select = require('crud.select')
local truncate = require('crud.truncate')
local borders = require('crud.borders')
local utils = require('crud.common.utils')

local crud = {}
Expand Down Expand Up @@ -69,6 +70,14 @@ crud.unflatten_rows = utils.unflatten_rows
-- @function truncate
crud.truncate = truncate.call

-- @refer borders.min
-- @function min
crud.min = borders.min

-- @refer borders.max
-- @function max
crud.max = borders.max

-- @refer utils.cut_rows
-- @function cut_rows
crud.cut_rows = utils.cut_rows
Expand Down Expand Up @@ -97,6 +106,7 @@ function crud.init_storage()
delete.init()
select.init()
truncate.init()
borders.init()
end

function crud.init_router()
Expand Down
180 changes: 180 additions & 0 deletions crud/borders.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local dev_checks = require('crud.common.dev_checks')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
local schema = require('crud.common.schema')
local Keydef = require('crud.compare.keydef')

local BorderError = errors.new_class('Border', {capture_stack = false})

local borders = {}

local STAT_FUNC_NAME = '_crud.get_border_on_storage'


local function get_border_on_storage(border_name, space_name, index_id, field_names)
dev_checks('string', 'string', 'number', '?table')

assert(border_name == 'min' or border_name == 'max')

local space = box.space[space_name]
if space == nil then
return nil, BorderError:new("Space %q doesn't exist", space_name)
end

local index = space.index[index_id]
if index == nil then
return nil, BorderError:new("Index %q of space doesn't exist", index_id, space_name)
end

local function get_index_border(index)
return index[border_name](index)
end

return schema.wrap_func_result(space, get_index_border, {index}, {
add_space_schema_hash = true,
field_names = field_names,
})
end

function borders.init()
_G._crud.get_border_on_storage = get_border_on_storage
end

local function is_closer(compare_sign, keydef, tuple, res_tuple)
if res_tuple == nil then
return true
end

local cmp = keydef:compare(tuple, res_tuple)

return cmp * compare_sign > 0
end

local function call_get_border_on_router(border_name, space_name, index_name, opts)
checks('string', 'string', '?string|number', {
timeout = '?number',
fields = '?table',
})

opts = opts or {}

local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, BorderError:new("Space %q doesn't exist", space_name), true
end

local index
if index_name == nil then
index = space.index[0]
else
index = space.index[index_name]
end

if index == nil then
return nil, BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), true
end

local primary_index = space.index[0]

local cmp_key_parts = utils.merge_primary_key_parts(index.parts, primary_index.parts)
local field_names = utils.enrich_field_names_with_cmp_key(opts.fields, cmp_key_parts, space:format())

local replicasets = vshard.router.routeall()
local call_opts = {
mode = 'read',
replicasets = replicasets,
timeout = opts.timeout,
}
local results, err = call.map(
STAT_FUNC_NAME,
{border_name, space_name, index.id, field_names},
call_opts
)

if err ~= nil then
return nil, BorderError:new("Failed to get %s: %s", border_name, err)
end

local keydef = Keydef.new(space, field_names, index.id)
local compare_sign = border_name == 'max' and 1 or -1

local res_tuple = nil
for _, storage_result in pairs(results) do
local storage_result = storage_result[1]
if storage_result.err ~= nil then
local need_reload = schema.result_needs_reload(space, storage_result)
return nil, BorderError:new("Failed to get %s: %s", border_name, storage_result.err), need_reload
end

local tuple = storage_result.res
if tuple ~= nil and is_closer(compare_sign, keydef, tuple, res_tuple) then
res_tuple = tuple
end
end

local result = utils.format_result({res_tuple}, space, field_names)

if opts.fields ~= nil then
result = utils.cut_rows(result.rows, result.metadata, opts.fields)
end

return result
end

local function get_border(border_name, space_name, index_name, opts)
return schema.wrap_func_reload(
call_get_border_on_router, border_name, space_name, index_name, opts
)
end

--- Find the minimum value in the specified index
--
-- @function min
--
-- @param string space_name
-- A space name
--
-- @param ?string index_name
-- An index name (by default, primary index is used)
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
function borders.min(space_name, index_id, opts)
return get_border('min', space_name, index_id, opts)
end

--- Find the maximum value in the specified index
--
-- @function min
--
-- @param string space_name
-- A space name
--
-- @param ?string index_name
-- An index name (by default, primary index is used)
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
function borders.max(space_name, index_id, opts)
return get_border('max', space_name, index_id, opts)
end

return borders
25 changes: 17 additions & 8 deletions crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,14 @@ function schema.truncate_row_trailing_fields(tuple, field_names)
return tuple
end

-- schema.wrap_box_space_func_result pcalls some box.space function
-- and returns its result as a table
-- `{res = ..., err = ..., space_schema_hash = ...}`
-- space_schema_hash is computed if function failed and
-- `add_space_schema_hash` is true
function schema.wrap_box_space_func_result(space, func_name, args, opts)
dev_checks('table', 'string', 'table', 'table')
function schema.wrap_func_result(space, func, args, opts)
dev_checks('table', 'function', 'table', 'table')

local result = {}

opts = opts or {}

local ok, func_res = pcall(space[func_name], space, unpack(args))
local ok, func_res = pcall(func, unpack(args))
if not ok then
result.err = func_res
if opts.add_space_schema_hash then
Expand All @@ -222,6 +217,20 @@ function schema.wrap_box_space_func_result(space, func_name, args, opts)
return result
end

-- schema.wrap_box_space_func_result pcalls some box.space function
-- and returns its result as a table
-- `{res = ..., err = ..., space_schema_hash = ...}`
-- space_schema_hash is computed if function failed and
-- `add_space_schema_hash` is true
function schema.wrap_box_space_func_result(space, box_space_func_name, box_space_func_args, opts)
dev_checks('table', 'string', 'table', 'table')
local function func(space, box_space_func_name, box_space_func_args)
return space[box_space_func_name](space, unpack(box_space_func_args))
end

return schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts)
end

-- schema.result_needs_reload checks that schema reload can
-- be helpful to avoid storage error.
-- It checks if space_schema_hash returned by storage
Expand Down
24 changes: 24 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,30 @@ function utils.merge_primary_key_parts(key_parts, pk_parts)
return merged_parts
end

function utils.enrich_field_names_with_cmp_key(field_names, key_parts, space_format)
if field_names == nil then
return nil
end

local enriched_field_names = {}
local key_field_names = {}

for _, field_name in ipairs(field_names) do
table.insert(enriched_field_names, field_name)
key_field_names[field_name] = true
end

for _, part in ipairs(key_parts) do
local field_name = space_format[part.fieldno].name
if not key_field_names[field_name] then
table.insert(enriched_field_names, field_name)
key_field_names[field_name] = true
end
end

return enriched_field_names
end

local enabled_tarantool_features = {}

local function determine_enabled_features()
Expand Down
8 changes: 3 additions & 5 deletions crud/compare/keydef.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ end
local keydef_cache = {}
setmetatable(keydef_cache, {__mode = 'k'})

local function new(replicasets, space_name, field_names, index_name)
local function new(space, field_names, index_id)
-- Get requested and primary index metainfo.
local conn = select(2, next(replicasets)).master.conn
local space = conn.space[space_name]
local index = space.index[index_name]
local key = msgpack.encode({index_name, field_names})
local index = space.index[index_id]
local key = msgpack.encode({index_id, field_names})

if keydef_cache[key] ~= nil then
return keydef_cache[key]
Expand Down
2 changes: 1 addition & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
field_names = plan.field_names,
}

local merger = Merger.new(replicasets_to_select, space_name, plan.index_id,
local merger = Merger.new(replicasets_to_select, space, plan.index_id,
common.SELECT_FUNC_NAME,
{space_name, plan.index_id, plan.conditions, select_opts},
{tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts}
Expand Down
4 changes: 2 additions & 2 deletions crud/select/merger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ local reverse_tarantool_iters = {
[box.index.REQ] = true,
}

local function new(replicasets, space_name, index_id, func_name, func_args, opts)
local function new(replicasets, space, index_id, func_name, func_args, opts)
opts = opts or {}
local call_opts = opts.call_opts

Expand Down Expand Up @@ -164,7 +164,7 @@ local function new(replicasets, space_name, index_id, func_name, func_args, opts
table.insert(merger_sources, source)
end

local keydef = Keydef.new(replicasets, space_name, opts.field_names, index_id)
local keydef = Keydef.new(space, opts.field_names, index_id)
local merger = merger_lib.new(keydef, merger_sources, {
reverse = reverse_tarantool_iters[opts.tarantool_iter],
})
Expand Down
Loading