Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,47 @@ result<void> sanitize(json::Value& v, sanitize_context& ctx);
result<void> sanitize(json::Value::Object& o, sanitize_context& ctx);
result<void> sanitize(json::Value::Array& a, sanitize_context& ctx);

/// Report whether \p name is spelled exactly like one of the type keywords
/// listed below: the primitive type names or the keywords that introduce a
/// complex type. The comparison is exact, so it is case-sensitive.
bool is_avro_type_name(std::string_view name) {
    // Primitive type names.
    const bool is_primitive = name == "null" || name == "boolean"
                              || name == "int" || name == "long"
                              || name == "float" || name == "double"
                              || name == "bytes" || name == "string";
    // Keywords that introduce a complex type definition.
    const bool is_complex = name == "record" || name == "enum"
                            || name == "array" || name == "map"
                            || name == "fixed";
    return is_primitive || is_complex;
}

/// Shorten a fully-qualified named type reference to its simple name when the
/// reference's namespace matches the enclosing namespace. Per the Avro spec
/// (Names, section 2), such a reference is semantically equivalent to the
/// unqualified form. This matches Confluent Schema Registry sanitization,
/// which consistently unqualifies references so that equivalent schemas
/// canonicalize identically.
///
/// The value is left untouched when any of the following holds:
///  - it is not a JSON string, or it contains no '.';
///  - the text after the last '.' is empty (e.g. "com.example."), so that a
///    malformed reference is never rewritten into an empty string;
///  - the text before the last '.' differs from the enclosing namespace;
///  - the text after the last '.' is an Avro type keyword (see
///    is_avro_type_name), because the shortened form would then read as that
///    built-in type instead of a named type.
///
/// \param val JSON value to inspect; rewritten in place when shortened.
/// \param ctx Supplies the enclosing namespace via ctx.ns.top() and the
///            allocator (ctx.alloc) used for the replacement string.
///            NOTE(review): assumes ctx.ns is never empty here -- confirm
///            against the callers.
void unqualify_type_reference(json::Value& val, sanitize_context& ctx) {
    if (!val.IsString()) {
        return;
    }
    const std::string_view full_name{val.GetString(), val.GetStringLength()};
    // An empty string contains no '.', so it also takes this early return.
    const auto last_dot = full_name.rfind('.');
    if (last_dot == std::string_view::npos) {
        return;
    }
    const std::string_view namespace_part = full_name.substr(0, last_dot);
    const std::string_view name_part = full_name.substr(last_dot + 1);
    if (name_part.empty()) {
        // Trailing dot: not a valid name. Keep the original text so that
        // whatever validates the schema later sees what the user wrote.
        return;
    }
    if (namespace_part == ctx.ns.top() && !is_avro_type_name(name_part)) {
        // name_part points into val's own buffer; copy it first so the
        // source and destination passed to SetString do not overlap.
        auto shortened = ss::sstring{name_part};
        val.SetString(shortened.data(), shortened.length(), ctx.alloc);
    }
}

result<void>
sanitize_union_symbol_name(json::Value& name, sanitize_context& ctx) {
// A name should have the leading dot stripped iff it's the only one
Expand All @@ -298,6 +339,8 @@ sanitize_union_symbol_name(json::Value& name, sanitize_context& ctx) {
// SetString uses memcpy, take a copy so the range doesn't overlap.
auto new_name = ss::sstring{fullname_sv};
name.SetString(new_name.data(), new_name.length(), ctx.alloc);
} else if (last_dot != std::string::npos) {
unqualify_type_reference(name, ctx);
}
return outcome::success();
}
Expand Down Expand Up @@ -339,6 +382,13 @@ result<void> sanitize_avro_type(
if (auto res = sanitize(i.value, ctx); !res.has_value()) {
return res;
}
if (i.value.IsString()) {
std::string_view member_name{
i.name.GetString(), i.name.GetStringLength()};
if (member_name == "items" || member_name == "values") {
unqualify_type_reference(i.value, ctx);
}
}
}
break;
case avro::AVRO_RECORD: {
Expand Down Expand Up @@ -456,6 +506,7 @@ result<void> sanitize(json::Value::Object& o, sanitize_context& ctx) {
if (res.has_error()) {
return res.assume_error();
} else if (t_it->value.GetType() == json::Type::kStringType) {
unqualify_type_reference(t_it->value, ctx);
std::string_view type_sv = {
t_it->value.GetString(), t_it->value.GetStringLength()};
auto res = sanitize_avro_type(o, type_sv, ctx);
Expand Down
93 changes: 93 additions & 0 deletions src/v/pandaproxy/schema_registry/test/sanitize_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,96 @@ BOOST_AUTO_TEST_CASE(test_sanitize_avro_debzium) {
pps::sanitize_avro_schema_definition(debezium_schema.share()).value(),
debezium_schema);
}

// Schemas with qualified and unqualified named type references should
// normalize to the same form. Per the Avro spec (Names), an unqualified
// name is resolved relative to the enclosing namespace.

// Array "items" referring to the previously defined record Inner by its
// fully-qualified name ("com.example.Inner").
const pps::schema_definition qualified_items_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"items","type":{"type":"array","items":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}}},{"name":"more","type":{"type":"array","items":"com.example.Inner"}}]})",
  pps::schema_type::avro};

