Skip to content

Commit 9b626bb

Browse files
committed
[TNTP-2109] Add safe mode with bucket_ref/unref
1 parent 2f630af commit 9b626bb

File tree

7 files changed

+137
-26
lines changed

7 files changed

+137
-26
lines changed

crud/common/call.lua

Lines changed: 99 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
local errors = require('errors')
2+
local vshard = require('vshard')
23

34
local call_cache = require('crud.common.call_cache')
45
local dev_checks = require('crud.common.dev_checks')
56
local utils = require('crud.common.utils')
67
local sharding_utils = require('crud.common.sharding.utils')
7-
local fiber_clock = require('fiber').clock
8+
local fiber = require('fiber')
89
local const = require('crud.common.const')
10+
local rebalance = require('crud.common.rebalance')
911

1012
local BaseIterator = require('crud.common.map_call_cases.base_iter')
1113
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
@@ -14,14 +16,79 @@ local CallError = errors.new_class('CallError')
1416

1517
local CALL_FUNC_NAME = 'call_on_storage'
1618
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)
17-
19+
local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'
1820

1921
local call = {}
2022

21-
local function call_on_storage(run_as_user, func_name, ...)
23+
local function bucket_unref_many(bucket_ids, mode)
24+
local all_ok = true
25+
local last_err = nil
26+
for _, bucket_id in pairs(bucket_ids) do
27+
local ok, err = vshard.storage.bucket_unref(bucket_id, mode)
28+
if not ok then
29+
all_ok = nil
30+
last_err = err
31+
end
32+
end
33+
return all_ok, last_err
34+
end
35+
36+
local function bucket_ref_many(bucket_ids, mode)
37+
local reffed = {}
38+
for _, bucket_id in pairs(bucket_ids) do
39+
local ok, err = vshard.storage.bucket_ref(bucket_id, mode)
40+
if not ok then
41+
bucket_unref_many(reffed, mode)
42+
return nil, err
43+
end
44+
table.insert(reffed, bucket_id)
45+
end
46+
return true, nil
47+
end
48+
49+
local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
50+
fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name)
51+
52+
local ok, ref_err = bucket_ref_many(bucket_ids, mode)
53+
if not ok then
54+
return nil, ref_err
55+
end
56+
57+
local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)}
58+
59+
ok, ref_err = bucket_unref_many(bucket_ids, mode)
60+
if not ok then
61+
return nil, ref_err
62+
end
63+
64+
return unpack(res, 1, table.maxn(res))
65+
end
66+
67+
local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
68+
fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name)
69+
2270
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
2371
end
2472

73+
local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast
74+
75+
local function safe_mode_enable()
76+
call_on_storage = call_on_storage_safe
77+
78+
for fb_id, fb in pairs(fiber.info()) do
79+
if string.find(fb.name, CRUD_CALL_FIBER_NAME) then
80+
fiber.kill(fb_id)
81+
end
82+
end
83+
end
84+
85+
local function safe_mode_disable()
86+
call_on_storage = call_on_storage_fast
87+
end
88+
89+
rebalance.register_safe_mode_enable_hook(safe_mode_enable)
90+
rebalance.register_safe_mode_disable_hook(safe_mode_disable)
91+
2592
call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}
2693

2794
function call.get_vshard_call_name(mode, prefer_replica, balance)
@@ -82,8 +149,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
82149
))
83150
end
84151

85-
local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts)
86-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
152+
local function retry_call_with_master_discovery(vshard_router, replicaset,
153+
method, func_name, func_args,
154+
call_opts, mode, bucket_ids)
155+
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
87156

88157
-- In case cluster was just bootstrapped with auto master discovery,
89158
-- replicaset may miss master.
@@ -93,7 +162,20 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f
93162
return resp, err
94163
end
95164

96-
if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
165+
-- This is a partial copy of error handling from vshard.router.router_call_impl()
166+
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
167+
if err.name == 'WRONG_BUCKET' or
168+
err.name == 'BUCKET_IS_LOCKED' or
169+
err.name == 'TRANSFER_IS_IN_PROGRESS' then
170+
vshard_router:_bucket_reset(err.bucket_id)
171+
172+
-- Substitute replicaset only for single bucket_id calls.
173+
if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then
174+
replicaset = vshard_router.replicasets[err.destination]
175+
else
176+
return nil, err
177+
end
178+
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
97179
replicaset:locate_master()
98180
end
99181

