Skip to content

Commit 5fa62ae

Browse files
authored
Fix using outdated space schema (#111)
The problem is that if schema is changed on storages, router still uses outdated schema from vshard connection. It leads to errors like "non-existent space" when space is already created. The solution is quite simple - all such errors leads to `conn:reload_schema()` call and then crud retries. In case of error on the storage-side (e.g. in `*_object` functions when object was flattened using outdated space format) space schema hash is returned. Router compares this hash with hash of schema that was used on router-side, and reloads schema if hashes are different. `conn:reload_schema()` is called by one fiber at time - if reloading is in progress, other fibers just wait for it to end.
1 parent 5c8e3d9 commit 5fa62ae

File tree

18 files changed

+1410
-225
lines changed

18 files changed

+1410
-225
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
* Using outdated schema on router-side
13+
1014
### Added
1115

1216
* Support for UUID field types and UUID values

crud/common/call.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ function call.rw_single(bucket_id, func_name, func_args, options)
160160
)
161161
end
162162

163+
err = errors.wrap(err)
164+
163165
return nil, CallError:new(utils.format_replicaset_error(
164166
replicaset.uuid, "Function returned an error: %s", err
165167
))

crud/common/const.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
local const = {}
2+
3+
const.RELOAD_RETRIES_NUM = 1
4+
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
5+
6+
return const

crud/common/schema.lua

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
local fiber = require('fiber')
2+
local msgpack = require('msgpack')
3+
local digest = require('digest')
4+
local vshard = require('vshard')
5+
local errors = require('errors')
6+
local log = require('log')
7+
8+
local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack = false})
9+
10+
local const = require('crud.common.const')
11+
12+
local schema = {}
13+
14+
local function table_len(t)
15+
local len = 0
16+
for _ in pairs(t) do
17+
len = len + 1
18+
end
19+
return len
20+
end
21+
22+
local function call_reload_schema_on_replicaset(replicaset, channel)
23+
replicaset.master.conn:reload_schema()
24+
channel:put(true)
25+
end
26+
27+
local function call_reload_schema(replicasets)
28+
local replicasets_num = table_len(replicasets)
29+
local channel = fiber.channel(replicasets_num)
30+
31+
local fibers = {}
32+
for _, replicaset in pairs(replicasets) do
33+
local f = fiber.new(call_reload_schema_on_replicaset, replicaset, channel)
34+
table.insert(fibers, f)
35+
end
36+
37+
for _ = 1,replicasets_num do
38+
if channel:get(const.RELOAD_SCHEMA_TIMEOUT) == nil then
39+
for _, f in ipairs(fibers) do
40+
if fiber:status() ~= 'dead' then
41+
f:cancel()
42+
end
43+
end
44+
return nil, ReloadSchemaError:new("Reloading schema timed out")
45+
end
46+
end
47+
48+
return true
49+
end
50+
51+
local reload_in_progress = false
52+
local reload_schema_cond = fiber.cond()
53+
54+
local function reload_schema(replicasets)
55+
if reload_in_progress then
56+
if not reload_schema_cond:wait(const.RELOAD_SCHEMA_TIMEOUT) then
57+
return nil, ReloadSchemaError:new('Waiting for schema to be reloaded is timed out')
58+
end
59+
else
60+
reload_in_progress = true
61+
62+
local ok, err = call_reload_schema(replicasets)
63+
if not ok then
64+
return nil, err
65+
end
66+
67+
reload_schema_cond:broadcast()
68+
reload_in_progress = false
69+
end
70+
71+
return true
72+
end
73+
74+
-- schema.wrap_func_reload calls func with specified arguments.
75+
-- func should return `res, err, need_reload`
76+
-- If function returned error and `need_reload` is true,
77+
-- then schema is reloaded and one more attempt is performed
78+
-- (but no more than RELOAD_RETRIES_NUM).
79+
-- This wrapper is used for functions that can fail if router uses outdated
80+
-- space schema. In case of such errors these functions returns `need_reload`
81+
-- for schema-dependent errors.
82+
function schema.wrap_func_reload(func, ...)
83+
local i = 0
84+
85+
local res, err, need_reload
86+
while true do
87+
res, err, need_reload = func(...)
88+
89+
if err == nil or not need_reload then
90+
break
91+
end
92+
93+
local ok, reload_schema_err = reload_schema(vshard.router.routeall())
94+
if not ok then
95+
log.warn("Failed to reload schema: %s", reload_schema_err)
96+
break
97+
end
98+
99+
i = i + 1
100+
if i > const.RELOAD_RETRIES_NUM then
101+
break
102+
end
103+
end
104+
105+
return res, err
106+
end
107+
108+
local function get_space_schema_hash(space)
109+
if space == nil then
110+
return ''
111+
end
112+
113+
local indexes_info = {}
114+
for i = 0, table.maxn(space.index) do
115+
local index = space.index[i]
116+
if index ~= nil then
117+
indexes_info[i] = {
118+
unique = index.unique,
119+
parts = index.parts,
120+
id = index.id,
121+
type = index.type,
122+
name = index.name,
123+
path = index.path,
124+
}
125+
end
126+
end
127+
128+
local space_info = {
129+
format = space:format(),
130+
indexes = indexes_info,
131+
}
132+
133+
return digest.murmur(msgpack.encode(space_info))
134+
end
135+
136+
-- schema.wrap_box_space_func_result pcalls some box.space function
137+
-- and returns its result as a table
138+
-- `{res = ..., err = ..., space_schema_hash = ...}`
139+
-- space_schema_hash is computed if function failed and
140+
-- `add_space_schema_hash` is true
141+
function schema.wrap_box_space_func_result(add_space_schema_hash, space, func_name, ...)
142+
local result = {}
143+
144+
local ok, func_res = pcall(space[func_name], space, ...)
145+
if not ok then
146+
result.err = func_res
147+
if add_space_schema_hash then
148+
result.space_schema_hash = get_space_schema_hash(space)
149+
end
150+
else
151+
result.res = func_res
152+
end
153+
154+
return result
155+
end
156+
157+
-- schema.result_needs_reload checks that schema reload can
158+
-- be helpful to avoid storage error.
159+
-- It checks if space_schema_hash returned by storage
160+
-- is the same as hash of space used on router.
161+
-- Note, that storage returns `space_schema_hash = nil`
162+
-- if reloading space format can't avoid the error.
163+
function schema.result_needs_reload(space, result)
164+
if result.space_schema_hash == nil then
165+
return false
166+
end
167+
return result.space_schema_hash ~= get_space_schema_hash(space)
168+
end
169+
170+
return schema