// Same schema, but the second array refers to Inner by its simple name.
const pps::schema_definition unqualified_items_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"items","type":{"type":"array","items":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}}},{"name":"more","type":{"type":"array","items":"Inner"}}]})",
  pps::schema_type::avro};

// Both spellings of the array "items" reference must sanitize to the same
// schema definition.
BOOST_AUTO_TEST_CASE(test_sanitize_avro_normalize_items_type_reference) {
    const auto from_qualified
      = pps::sanitize_avro_schema_definition(qualified_items_ref.share())
          .value();
    const auto from_unqualified
      = pps::sanitize_avro_schema_definition(unqualified_items_ref.share())
          .value();
    BOOST_REQUIRE_EQUAL(from_qualified, from_unqualified);
}

// Union branch referring to the previously defined record Inner by its
// fully-qualified name ("com.example.Inner").
const pps::schema_definition qualified_union_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":["null","com.example.Inner"]}]})",
  pps::schema_type::avro};

// Same schema, but the union branch uses the simple name.
const pps::schema_definition unqualified_union_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":["null","Inner"]}]})",
  pps::schema_type::avro};

// Both spellings of the union branch must sanitize to the same schema
// definition.
BOOST_AUTO_TEST_CASE(test_sanitize_avro_normalize_union_type_reference) {
    const auto from_qualified
      = pps::sanitize_avro_schema_definition(qualified_union_ref.share())
          .value();
    const auto from_unqualified
      = pps::sanitize_avro_schema_definition(unqualified_union_ref.share())
          .value();
    BOOST_REQUIRE_EQUAL(from_qualified, from_unqualified);
}

// Field whose "type" is a plain string naming the previously defined record
// Inner by its fully-qualified name ("com.example.Inner").
const pps::schema_definition qualified_field_type_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":"com.example.Inner"}]})",
  pps::schema_type::avro};

// Same schema, but the field type uses the simple name.
const pps::schema_definition unqualified_field_type_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":"Inner"}]})",
  pps::schema_type::avro};

// Both spellings of the field type must sanitize to the same schema
// definition.
BOOST_AUTO_TEST_CASE(test_sanitize_avro_normalize_field_type_reference) {
    const auto from_qualified
      = pps::sanitize_avro_schema_definition(qualified_field_type_ref.share())
          .value();
    const auto from_unqualified
      = pps::sanitize_avro_schema_definition(
          unqualified_field_type_ref.share())
          .value();
    BOOST_REQUIRE_EQUAL(from_qualified, from_unqualified);
}

// Map "values" referring to the previously defined record Inner by its
// fully-qualified name ("com.example.Inner").
const pps::schema_definition qualified_map_values_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"lookup","type":{"type":"map","values":"com.example.Inner"}}]})",
  pps::schema_type::avro};

// Same schema, but the map values use the simple name.
const pps::schema_definition unqualified_map_values_ref{
  R"({"type":"record","name":"Outer","namespace":"com.example","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"lookup","type":{"type":"map","values":"Inner"}}]})",
  pps::schema_type::avro};

// Both spellings of the map "values" reference must sanitize to the same
// schema definition.
BOOST_AUTO_TEST_CASE(test_sanitize_avro_normalize_map_values_reference) {
    const auto from_qualified
      = pps::sanitize_avro_schema_definition(qualified_map_values_ref.share())
          .value();
    const auto from_unqualified
      = pps::sanitize_avro_schema_definition(
          unqualified_map_values_ref.share())
          .value();
    BOOST_REQUIRE_EQUAL(from_qualified, from_unqualified);
}

// Schema without any "namespace" attribute: the union branch already uses the
// simple name "Inner", so there is nothing to strip. Only this unqualified
// case is exercised below; the intent stated for this scenario is that a
// fully-qualified name would likewise keep its dotted form, because its
// namespace cannot match the (empty) enclosing namespace.
const pps::schema_definition no_ns_unqualified_union_ref{
  R"({"type":"record","name":"Outer","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":["null","Inner"]}]})",
  pps::schema_type::avro};

// Sanitizing the namespace-less schema must return it unchanged.
BOOST_AUTO_TEST_CASE(test_sanitize_avro_no_normalize_without_namespace) {
    // Expected output, spelled out explicitly; it equals the input text.
    const pps::schema_definition unchanged{
      R"({"type":"record","name":"Outer","fields":[{"name":"inner","type":{"type":"record","name":"Inner","fields":[{"name":"val","type":"string"}]}},{"name":"ref","type":["null","Inner"]}]})",
      pps::schema_type::avro};
    const auto actual = pps::sanitize_avro_schema_definition(
                          no_ns_unqualified_union_ref.share())
                          .value();
    BOOST_REQUIRE_EQUAL(actual, unchanged);
}
Loading