Skip to content

Commit 2f630af

Browse files
committed
[TNTP-2109] Switch to safe mode on vshard rebalance
1 parent a35f0a8 commit 2f630af

File tree

4 files changed

+234
-2
lines changed

4 files changed

+234
-2
lines changed

crud.lua

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ local readview = require('crud.readview')
2323
local schema = require('crud.schema')
2424
local storage_info = require('crud.storage_info')
2525
local storage = require('crud.storage')
26+
local rebalance = require('crud.common.rebalance')
2627

2728
local crud = {}
2829

@@ -158,8 +159,23 @@ crud.readview = readview.new
158159
-- @function schema
159160
crud.schema = schema.call
160161

162+
crud.rebalance = {}
163+
164+
-- @refer rebalance.router_cache_clear
165+
-- @function router_cache_clear
166+
crud.rebalance.router_cache_clear = rebalance.router.cache_clear
167+
168+
-- @refer rebalance.router_cache_length
169+
-- @function router_cache_length
170+
crud.rebalance.router_cache_length = rebalance.router.cache_length
171+
172+
-- @refer rebalance.router_cache_last_clear_ts
173+
-- @function router_cache_last_clear_ts
174+
crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts
175+
161176
function crud.init_router()
162-
rawset(_G, 'crud', crud)
177+
rawset(_G, 'crud', crud)
178+
rebalance.metrics.enable_router_metrics()
163179
end
164180

165181
function crud.stop_router()

crud/common/rebalance.lua

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
local fiber = require('fiber')
2+
local log = require('log')
3+
local vshard_consts = require('vshard.consts')
4+
local utils = require('crud.common.utils')
5+
6+
local has_metrics_module, metrics = pcall(require, 'metrics')
7+
8+
local SETTINGS_SPACE_NAME = '_crud_settings'
9+
local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable'
10+
11+
local M = {
12+
safe_mode = false,
13+
safe_mode_enable_hooks = {},
14+
safe_mode_disable_hooks = {},
15+
_router_cache_last_clear_ts = fiber.time()
16+
}
17+
18+
local function create_space()
19+
local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, {
20+
engine = 'memtx',
21+
format = {
22+
{ name = 'key', type = 'string' },
23+
{ name = 'value', type = 'any' },
24+
},
25+
if_not_exists = true,
26+
})
27+
settings_space:create_index('primary', { parts = { 'key' }, if_not_exists = true })
28+
end
29+
30+
local function safe_mode_trigger(_, new, space, op)
31+
if space ~= '_bucket' then
32+
return
33+
end
34+
if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or
35+
(op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then
36+
box.broadcast(SAFE_MOD_ENABLE_EVENT, true)
37+
end
38+
end
39+
40+
local function register_enable_hook(func)
41+
M.safe_mode_enable_hooks[func] = true
42+
end
43+
44+
local function remove_enable_hook(func)
45+
M.safe_mode_enable_hooks[func] = nil
46+
end
47+
48+
local function register_disable_hook(func)
49+
M.safe_mode_disable_hooks[func] = true
50+
end
51+
52+
local function remove_disable_hook(func)
53+
M.safe_mode_disable_hooks[func] = nil
54+
end
55+
56+
local function safe_mode_status()
57+
return M.safe_mode
58+
end
59+
60+
local function safe_mode_enable()
61+
if not box.info.ro then
62+
box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', true }
63+
for _, trig in pairs(box.space._bucket:on_replace()) do
64+
if trig == safe_mode_trigger then
65+
box.space._bucket:on_replace(nil, safe_mode_trigger)
66+
end
67+
end
68+
end
69+
M.safe_mode = true
70+
71+
for hook, _ in pairs(M.safe_mode_enable_hooks) do
72+
hook()
73+
end
74+
75+
log.info('Rebalance safe mode enabled')
76+
end
77+
78+
local function safe_mode_disable()
79+
if not box.info.ro then
80+
box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', false }
81+
box.space._bucket:on_replace(safe_mode_trigger)
82+
end
83+
M.safe_mode = false
84+
85+
for hook, _ in pairs(M.safe_mode_disable_hooks) do
86+
hook()
87+
end
88+
89+
log.info('Rebalance safe mode disabled')
90+
end
91+
92+
local function rebalance_init()
93+
M.metrics.enable_storage_metrics()
94+
95+
-- box.watch was introduced in tarantool 2.10.0
96+
if not utils.tarantool_supports_box_watch() then
97+
log.warn('This version of tarantool does not support autoswitch to safe mode during rebalance. '
98+
.. 'Update to newer version or use `_crud.rebalance_safe_mode_enable()` to enable safe mode manually.')
99+
return
100+
end
101+
102+
box.watch('box.status', function()
103+
if box.info.ro then
104+
return
105+
end
106+
107+
local stored_safe_mode
108+
if box.space[SETTINGS_SPACE_NAME] == nil then
109+
create_space()
110+
box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false }
111+
else
112+
stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' }
113+
end
114+
M.safe_mode = stored_safe_mode and stored_safe_mode.value or false
115+
116+
if M.safe_mode then
117+
for hook, _ in pairs(M.safe_mode_enable_hooks) do
118+
hook()
119+
end
120+
else
121+
box.space._bucket:on_replace(safe_mode_trigger)
122+
for hook, _ in pairs(M.safe_mode_disable_hooks) do
123+
hook()
124+
end
125+
end
126+
end)
127+
128+
box.watch(SAFE_MOD_ENABLE_EVENT, function(_, do_enable)
129+
if box.info.ro or not do_enable then
130+
return
131+
end
132+
safe_mode_enable()
133+
end)
134+
end
135+
136+
local function router_cache_clear()
137+
M._router_cache_last_clear_ts = fiber.time()
138+
return utils.get_vshard_router_instance():_route_map_clear()
139+
end
140+
141+
local function router_cache_length()
142+
return utils.get_vshard_router_instance().known_bucket_count
143+
end
144+
145+
local function router_cache_last_clear_ts()
146+
return M._router_cache_last_clear_ts
147+
end
148+
149+
-- Rebalance related metrics
150+
local function enable_storage_metrics()
151+
if not has_metrics_module then
152+
return
153+
end
154+
155+
local safe_mode_enabled_gauge = metrics.gauge(
156+
'tnt_crud_storage_safe_mode_enabled',
157+
"is safe mode enabled on this storage instance"
158+
)
159+
160+
metrics.register_callback(function()
161+
safe_mode_enabled_gauge:set(safe_mode_status() and 1 or 0)
162+
end)
163+
end
164+
165+
local function enable_router_metrics()
166+
if not has_metrics_module then
167+
return
168+
end
169+
170+
local router_cache_length_gauge = metrics.gauge(
171+
'tnt_crud_router_cache_length',
172+
"number of bucket routes in vshard router cache"
173+
)
174+
local router_cache_last_clear_ts_gauge = metrics.gauge(
175+
'tnt_crud_router_cache_last_clear_ts',
176+
"when vshard router cache was cleared last time"
177+
)
178+
179+
metrics.register_callback(function()
180+
router_cache_length_gauge:set(router_cache_length())
181+
router_cache_last_clear_ts_gauge:set(router_cache_last_clear_ts())
182+
end)
183+
end
184+
185+
M.init = rebalance_init
186+
M.safe_mode_status = safe_mode_status
187+
M.safe_mode_enable = safe_mode_enable
188+
M.safe_mode_disable = safe_mode_disable
189+
M.register_safe_mode_enable_hook = register_enable_hook
190+
M.remove_safe_mode_enable_hook = remove_enable_hook
191+
M.register_safe_mode_disable_hook = register_disable_hook
192+
M.remove_safe_mode_disable_hook = remove_disable_hook
193+
194+
M.router = {
195+
cache_clear = router_cache_clear,
196+
cache_length = router_cache_length,
197+
cache_last_clear_ts = router_cache_last_clear_ts,
198+
}
199+
200+
M.storage_api = {
201+
rebalance_safe_mode_status = safe_mode_status,
202+
rebalance_safe_mode_enable = safe_mode_enable,
203+
rebalance_safe_mode_disable = safe_mode_disable,
204+
}
205+
206+
M.metrics = {
207+
enable_storage_metrics = enable_storage_metrics,
208+
enable_router_metrics = enable_router_metrics,
209+
}
210+
211+
return M

