Skip to content

Commit cb041bb

Browse files
utils: move storage info to a separate file
This is the preliminary refactoring before solving the original issue. Part of #412 Part of #415
1 parent 5ebc3c5 commit cb041bb

File tree

3 files changed

+132
-107
lines changed

3 files changed

+132
-107
lines changed

crud.lua

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ local utils = require('crud.common.utils')
2222
local stats = require('crud.stats')
2323
local readview = require('crud.readview')
2424
local schema = require('crud.schema')
25+
local storage_info = require('crud.storage_info')
2526

2627
local crud = {}
2728

@@ -145,9 +146,9 @@ crud.stats = stats.get
145146
-- @function reset_stats
146147
crud.reset_stats = stats.reset
147148

148-
-- @refer utils.storage_info
149+
-- @refer storage_info.call
149150
-- @function storage_info
150-
crud.storage_info = utils.storage_info
151+
crud.storage_info = storage_info.call
151152

152153
-- @refer readview.new
153154
-- @function readview
@@ -174,8 +175,8 @@ function crud.init_storage()
174175
user = utils.get_this_replica_user() or 'guest'
175176
end
176177

177-
if rawget(_G, '_crud') == nil then
178-
rawset(_G, '_crud', {})
178+
if rawget(_G, utils.STORAGE_NAMESPACE) == nil then
179+
rawset(_G, utils.STORAGE_NAMESPACE, {})
179180
end
180181

181182
insert.init(user)
@@ -195,9 +196,9 @@ function crud.init_storage()
195196
sharding_metadata.init(user)
196197
readview.init(user)
197198

198-
utils.init_storage_call(user, 'storage_info_on_storage',
199-
utils.storage_info_on_storage
200-
)
199+
-- Must be initialized last: properly working storage info is the flag
200+
-- of initialization success.
201+
storage_info.init(user)
201202
end
202203

203204
function crud.init_router()
@@ -209,7 +210,7 @@ function crud.stop_router()
209210
end
210211

211212
function crud.stop_storage()
212-
rawset(_G, '_crud', nil)
213+
rawset(_G, utils.STORAGE_NAMESPACE, nil)
213214
end
214215

215216
return crud

crud/common/utils.lua

Lines changed: 4 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ local GetSpaceError = errors.new_class('GetSpaceError')
2020
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
2121
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
2222
local NotInitializedError = errors.new_class('NotInitialized')
23-
local StorageInfoError = errors.new_class('StorageInfoError')
2423
local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false})
2524
local UtilsInternalError = errors.new_class('UtilsInternalError', {capture_stack = false})
2625
local fiber = require('fiber')
2726

2827
local utils = {}
2928

29+
utils.STORAGE_NAMESPACE = '_crud'
30+
3031
--- Returns a full call string for a storage function name.
3132
--
3233
-- @param string name a base name of the storage function.
@@ -35,11 +36,9 @@ local utils = {}
3536
function utils.get_storage_call(name)
3637
dev_checks('string')
3738

38-
return '_crud.' .. name
39+
return ('%s.%s'):format(utils.STORAGE_NAMESPACE, name)
3940
end
4041

41-
local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call('storage_info_on_storage')
42-
4342
local space_format_cache = setmetatable({}, {__mode = 'k'})
4443

4544
-- copy from LuaJIT lj_char.c
@@ -1131,100 +1130,6 @@ function utils.list_slice(list, start_index, end_index)
11311130
return slice
11321131
end
11331132

1134-
--- Polls replicas for storage state
1135-
--
1136-
-- @function storage_info
1137-
--
1138-
-- @tparam ?number opts.timeout
1139-
-- Function call timeout
1140-
--
1141-
-- @tparam ?string|table opts.vshard_router
1142-
-- Cartridge vshard group name or vshard router instance.
1143-
--
1144-
-- @return a table of storage states by replica id.
1145-
function utils.storage_info(opts)
1146-
opts = opts or {}
1147-
1148-
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
1149-
if err ~= nil then
1150-
return nil, StorageInfoError:new(err)
1151-
end
1152-
1153-
local replicasets, err = vshard_router:routeall()
1154-
if replicasets == nil then
1155-
return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err)
1156-
end
1157-
1158-
local futures_by_replicas = {}
1159-
local replica_state_by_id = {}
1160-
local async_opts = {is_async = true}
1161-
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
1162-
1163-
for _, replicaset in pairs(replicasets) do
1164-
for replica_id, replica in pairs(replicaset.replicas) do
1165-
local master = utils.get_replicaset_master(replicaset, {cached = false})
1166-
1167-
replica_state_by_id[replica_id] = {
1168-
status = "error",
1169-
is_master = master == replica
1170-
}
1171-
1172-
local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME,
1173-
{}, async_opts)
1174-
if ok then
1175-
futures_by_replicas[replica_id] = res
1176-
else
1177-
local err_msg = string.format("Error getting storage info for %s", replica_id)
1178-
if res ~= nil then
1179-
log.error("%s: %s", err_msg, res)
1180-
replica_state_by_id[replica_id].message = tostring(res)
1181-
else
1182-
log.error(err_msg)
1183-
replica_state_by_id[replica_id].message = err_msg
1184-
end
1185-
end
1186-
end
1187-
end
1188-
1189-
local deadline = fiber.clock() + timeout
1190-
for replica_id, future in pairs(futures_by_replicas) do
1191-
local wait_timeout = deadline - fiber.clock()
1192-
if wait_timeout < 0 then
1193-
wait_timeout = 0
1194-
end
1195-
1196-
local result, err = future:wait_result(wait_timeout)
1197-
if result == nil then
1198-
future:discard()
1199-
local err_msg = string.format("Error getting storage info for %s", replica_id)
1200-
if err ~= nil then
1201-
if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
1202-
replica_state_by_id[replica_id].status = "uninitialized"
1203-
else
1204-
log.error("%s: %s", err_msg, err)
1205-
replica_state_by_id[replica_id].message = tostring(err)
1206-
end
1207-
else
1208-
log.error(err_msg)
1209-
replica_state_by_id[replica_id].message = err_msg
1210-
end
1211-
else
1212-
replica_state_by_id[replica_id].status = result[1].status or "uninitialized"
1213-
end
1214-
end
1215-
1216-
return replica_state_by_id
1217-
end
1218-
1219-
--- Storage status information.
1220-
--
1221-
-- @function storage_info_on_storage
1222-
--
1223-
-- @return a table with storage status.
1224-
function utils.storage_info_on_storage()
1225-
return {status = "running"}
1226-
end
1227-
12281133
--- Initializes a storage function by its name.
12291134
--
12301135
-- It adds the function into the global scope by its name and required
@@ -1240,7 +1145,7 @@ end
12401145
function utils.init_storage_call(user, name, func)
12411146
dev_checks('?string', 'string', 'function')
12421147