@@ -145,10 +227,10 @@ function call.map(vshard_router, func_name, func_args, opts)
145227
request_timeout = opts.mode == 'read' and opts.request_timeout or nil,
146228
}
147229
while iter:has_next() do
148-
local args, replicaset, replicaset_id = iter:get()
230+
local args, replicaset, replicaset_id, bucket_ids = iter:get()
149231

150-
local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
151-
func_name, args, call_opts)
232+
local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
233+
func_name, args, call_opts, opts.mode, bucket_ids)
152234

153235
if err ~= nil then
154236
local result_info = {
@@ -170,9 +252,9 @@ function call.map(vshard_router, func_name, func_args, opts)
170252
futures_by_replicasets[replicaset_id] = future
171253
end
172254

173-
local deadline = fiber_clock() + timeout
255+
local deadline = fiber.clock() + timeout
174256
for replicaset_id, future in pairs(futures_by_replicasets) do
175-
local wait_timeout = deadline - fiber_clock()
257+
local wait_timeout = deadline - fiber.clock()
176258
if wait_timeout < 0 then
177259
wait_timeout = 0
178260
end
@@ -221,9 +303,9 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
221303
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
222304
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil
223305

224-
local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
225-
func_name, func_args, {timeout = timeout,
226-
request_timeout = request_timeout})
306+
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
307+
func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
308+
opts.mode, {bucket_id})
227309
if err ~= nil then
228310
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
229311
end
@@ -248,8 +330,9 @@ function call.any(vshard_router, func_name, func_args, opts)
248330
end
249331
local replicaset_id, replicaset = next(replicasets)
250332

251-
local res, err = retry_call_with_master_discovery(replicaset, 'call',
252-
func_name, func_args, {timeout = timeout})
333+
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
334+
func_name, func_args, {timeout = timeout},
335+
'read', {})
253336
if err ~= nil then
254337
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
255338
end

crud/common/map_call_cases/base_iter.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ end
6767
-- @return[1] table func_args
6868
-- @return[2] table replicaset
6969
-- @return[3] string replicaset_id
70+
-- @return[4] table bucket_ids
7071
function BaseIterator:get()
7172
local replicaset_id = self.next_index
7273
local replicaset = self.next_replicaset
7374
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)
7475

75-
return self.func_args, replicaset, replicaset_id
76+
return self.func_args, replicaset, replicaset_id, {}
7677
end
7778

7879
return BaseIterator

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ end
6868
-- @return[1] table func_args
6969
-- @return[2] table replicaset
7070
-- @return[3] string replicaset_id
71+
-- @return[4] table bucket_ids
7172
function BatchInsertIterator:get()
7273
local replicaset_id = self.next_index
7374
local replicaset = self.next_batch.replicaset
@@ -76,10 +77,11 @@ function BatchInsertIterator:get()
7677
self.next_batch.tuples,
7778
self.opts,
7879
}
80+
local bucket_ids = self.next_batch.bucket_ids
7981

8082
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
8183

82-
return func_args, replicaset, replicaset_id
84+
return func_args, replicaset, replicaset_id, bucket_ids
8385
end
8486

8587
return BatchInsertIterator

crud/common/map_call_cases/batch_upsert_iter.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ end
7676
-- @return[1] table func_args
7777
-- @return[2] table replicaset
7878
-- @return[3] string replicaset_id
79+
-- @return[4] table bucket_ids
7980
function BatchUpsertIterator:get()
8081
local replicaset_id = self.next_index
8182
local replicaset = self.next_batch.replicaset
@@ -85,10 +86,11 @@ function BatchUpsertIterator:get()
8586
self.next_batch.operations,
8687
self.opts,
8788
}
89+
local bucket_ids = self.next_batch.bucket_ids
8890

8991
self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
9092