crud/schema.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ schema.system_spaces = {
4646
['_tt_migrations'] = true,
4747
-- https://github.com/tarantool/cluster-federation/blob/01738cafa0dc7a3138e64f93c4e84cb323653257/src/internal/utils/utils.go#L17
4848
['_cdc_state'] = true,
49+
-- crud/common/rebalance.lua
50+
['_crud_settings'] = true,
4951
}
5052

5153
local function get_crud_schema(space)

crud/storage.lua

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks')
44
local stash = require('crud.common.stash')
55
local utils = require('crud.common.utils')
66

7+
local rebalance = require('crud.common.rebalance')
78
local call = require('crud.common.call')
89
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
910
local insert = require('crud.insert')
@@ -62,6 +63,7 @@ local function init_storage_call(user, storage_api)
6263
end
6364

6465
local modules_with_storage_api = {
66+
rebalance,
6567
call,
6668
sharding_metadata,
6769
insert,
@@ -103,6 +105,8 @@ local function init_impl()
103105
user = utils.get_this_replica_user() or 'guest'
104106
end
105107

108+
rebalance.init()
109+
106110
for _, module in ipairs(modules_with_storage_api) do
107111
init_storage_call(user, module.storage_api)
108112
end
@@ -141,7 +145,6 @@ function storage.stop()
141145
internal_stash.watcher:unregister()
142146
internal_stash.watcher = nil
143147
end
144-
145148
rawset(_G, utils.STORAGE_NAMESPACE, nil)
146149
end
147150

0 commit comments

Comments
 (0)