1243-
rawset(_G['_crud'], name, func)
1148+
rawset(_G[utils.STORAGE_NAMESPACE], name, func)
12441149

12451150
if user ~= nil then
12461151
name = utils.get_storage_call(name)

crud/storage_info.lua

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
local checks = require('checks')
2+
local errors = require('errors')
3+
local fiber = require('fiber')
4+
local log = require('log')
5+
6+
local const = require('crud.common.const')
7+
local utils = require('crud.common.utils')
8+
9+
local StorageInfoError = errors.new_class('StorageInfoError')
10+
11+
local storage_info = {}
12+
13+
local STORAGE_INFO_FUNC_NAME = 'storage_info_on_storage'
14+
local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call(STORAGE_INFO_FUNC_NAME)
15+
16+
--- Storage status information.
17+
--
18+
-- @function storage_info_on_storage
19+
--
20+
-- @return a table with storage status.
21+
local function storage_info_on_storage()
22+
return {status = "running"}
23+
end
24+
25+
function storage_info.init(user)
26+
utils.init_storage_call(user, STORAGE_INFO_FUNC_NAME, storage_info_on_storage)
27+
end
28+
29+
--- Polls replicas for storage state
30+
--
31+
-- @function call
32+
--
33+
-- @tparam ?number opts.timeout
34+
-- Function call timeout
35+
--
36+
-- @tparam ?string|table opts.vshard_router
37+
-- Cartridge vshard group name or vshard router instance.
38+
--
39+
-- @return a table of storage states by replica id.
40+
function storage_info.call(opts)
41+
checks({
42+
timeout = '?number',
43+
vshard_router = '?string|table',
44+
})
45+
46+
opts = opts or {}
47+
48+
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
49+
if err ~= nil then
50+
return nil, StorageInfoError:new(err)
51+
end
52+
53+
local replicasets, err = vshard_router:routeall()
54+
if replicasets == nil then
55+
return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err)
56+
end
57+
58+
local futures_by_replicas = {}
59+
local replica_state_by_id = {}
60+
local async_opts = {is_async = true}
61+
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
62+
63+
for _, replicaset in pairs(replicasets) do
64+
for replica_id, replica in pairs(replicaset.replicas) do
65+
local master = utils.get_replicaset_master(replicaset, {cached = false})
66+
67+
replica_state_by_id[replica_id] = {
68+
status = "error",
69+
is_master = master == replica
70+
}
71+
72+
local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME,
73+
{}, async_opts)
74+
if ok then
75+
futures_by_replicas[replica_id] = res
76+
else
77+
local err_msg = string.format("Error getting storage info for %s", replica_id)
78+
if res ~= nil then
79+
log.error("%s: %s", err_msg, res)
80+
replica_state_by_id[replica_id].message = tostring(res)
81+
else
82+
log.error(err_msg)
83+
replica_state_by_id[replica_id].message = err_msg
84+
end
85+
end
86+
end
87+
end
88+
89+
local deadline = fiber.clock() + timeout
90+
for replica_id, future in pairs(futures_by_replicas) do
91+
local wait_timeout = deadline - fiber.clock()
92+
if wait_timeout < 0 then
93+
wait_timeout = 0
94+
end
95+
96+
local result, err = future:wait_result(wait_timeout)
97+
if result == nil then
98+
future:discard()
99+
local err_msg = string.format("Error getting storage info for %s", replica_id)
100+
if err ~= nil then
101+
if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
102+
replica_state_by_id[replica_id].status = "uninitialized"
103+
else
104+
log.error("%s: %s", err_msg, err)
105+
replica_state_by_id[replica_id].message = tostring(err)
106+
end
107+
else
108+
log.error(err_msg)
109+
replica_state_by_id[replica_id].message = err_msg
110+
end
111+
else
112+
replica_state_by_id[replica_id].status = result[1].status or "uninitialized"
113+
end
114+
end
115+
116+
return replica_state_by_id
117+
end
118+
119+
return storage_info

0 commit comments

Comments
 (0)