Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,598 changes: 1,598 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ redpanda_cc_library(
"avro.h",
"compatibility.h",
"configuration.h",
"context_router.h",
"error.h",
"errors.h",
"exceptions.h",
Expand Down Expand Up @@ -162,6 +163,7 @@ redpanda_cc_library(
"//src/v/kafka/client:configuration",
"//src/v/kafka/client:exceptions",
"//src/v/pandaproxy:json",
"//src/v/pandaproxy:parsing",
"@seastar",
],
)
Expand Down Expand Up @@ -251,6 +253,7 @@ redpanda_cc_library(
":types",
"//src/v/config:startup_config",
"//src/v/kafka/client",
"//src/v/utils:variant",
],
)

Expand Down
25 changes: 19 additions & 6 deletions src/v/pandaproxy/schema_registry/auth.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "pandaproxy/schema_registry/types.h"
#include "pandaproxy/server.h"
#include "security/acl.h"
#include "utils/variant.h"

#include <variant>

Expand Down Expand Up @@ -42,27 +43,39 @@ class auth {
// AuthZ is required to be performed in the handler as the resource is
// unknown
using deferred = named_type<std::monostate, class deferred_tag>;
// AuthZ will be performed against the context-qualified subject extracted
// from both the {context} and {subject} path parameters
using context_prefix_subject
= named_type<std::monostate, class context_prefix_subject_tag>;

using op = security::acl_operation;
/// Authorization-time resource type.
using resource
= std::variant<none, deferred, global, context_subject, cluster>;
/// Route-registration-time resource type — includes
/// `context_prefix_subject`, which is resolved to `context_subject` before
/// authorization.
using route_resource = extend_variant_t<resource, context_prefix_subject>;

using regular_function_handler = ss::noncopyable_function<
ss::future<server::reply_t>(server::request_t, server::reply_t)>;
using deferred_function_handler = ss::noncopyable_function<ss::future<
server::reply_t>(
server::request_t, server::reply_t, std::optional<request_auth_result>)>;
using deferred_function_handler
= ss::noncopyable_function<ss::future<server::reply_t>(
server::request_t,
server::reply_t,
std::optional<request_auth_result>,
std::string_view operation_name)>;
using function_handler
= std::variant<regular_function_handler, deferred_function_handler>;

auth(level lvl, std::optional<op> op, resource res)
auth(level lvl, std::optional<op> op, route_resource res)
: _lvl{lvl}
, _op{op}
, _res{std::move(res)} {}

level get_level() const { return _lvl; }
std::optional<op> get_op() const { return _op; }
const resource& get_resource() const { return _res; }
const route_resource& get_resource() const { return _res; }
bool is_deferred() const {
return std::holds_alternative<auth::deferred>(get_resource());
}
Expand All @@ -77,7 +90,7 @@ class auth {
private:
level _lvl;
std::optional<op> _op;
resource _res;
route_resource _res;
};

} // namespace pandaproxy::schema_registry
30 changes: 17 additions & 13 deletions src/v/pandaproxy/schema_registry/authorization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "container/chunked_hash_map.h"
#include "pandaproxy/api/api-doc/schema_registry.json.hh"
#include "pandaproxy/parsing/httpd.h"
#include "pandaproxy/schema_registry/context_router.h"
#include "pandaproxy/schema_registry/service.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
Expand Down Expand Up @@ -54,15 +55,21 @@ namespace {

auth::resource
extract_resource_from_request(const server::request_t& rq, const auth& auth) {
auto resource = auth.get_resource();
ss::visit(
resource,
[&rq](context_subject& ctx_sub) {
ctx_sub = context_subject::from_string(
return ss::visit(
auth.get_resource(),
[&rq](const context_subject&) -> auth::resource {
return context_subject::from_string(
parse::request_param<ss::sstring>(*rq.req, "subject"));
},
[](const auto&) {});
return resource;
[&rq](const auth::context_prefix_subject&) -> auth::resource {
auto ctx = parse_normalized_context(*rq.req);
auto sub = parse::request_param<ss::sstring>(*rq.req, "subject");
if (!starts_with_context(sub)) {
sub = fmt::format(":{}:{}", ctx, sub);
}
return context_subject::from_string(sub);
},
[](const auto& res) -> auth::resource { return res; });
}

void throw_unauthorized() {
Expand Down Expand Up @@ -157,10 +164,9 @@ void handle_authz(

void handle_get_schemas_ids_id_authz(
const server::request_t& rq,
std::string_view operation_name,
std::optional<request_auth_result>& auth_result,
const chunked_vector<context_subject>& subjects) {
const auto& operation_name
= ss::httpd::schema_registry_json::get_schemas_ids_id.operations.nickname;
constexpr auto op = security::acl_operation::read;
if (!auth_result.has_value()) {
// ACLs or authentication is disabled
Expand Down Expand Up @@ -222,10 +228,9 @@ void handle_get_schemas_ids_id_authz(

void handle_get_subjects_authz(
const server::request_t& rq,
std::string_view operation_name,
std::optional<request_auth_result>& auth_result,
chunked_vector<context_subject>& subjects) {
const auto& operation_name
= ss::httpd::schema_registry_json::get_subjects.operations.nickname;
constexpr auto op = security::acl_operation::describe;

if (!auth_result.has_value()) {
Expand Down Expand Up @@ -286,11 +291,10 @@ void handle_get_subjects_authz(

ss::future<> handle_get_contexts_authz(
const server::request_t& rq,
std::string_view operation_name,
sharded_store& store,
std::optional<request_auth_result>& auth_result,
chunked_vector<context>& contexts) {
const auto& operation_name
= ss::httpd::schema_registry_json::get_contexts.operations.nickname;
constexpr auto op = security::acl_operation::describe;

if (!auth_result.has_value()) {
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/authorization.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ void handle_authz(

void handle_get_schemas_ids_id_authz(
const server::request_t& rq,
std::string_view operation_name,
std::optional<request_auth_result>& auth_result,
const chunked_vector<context_subject>& subjects);

void handle_get_subjects_authz(
const server::request_t& rq,
std::string_view operation_name,
std::optional<request_auth_result>& auth_result,
chunked_vector<context_subject>& subjects);

Expand All @@ -47,6 +49,7 @@ void handle_get_subjects_authz(
/// - Empty contexts: user needs sr_registry describe access
ss::future<> handle_get_contexts_authz(
const server::request_t& rq,
std::string_view operation_name,
sharded_store& store,
std::optional<request_auth_result>& auth_result,
chunked_vector<context>& contexts);
Expand Down
104 changes: 104 additions & 0 deletions src/v/pandaproxy/schema_registry/context_router.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/seastarx.h"
#include "pandaproxy/parsing/httpd.h"
#include "pandaproxy/schema_registry/errors.h"

#include <seastar/core/sstring.hh>
#include <seastar/http/request.hh>

#include <fmt/format.h>

#include <string_view>

namespace pandaproxy::schema_registry {

/// \brief Normalize a context name from a URL path parameter.
inline ss::sstring normalize_context(std::string_view ctx) {
if (ctx.starts_with(':')) {
ctx.remove_prefix(1);
}

if (ctx.ends_with(':')) {
ctx.remove_suffix(1);
}

if (ctx.find(':') != std::string_view::npos) {
throw as_exception(context_invalid(ctx));
}

if (!ctx.starts_with('.')) {
return {fmt::format(".{}", ctx)};
}
return ss::sstring(ctx);
}

/// \brief Parse the "context" path parameter and normalize it.
inline ss::sstring parse_normalized_context(const ss::http::request& req) {
return normalize_context(parse::request_param<ss::sstring>(req, "context"));
}

/// \brief Check if a string already has a context prefix.
inline bool starts_with_context(std::string_view s) {
return s.starts_with(":.") || s.starts_with(":*:");
}

/// \brief Scope the "subject" path parameter by prepending the context.
///
/// ctx must already be normalized (in the form ".name"). The resulting
/// subject is ":.ctx:subject".
inline void scope_subject_param(ss::http::request& req, std::string_view ctx) {
auto sub = req.get_path_param("subject");
if (!starts_with_context(sub)) {
req.param.set(
ss::sstring("subject"),
ss::sstring(fmt::format("/:{0}:{1}", ctx, sub)));
}
}

/// \brief Inject or prepend context into the "subject" query parameter.
///
/// ctx must already be normalized.
inline void scope_subject_query(ss::http::request& req, std::string_view ctx) {
auto existing = req.get_query_param("subject");
if (existing.empty()) {
req.set_query_param("subject", fmt::format(":{0}:", ctx));
} else if (!starts_with_context(existing)) {
req.set_query_param("subject", fmt::format(":{0}:{1}", ctx, existing));
}
}

/// \brief Inject or prepend context into the "subjectPrefix" query parameter.
///
/// ctx must already be normalized.
inline void
scope_subject_prefix_query(ss::http::request& req, std::string_view ctx) {
auto existing = req.get_query_param("subjectPrefix");
if (existing.empty()) {
req.set_query_param("subjectPrefix", fmt::format(":{0}:", ctx));
} else if (!starts_with_context(existing)) {
req.set_query_param(
"subjectPrefix", fmt::format(":{0}:{1}", ctx, existing));
}
}

/// \brief Inject the context as a context-only qualified subject path
/// parameter.
///
/// ctx must already be normalized.
inline void
inject_context_as_subject(ss::http::request& req, std::string_view ctx) {
req.param.set(
ss::sstring("subject"), ss::sstring(fmt::format("/:{0}:", ctx)));
}

} // namespace pandaproxy::schema_registry
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ struct error_category final : std::error_category {
return "The specified context is not empty";
case error_code::subject_invalid:
return "The specified subject is not valid";
case error_code::context_invalid:
return "The specified context name is not valid";
}
return "(unrecognized error)";
}
Expand Down Expand Up @@ -164,6 +166,8 @@ struct error_category final : std::error_category {
return reply_error_code::context_not_empty; // 42211
case error_code::subject_invalid:
return reply_error_code::subject_invalid; // 42208
case error_code::context_invalid:
return reply_error_code::bad_request; // 400
}
return {};
}
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class error_code {
writes_disabled,
context_not_empty,
subject_invalid,
context_invalid,
};

std::error_code make_error_code(error_code);
Expand Down
6 changes: 6 additions & 0 deletions src/v/pandaproxy/schema_registry/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ inline error_info context_not_empty(const context& ctx) {
fmt::format("The specified context '{}' is not empty.", ctx())};
}

inline error_info context_invalid(std::string_view ctx) {
return error_info{
error_code::context_invalid,
fmt::format("The specified context '{}' is not valid.", ctx)};
}

inline bool failed_subject_schema_lookup(std::error_code ec) {
return ec == error_code::subject_not_found
|| ec == error_code::subject_version_not_found;
Expand Down
Loading
Loading