crud/common/utils.lua

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
local errors = require('errors')
22
local ffi = require('ffi')
3+
local vshard = require('vshard')
34

5+
local schema = require('crud.common.schema')
46
local dev_checks = require('crud.common.dev_checks')
57

68
local FlattenError = errors.new_class("FlattenError", {capture_stack = false})
79
local UnflattenError = errors.new_class("UnflattenError", {capture_stack = false})
810
local ParseOperationsError = errors.new_class('ParseOperationsError', {capture_stack = false})
911
local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
12+
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
1013

1114
local utils = {}
1215

@@ -34,30 +37,52 @@ end
3437
function utils.get_space(space_name, replicasets)
3538
local replicaset = select(2, next(replicasets))
3639
local space = replicaset.master.conn.space[space_name]
40+
3741
return space
3842
end
3943

44+
function utils.get_space_format(space_name, replicasets)
45+
local space = utils.get_space(space_name, replicasets)
46+
if space == nil then
47+
return nil, GetSpaceFormatError:new("Space %q doesn't exist", space_name)
48+
end
49+
50+
local space_format = space:format()
51+
52+
return space_format
53+
end
54+
4055
local system_fields = { bucket_id = true }
4156

4257
function utils.flatten(object, space_format, bucket_id)
4358
if object == nil then return nil end
4459

4560
local tuple = {}
4661

62+
local fieldnames = {}
63+
4764
for fieldno, field_format in ipairs(space_format) do
48-
local value = object[field_format.name]
65+
local fieldname = field_format.name
66+
local value = object[fieldname]
4967

50-
if not system_fields[field_format.name] then
68+
if not system_fields[fieldname] then
5169
if not field_format.is_nullable and value == nil then
52-
return nil, FlattenError:new("Field %q isn't nullable", field_format.name)
70+
return nil, FlattenError:new("Field %q isn't nullable", fieldname)
5371
end
5472
end
5573

56-
if bucket_id ~= nil and field_format.name == 'bucket_id' then
74+
if bucket_id ~= nil and fieldname == 'bucket_id' then
5775
value = bucket_id
5876
end
5977

6078
tuple[fieldno] = value
79+
fieldnames[fieldname] = true
80+
end
81+
82+
for fieldname in pairs(object) do
83+
if not fieldnames[fieldname] then
84+
return nil, FlattenError:new("Unknown field %q is specified", fieldname)
85+
end
6186
end
6287

6388
return tuple
@@ -124,7 +149,7 @@ local function determine_enabled_features()
124149
enabled_tarantool_features.uuids = major >= 2 and (minor > 4 or minor == 4 and patch >= 1)
125150
end
126151

127-
local function tarantool_supports_fieldpaths()
152+
function utils.tarantool_supports_fieldpaths()
128153
if enabled_tarantool_features.fieldpaths == nil then
129154
determine_enabled_features()
130155
end
@@ -141,7 +166,7 @@ function utils.tarantool_supports_uuids()
141166
end
142167

143168
function utils.convert_operations(user_operations, space_format)
144-
if tarantool_supports_fieldpaths() then
169+
if utils.tarantool_supports_fieldpaths() then
145170
return user_operations
146171
end
147172

@@ -226,4 +251,29 @@ function utils.is_uuid(value)
226251
return ffi.istype(uuid_t, value)
227252
end
228253

254+
function utils.format_result(rows, space)
255+
return {
256+
metadata = table.copy(space:format()),
257+
rows = rows,
258+
}
259+
end
260+
261+
local function flatten_obj(space_name, obj)
262+
local space_format, err = utils.get_space_format(space_name, vshard.router.routeall())
263+
if err ~= nil then
264+
return nil, FlattenError:new("Failed to get space format: %s", err), true
265+
end
266+
267+
local tuple, err = utils.flatten(obj, space_format)
268+
if err ~= nil then
269+
return nil, FlattenError:new("Object is specified in bad format: %s", err), true
270+
end
271+
272+
return tuple
273+
end
274+
275+
function utils.flatten_obj_reload(space_name, obj)
276+
return schema.wrap_func_reload(flatten_obj, space_name, obj)
277+
end
278+
229279
return utils

0 commit comments

Comments
 (0)