Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 38 additions & 67 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
local errors = require('errors')
local vshard = require('vshard')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
Expand All @@ -8,6 +7,7 @@ local sharding_utils = require('crud.common.sharding.utils')
local fiber = require('fiber')
local const = require('crud.common.const')
local rebalance = require('crud.common.rebalance')
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -20,53 +20,13 @@ local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'

local call = {}

local function bucket_unref_many(bucket_ids, mode)
local all_ok = true
local last_err = nil
for _, bucket_id in pairs(bucket_ids) do
local ok, err = vshard.storage.bucket_unref(bucket_id, mode)
if not ok then
all_ok = nil
last_err = err
end
end
return all_ok, last_err
end

local function bucket_ref_many(bucket_ids, mode)
local reffed = {}
for _, bucket_id in pairs(bucket_ids) do
local ok, err = vshard.storage.bucket_ref(bucket_id, mode)
if not ok then
bucket_unref_many(reffed, mode)
return nil, err
end
table.insert(reffed, bucket_id)
end
return true, nil
end

local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
local function call_on_storage_safe(run_as_user, func_name, ...)
fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name)

local ok, ref_err = bucket_ref_many(bucket_ids, mode)
if not ok then
return nil, ref_err
end

local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)}

ok, ref_err = bucket_unref_many(bucket_ids, mode)
if not ok then
return nil, ref_err
end

return unpack(res, 1, table.maxn(res))
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
local function call_on_storage_fast(run_as_user, func_name, ...)
fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name)

return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

Expand Down Expand Up @@ -149,10 +109,11 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

local function retry_call_with_master_discovery(vshard_router, replicaset,
method, func_name, func_args,
call_opts, mode, bucket_ids)
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)
--- Executes a vshard call and retries once after performing recovery actions
--- like bucket cache reset, destination redirect (for single calls), or master discovery.
local function call_with_retry_and_recovery(vshard_router,
replicaset, method, func_name, func_args, call_opts, is_single_call)
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)

-- In case cluster was just bootstrapped with auto master discovery,
-- replicaset may miss master.
Expand All @@ -164,16 +125,28 @@ local function retry_call_with_master_discovery(vshard_router, replicaset,

-- This is a partial copy of error handling from vshard.router.router_call_impl()
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
if err.name == 'WRONG_BUCKET' or
err.name == 'BUCKET_IS_LOCKED' or
err.name == 'TRANSFER_IS_IN_PROGRESS' then
vshard_router:_bucket_reset(err.bucket_id)

-- Substitute replicaset only for single bucket_id calls.
if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then
replicaset = vshard_router.replicasets[err.destination]
else
return nil, err
if err.class_name == bucket_ref_unref.BucketRefError.name then
local redirect_replicaset
if is_single_call and #err.bucket_ref_errs == 1 then
local single_err = err.bucket_ref_errs[1]
local destination = single_err.vshard_err.destination
if destination and vshard_router.replicasets[destination] then
redirect_replicaset = vshard_router.replicasets[destination]
end
end

for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
local bucket_id = bucket_ref_err.bucket_id
local vshard_err = bucket_ref_err.vshard_err
if vshard_err.name == 'WRONG_BUCKET' or
vshard_err.name == 'BUCKET_IS_LOCKED' or
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
vshard_router:_bucket_reset(bucket_id)
end
end

if redirect_replicaset ~= nil then
replicaset = redirect_replicaset
end
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
replicaset:locate_master()
Expand Down Expand Up @@ -227,10 +200,10 @@ function call.map(vshard_router, func_name, func_args, opts)
request_timeout = opts.mode == 'read' and opts.request_timeout or nil,
}
while iter:has_next() do
local args, replicaset, replicaset_id, bucket_ids = iter:get()
local args, replicaset, replicaset_id = iter:get()

local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, opts.mode, bucket_ids)
local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, false)

if err ~= nil then
local result_info = {
Expand Down Expand Up @@ -303,9 +276,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
opts.mode, {bucket_id})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end
Expand All @@ -330,9 +302,8 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
func_name, func_args, {timeout = timeout},
'read', {})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call',
func_name, func_args, {timeout = timeout}, false)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end
Expand Down
3 changes: 1 addition & 2 deletions crud/common/map_call_cases/base_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,12 @@ end
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
-- @return[4] table bucket_ids
function BaseIterator:get()
local replicaset_id = self.next_index
local replicaset = self.next_replicaset
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)

return self.func_args, replicaset, replicaset_id, {}
return self.func_args, replicaset, replicaset_id
end

return BaseIterator
4 changes: 1 addition & 3 deletions crud/common/map_call_cases/batch_insert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ end
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
-- @return[4] table bucket_ids
function BatchInsertIterator:get()
local replicaset_id = self.next_index
local replicaset = self.next_batch.replicaset
Expand All @@ -77,11 +76,10 @@ function BatchInsertIterator:get()
self.next_batch.tuples,
self.opts,
}
local bucket_ids = self.next_batch.bucket_ids

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset, replicaset_id, bucket_ids
return func_args, replicaset, replicaset_id
end

return BatchInsertIterator
4 changes: 1 addition & 3 deletions crud/common/map_call_cases/batch_upsert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ end
-- @return[1] table func_args
-- @return[2] table replicaset
-- @return[3] string replicaset_id
-- @return[4] table bucket_ids
function BatchUpsertIterator:get()
local replicaset_id = self.next_index
local replicaset = self.next_batch.replicaset
Expand All @@ -86,11 +85,10 @@ function BatchUpsertIterator:get()
self.next_batch.operations,
self.opts,
}
local bucket_ids = self.next_batch.bucket_ids

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset, replicaset_id, bucket_ids
return func_args, replicaset, replicaset_id
end

return BatchUpsertIterator
Loading
Loading