91-
return func_args, replicaset, replicaset_id
93+
return func_args, replicaset, replicaset_id, bucket_ids
9294
end
9395

9496
return BatchUpsertIterator

crud/common/sharding/init.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,10 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
324324
local record_by_replicaset = batches[replicaset_id] or {
325325
replicaset = replicaset,
326326
tuples = {},
327+
bucket_ids = {},
327328
}
328329
table.insert(record_by_replicaset.tuples, tuple)
330+
record_by_replicaset.bucket_ids[sharding_data.bucket_id] = true
329331

330332
if opts.operations ~= nil then
331333
record_by_replicaset.operations = record_by_replicaset.operations or {}
@@ -335,6 +337,14 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
335337
batches[replicaset_id] = record_by_replicaset
336338
end
337339

340+
for _, rbr in pairs(batches) do
341+
local bucket_ids = {}
342+
for bid, _ in pairs(rbr.bucket_ids) do
343+
table.insert(bucket_ids, bid)
344+
end
345+
rbr.bucket_ids = bucket_ids
346+
end
347+
338348
return {
339349
batches = batches,
340350
sharding_func_hash = sharding_func_hash,

crud/select/merger.lua

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,11 @@ local function fetch_chunk(context, state)
171171

172172
-- change context.func_args too, but it does not matter
173173
next_func_args[4].after_tuple = cursor.after_tuple
174-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, next_func_args)
174+
local mode = "read"
175+
local bucket_ids = {}
176+
local func_args_ext = utils.append_array(
177+
{ box.session.effective_user(), bucket_ids, mode, func_name },
178+
next_func_args)
175179

176180
if context.readview then
177181
next_state = {future = context.future_replica.conn:call("_crud.call_on_storage",
@@ -203,7 +207,10 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_
203207
local buf = buffer.ibuf()
204208
local net_box_opts = {is_async = true, buffer = buf,
205209
skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
206-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
210+
local bucket_ids = {}
211+
local func_args_ext = utils.append_array(
212+
{ box.session.effective_user(), bucket_ids, mode, func_name },
213+
func_args)
207214
local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage",
208215
func_args_ext, net_box_opts)
209216

@@ -279,8 +286,13 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in
279286
local net_box_opts = {is_async = true, buffer = buf,
280287
skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
281288
func_args[4].readview_id = replicaset_info.id
282-
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
283-
local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts)
289+
local mode = "read"
290+
local bucket_ids = {}
291+
local func_args_ext = utils.append_array(
292+
{ box.session.effective_user(), bucket_ids, mode, func_name },
293+
func_args)
294+
local future = replica.conn:call("_crud.call_on_storage",
295+
func_args_ext, net_box_opts)
284296

285297
-- Create a source.
286298
local context = {

test/unit/privileges_test.lua

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ g.before_all(function()
1717
end)
1818

1919
g.test_prepend_current_user_smoke = function()
20-
local res = call.storage_api.call_on_storage(box.session.effective_user(), "unittestfunc", {"too", "foo"})
20+
local res = call.storage_api.call_on_storage(
21+
box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"})
2122
t.assert_equals(res, {"too", "foo"})
2223
end
2324

2425
g.test_non_existent_user = function()
2526
t.assert_error_msg_contains("User 'non_existent_user' is not found",
26-
call.storage_api.call_on_storage, "non_existent_user", "unittestfunc")
27+
call.storage_api.call_on_storage, "non_existent_user", {}, "read", "unittestfunc")
2728
end
2829

2930
g.test_that_the_session_switches_back = function()
@@ -34,7 +35,7 @@ g.test_that_the_session_switches_back = function()
3435
local reference_user = box.session.effective_user()
3536
t.assert_not_equals(reference_user, "unittestuser")
3637

37-
local res = call.storage_api.call_on_storage("unittestuser", "unittestfunc2")
38+
local res = call.storage_api.call_on_storage("unittestuser", {}, "read", "unittestfunc2")
3839
t.assert_equals(res, "unittestuser")
3940
t.assert_equals(box.session.effective_user(), reference_user)
4041
end

0 commit comments

Comments
 (0)