Skip to content

Commit 6a1200a

Browse files
authored
Add read strategy parameters opts (#132)
Added `mode`, `prefer_replica` and `balance` options for read operations (`get`, `select`, `pairs`). According to this parameters one of vshard calls (`callrw`, `callro`, `callbro`, `callre`, `callbre`) is selected.
1 parent a7e0dd2 commit 6a1200a

File tree

17 files changed

+562
-142
lines changed

17 files changed

+562
-142
lines changed

.luacheckrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
redefined = false
2-
globals = {'box', 'utf8'}
2+
globals = {'box', 'utf8', 'checkers'}
33
include_files = {'**/*.lua', '*.luacheckrc', '*.rockspec'}
44
exclude_files = {'**/*.rocks/', 'tmp/', 'tarantool-enterprise/'}
55
max_line_length = 120

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## [Unreleased]
99

10+
### Added
11+
12+
* `mode`, `prefer_replica` and `balance` options for read operations
13+
(get, select, pairs). According to this parameters one of vshard
14+
calls (`callrw`, `callro`, `callbro`, `callre`, `callbre`) is selected
15+
1016
## [0.5.0] - 2021-03-10
1117

1218
### Fixed

README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,14 @@ where:
113113
* `space_name` (`string`) - name of the space
114114
* `key` (`any`) - primary key value
115115
* `opts`:
116-
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
117-
* `bucket_id` (`?number|cdata`) - bucket ID
118116
* `fields` (`?table`) - field names for getting only a subset of fields
117+
* `bucket_id` (`?number|cdata`) - bucket ID
118+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
119+
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `get` is
120+
performed on master
121+
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
122+
the replicas
123+
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
119124

120125
Returns metadata and array contains one row, error.
121126

@@ -315,9 +320,14 @@ where:
315320
(`after` option is required in this case).
316321
* `after` (`?table`) - tuple after which objects should be selected
317322
* `batch_size` (`?number`) - number of tuples to process per one request to storage
318-
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
319323
* `bucket_id` (`?number|cdata`) - bucket ID
320-
(is used when select by full primary key is performed)
324+
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
325+
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `select` is
326+
performed on master
327+
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
328+
the replicas
329+
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
330+
321331

322332
Returns metadata and array of rows, error.
323333

crud/common/call.lua

Lines changed: 76 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,70 @@ local call = {}
1212

1313
local DEFAULT_VSHARD_CALL_TIMEOUT = 2
1414

15-
local function call_impl(vshard_call, func_name, func_args, opts)
16-
dev_checks('string', 'string', '?table', {
15+
local function get_vshard_call_name(mode, prefer_replica, balance)
16+
dev_checks('string', '?boolean', '?boolean')
17+
18+
if mode == 'write' then
19+
return 'callrw'
20+
end
21+
22+
if not prefer_replica and not balance then
23+
return 'callro'
24+
end
25+
26+
if not prefer_replica and balance then
27+
return 'callbro'
28+
end
29+
30+
if prefer_replica and not balance then
31+
return 'callre'
32+
end
33+
34+
-- prefer_replica and balance
35+
return 'callbre'
36+
end
37+
38+
local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
39+
if err.type == 'ClientError' and type(err.message) == 'string' then
40+
if err.message == string.format("Procedure '%s' is not defined", func_name) then
41+
if func_name:startswith('_crud.') then
42+
err = NotInitializedError:new("crud isn't initialized on replicaset: %q", replicaset_uuid)
43+
else
44+
err = NotInitializedError:new("Function %s is not registered", func_name)
45+
end
46+
end
47+
end
48+
49+
if replicaset_uuid == nil then
50+
local replicaset, _ = vshard.router.route(bucket_id)
51+
if replicaset == nil then
52+
return CallError:new(
53+
"Function returned an error, but we couldn't figure out the replicaset: %s", err
54+
)
55+
end
56+
57+
replicaset_uuid = replicaset.uuid
58+
end
59+
60+
err = errors.wrap(err)
61+
62+
return CallError:new(utils.format_replicaset_error(
63+
replicaset_uuid, "Function returned an error: %s", err
64+
))
65+
end
66+
67+
function call.map(func_name, func_args, opts)
68+
dev_checks('string', '?table', {
69+
mode = 'string',
70+
prefer_replica = '?boolean',
71+
balance = '?boolean',
1772
timeout = '?number',
1873
replicasets = '?table',
1974
})
20-
2175
opts = opts or {}
2276

77+
local vshard_call_name = get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
78+
2379
local timeout = opts.timeout or DEFAULT_VSHARD_CALL_TIMEOUT
2480

2581
local replicasets, err
@@ -35,7 +91,7 @@ local function call_impl(vshard_call, func_name, func_args, opts)
3591
local futures_by_replicasets = {}
3692
local call_opts = {is_async = true}
3793
for _, replicaset in pairs(replicasets) do
38-
local future = replicaset[vshard_call](replicaset, func_name, func_args, call_opts)
94+
local future = replicaset[vshard_call_name](replicaset, func_name, func_args, call_opts)
3995
futures_by_replicasets[replicaset.uuid] = future
4096
end
4197

@@ -53,101 +109,33 @@ local function call_impl(vshard_call, func_name, func_args, opts)
53109
end
54110

55111
if err ~= nil then
56-
if err.type == 'ClientError' and type(err.message) == 'string' then
57-
if err.message == string.format("Procedure '%s' is not defined", func_name) then
58-
if func_name:startswith('_crud.') then
59-
err = NotInitializedError:new("crud isn't initialized on replicaset: %q", replicaset_uuid)
60-
else
61-
err = NotInitializedError:new("Function %s is not registered", func_name)
62-
end
63-
end
64-
end
65-
err = errors.wrap(err)
66-
return nil, CallError:new(utils.format_replicaset_error(
67-
replicaset_uuid, "Function returned an error: %s", err
68-
))
112+
return nil, wrap_vshard_err(err, func_name, replicaset_uuid)
69113
end
114+
70115
results[replicaset_uuid] = result[1]
71116
end
72117

73118
return results
74119
end
75120

76-
--- Calls specified function on all cluster storages.
77-
--
78-
-- Allowed functions to call can be specified by `crud.register` call.
79-
-- If function with specified `opts.func_name` isn't registered,
80-
-- global function with this name is called.
81-
--
82-
-- Uses vshard `replicaset:callrw`
83-
--
84-
-- @function rw
85-
--
86-
-- @param string func_name
87-
-- A function name
88-
--
89-
-- @param ?table func_args
90-
-- Array of arguments to be passed to the function
91-
--
92-
-- @tparam table opts Available options are:
93-
--
94-
-- @tparam ?number opts.timeout
95-
-- Function call timeout
96-
--
97-
-- @tparam ?table opts.replicasets
98-
-- vshard replicasets to call the function.
99-
-- By default, function is called on the all storages.
100-
--
101-
-- Returns map {replicaset_uuid: result} with all specified replicasets results
102-
--
103-
-- @return[1] table
104-
-- @treturn[2] nil
105-
-- @treturn[2] table Error description
106-
--
107-
function call.rw(func_name, func_args, opts)
108-
return call_impl('callrw', func_name, func_args, opts)
109-
end
121+
function call.single(bucket_id, func_name, func_args, opts)
122+
dev_checks('number', 'string', '?table', {
123+
mode = 'string',
124+
prefer_replica = '?boolean',
125+
balance = '?boolean',
126+
timeout = '?number',
127+
})
110128

111-
--- Calls specified function on all cluster storages.
112-
--
113-
-- The same as `rw`, but uses vshard `replicaset:callro`
114-
--
115-
-- @function ro
116-
--
117-
function call.ro(func_name, func_args, opts)
118-
return call_impl('callro', func_name, func_args, opts)
119-
end
129+
local vshard_call_name = get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance, opts.mode)
120130

121-
--- Calls specified function on a node according to bucket_id.
122-
--
123-
-- Exactly mimics the contract of vshard.router.callrw, but adds
124-
-- better error hangling
125-
--
126-
-- @function rw_single
127-
--
128-
function call.rw_single(bucket_id, func_name, func_args, options)
129-
local res, err = vshard.router.callrw(bucket_id, func_name, func_args, options)
130-
131-
-- This is a workaround, until vshard supports telling us where the error happened
132-
if err ~= nil then
133-
if type(err) == 'table' and err.type == 'ClientError' and type(err.message) == 'string' then
134-
if err.message == string.format("Procedure '%s' is not defined", func_name) then
135-
err = NotInitializedError:new("crud isn't initialized on replicaset")
136-
end
137-
end
138-
139-
local replicaset, _ = vshard.router.route(bucket_id)
140-
if replicaset == nil then
141-
return nil, CallError:new(
142-
"Function returned an error, but we couldn't figure out the replicaset: %s", err
143-
)
144-
end
131+
local timeout = opts.timeout or DEFAULT_VSHARD_CALL_TIMEOUT
145132

146-
err = errors.wrap(err)
133+
local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
134+
timeout = timeout,
135+
})
147136

148-
return nil, CallError:new(utils.format_replicaset_error(
149-
replicaset.uuid, "Function returned an error: %s", err
150-
))
137+
if err ~= nil then
138+
return nil, wrap_vshard_err(err, func_name, nil, bucket_id)
151139
end
152140

153141
if res == box.NULL then

crud/delete.lua

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,14 @@ local function call_delete_on_router(space_name, key, opts)
5656
end
5757

5858
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
59-
local storage_result, err = call.rw_single(
59+
local call_opts = {
60+
mode = 'write',
61+
timeout = opts.timeout,
62+
}
63+
local storage_result, err = call.single(
6064
bucket_id, DELETE_FUNC_NAME,
6165
{space_name, key, opts.fields},
62-
{timeout = opts.timeout}
66+
call_opts
6367
)
6468

6569
if err ~= nil then

crud/get.lua

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ local function call_get_on_router(space_name, key, opts)
4242
timeout = '?number',
4343
bucket_id = '?number|cdata',
4444
fields = '?table',
45+
prefer_replica = '?boolean',
46+
balance = '?boolean',
47+
mode = '?string',
4548
})
4649

4750
opts = opts or {}
@@ -56,14 +59,16 @@ local function call_get_on_router(space_name, key, opts)
5659
end
5760

5861
local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
59-
-- We don't use callro() here, because if the replication is
60-
-- async, there could be a lag between master and replica, so a
61-
-- connector which sequentially calls put() and then get() may get
62-
-- a stale result.
63-
local storage_result, err = call.rw_single(
62+
local call_opts = {
63+
mode = opts.mode or 'read',
64+
prefer_replica = opts.prefer_replica,
65+
balance = opts.balance,
66+
timeout = opts.timeout,
67+
}
68+
local storage_result, err = call.single(
6469
bucket_id, GET_FUNC_NAME,
6570
{space_name, key, opts.fields},
66-
{timeout = opts.timeout}
71+
call_opts
6772
)
6873

6974
if err ~= nil then
@@ -101,6 +106,12 @@ end
101106
-- Bucket ID
102107
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
103108
--
109+
-- @tparam ?boolean opts.prefer_replica
110+
-- Call on replica if it's possible
111+
--
112+
-- @tparam ?boolean opts.balance
113+
-- Use replica according to round-robin load balancing
114+
--
104115
-- @return[1] object
105116
-- @treturn[2] nil
106117
-- @treturn[2] table Error description
@@ -110,6 +121,9 @@ function get.call(space_name, key, opts)
110121
timeout = '?number',
111122
bucket_id = '?number|cdata',
112123
fields = '?table',
124+
prefer_replica = '?boolean',
125+
balance = '?boolean',
126+
mode = '?string',
113127
})
114128

115129
return schema.wrap_func_reload(call_get_on_router, space_name, key, opts)

crud/insert.lua

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,14 @@ local function call_insert_on_router(space_name, tuple, opts)
6868
fields = opts.fields,
6969
}
7070

71-
local storage_result, err = call.rw_single(
71+
local call_opts = {
72+
mode = 'write',
73+
timeout = opts.timeout,
74+
}
75+
local storage_result, err = call.single(
7276
bucket_id, INSERT_FUNC_NAME,
7377
{space_name, tuple, insert_on_storage_opts},
74-
{timeout = opts.timeout}
78+
call_opts
7579
)
7680

7781
if err ~= nil then

crud/replace.lua

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@ local function call_replace_on_router(space_name, tuple, opts)
7272
fields = opts.fields,
7373
}
7474

75-
local storage_result, err = call.rw_single(
75+
local call_opts = {
76+
mode = 'write',
77+
timeout = opts.timeout,
78+
}
79+
local storage_result, err = call.single(
7680
bucket_id, REPLACE_FUNC_NAME,
7781
{space_name, tuple, insert_on_storage_opts},
78-
{timeout = opts.timeout}
82+
call_opts
7983
)
8084

8185
if err ~= nil then

0 commit comments

Comments
 (0)