From e079de5cf82e75f2af492766efc3b93fab9c782e Mon Sep 17 00:00:00 2001 From: Robert Young Date: Fri, 15 Aug 2025 16:52:32 +1200 Subject: [PATCH 1/9] Add topic name lookup facility Signed-off-by: Robert Young --- proposals/008-topic-name-lookup-facility.md | 104 ++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 proposals/008-topic-name-lookup-facility.md diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md new file mode 100644 index 0000000..b8617a2 --- /dev/null +++ b/proposals/008-topic-name-lookup-facility.md @@ -0,0 +1,104 @@ +# Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework + +## Summary + +This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to resolve topic names from their corresponding topic IDs. This will enable filters to implement business logic based on human-readable topic names, rather than the underlying Kafka implementation detail of topic IDs. + +--- + +## Current Situation + +Currently, the Kroxylicious framework does not provide a direct way for filters to translate a topic ID back into its topic name. Filters that receive requests containing only topic IDs have no simple, framework-supported method to determine the names of the topics they are interacting with. + +--- + +## Motivation + +Apache Kafka increasingly uses **Topic IDs** as unique and stable identifiers for topics, especially in newer RPCs. This approach robustly handles edge cases, such as when a topic is deleted and later recreated with the same name. You can find more details in [KIP-516: Topic Identifiers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers). In Kafka 4.1.0 Produce Requests will begin using +topic ids, impacting multiple Filters offered by the project. + +However, humans and business rules operate on **topic names**, not opaque IDs. 
For example, a policy is typically defined as "Encrypt data for the `sensitive_data` topic" or "Validate schemas for the `invoices` topic." Kroxylicious filters, which enforce these policies, are naturally designed to work with topic names.
+
+To bridge this gap and allow filters to operate effectively, the framework must provide a reliable way to convert topic IDs back to their corresponding names.
+
+---
+
+## Proposal
+
+I propose:
+1. adding a new method to the `FilterContext` interface that allows filters to look up topic names for a given set of topic IDs.
+2. This will drive a request to the upstream to learn the topic names.
+3. We will cache the topic names at the edge of the proxy/upstream per channel.
+4. Existing Filters that mutate MetadataResponse will compose with this API without modification.
+
+### 1. API Addition
+
+The following method will be added to the `FilterContext`:
+
+```java
+/**
+ * Asynchronously resolves a set of topic UUIDs to their corresponding topic names.
+ *
+ * @param topicUuids A Set of topic UUIDs to resolve.
+ * @return A CompletionStage that will complete with an unmodifiable Map,
+ *         mapping each topic UUID to its topic name. The stage will complete
+ *         exceptionally if any UUID in the set cannot be resolved.
+ */
+CompletionStage<Map<Uuid, String>> getTopicNames(Set<Uuid> topicUuids);
+```
+
+This method will internally send a `Metadata` request to the upstream Kafka cluster to fetch the required topic information.
+
+The operation will fail atomically to ensure filters do not operate on a partial or incomplete mapping of IDs to names.
+
+### 2. Composability
+
+A core value of Kroxylicious is filter composability. Other filters with virtualization capabilities (e.g., a multitenancy filter) may need to alter the topic names presented to downstream filters.
+
+To support this, the underlying `Metadata` request generated by this new method will flow through the standard filter chain.
This allows any filter to intercept and mutate the `MetadataResponse`, ensuring seamless composition with existing filters like `Multitenant` without requiring any changes to them.
+
+### 3. Performance and Caching
+
+To avoid overwhelming the upstream cluster with frequent `Metadata` requests, the responses from this lookup should be cached. Equally, we do not wish to impede client and other internal requests; we should forward them to the upstream.
+
+I propose marking these framework-generated requests as a special class of "edge-cacheable" requests. This tells the framework that it is safe to learn the topic names and produce short-circuit responses without calling out to the upstream cluster, reducing redundant network traffic.
+
+The 'marking' can be implemented by tagging the RequestFrame and propagating it through the correlation manager. Then we can configure a caching filter to only intercept requests and responses with that tag. This would be a framework-only feature, not surfaced in the Filter API.
+
+The 'edge' aspect is saying that we want to cache what the upstream said the topic names are, at the edge of the Proxy -> Upstream, but they must then traverse the Filter chain to maintain composability.
+
+We can implement a very long-lived cache invalidation strategy as Kafka topics cannot be renamed. Inconsistencies will arise if the upstream topics are deleted, but we should be able to rely on the upstream failing if we forward any interactions with deleted topics.
+
+### 4. Security
+
+Kafka ACLs may restrict a user's access to topic metadata. We must not introduce a security vulnerability that leaks information about topics to unauthorized users.
+
+Therefore, the initial implementation of the cache will be **per-channel**. This approach scopes the cache 1:1 with an upstream connection and its authenticated principal, ensuring that a user can only receive metadata they are authorized to see.
In the future, we could explore a virtual-cluster-level cache, but this would require users to explicitly opt-in after being warned of the potential security trade-offs. + +--- + +## Affected Projects + +* **kroxylicious-api**: Affected by the addition of the new method to the `FilterContext` interface. +* **kroxylicious-runtime**: Affected by the implementation of the topic name resolution logic, including caching and request handling. + +--- + +## Compatibility + +The change to `FilterContext` is **backwards compatible**, as it involves adding a new default method. + +The underlying `Metadata` request will use the lowest API version capable of looking up topics by ID. The rationale is that if a client has received topic IDs, the upstream broker must be capable of resolving them via a `Metadata` request. + +--- + +## Rejected Alternatives + +An alternative considered was to introduce a new, specialized API for these resource requests instead of using the existing `Metadata` RPC flow. Filters like `Multitenant` would then need to implement this new API to intercept and modify the name lookup. + +* **Downside**: This approach complicates development for plugin authors by requiring them to implement an additional API. The existing filter chain is already well-suited for intercepting Kafka RPCs, and the short-circuiting logic for caching is already in place. +* **Upside**: A custom API could potentially be more optimized, as it would avoid the overhead of creating Kafka RPC objects. + +Ultimately, reusing the existing, familiar `Metadata` filter mechanism was chosen as it provides better composability and a simpler developer experience, which outweighs the minor potential for optimization. 
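The atomic-failure contract proposed in this first patch ("fail atomically ... complete exceptionally if any UUID in the set cannot be resolved") can be illustrated with a small, self-contained sketch. Everything below is hypothetical scaffolding, not the Kroxylicious API: `java.util.UUID` stands in for Kafka's `org.apache.kafka.common.Uuid`, and an in-memory map stands in for the upstream `Metadata` lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class TopicNameLookupSketch {

    // Names the "upstream" would report; in the real proposal these are
    // learned via a Metadata request flowing through the filter chain.
    static final Map<UUID, String> UPSTREAM = Map.of(
            UUID.fromString("00000000-0000-0000-0000-000000000001"), "sensitive_data",
            UUID.fromString("00000000-0000-0000-0000-000000000002"), "invoices");

    // Mirrors the proposed contract: completes with a full, unmodifiable
    // mapping, or exceptionally if ANY id cannot be resolved (atomic failure).
    static CompletionStage<Map<UUID, String>> getTopicNames(Set<UUID> topicUuids) {
        Map<UUID, String> resolved = new HashMap<>();
        for (UUID id : topicUuids) {
            String name = UPSTREAM.get(id);
            if (name == null) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("unresolvable topic id: " + id));
            }
            resolved.put(id, name);
        }
        return CompletableFuture.completedFuture(Map.copyOf(resolved));
    }

    public static void main(String[] args) {
        Set<UUID> ids = Set.of(UUID.fromString("00000000-0000-0000-0000-000000000001"));
        System.out.println(getTopicNames(ids).toCompletableFuture().join());
    }
}
```

A filter would chain its business logic onto the returned stage rather than joining it; the per-channel cache discussed above would sit behind this call, not inside the filter.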
From eab10b8e68b8203992a5e92929e9b556d6ce9bbf Mon Sep 17 00:00:00 2001 From: Robert Young Date: Tue, 23 Sep 2025 11:42:06 +1200 Subject: [PATCH 2/9] Apply review feedback Signed-off-by: Robert Young --- proposals/008-topic-name-lookup-facility.md | 154 +++++++++++++------- 1 file changed, 101 insertions(+), 53 deletions(-) diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md index b8617a2..5c0ce88 100644 --- a/proposals/008-topic-name-lookup-facility.md +++ b/proposals/008-topic-name-lookup-facility.md @@ -1,104 +1,152 @@ # Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework -## Summary + +* [Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework](#proposal-008-resolve-topic-names-from-topic-ids-in-the-filter-framework) + * [Summary](#summary) + * [Current Situation](#current-situation) + * [Motivation](#motivation) + * [Proposal](#proposal) + * [1. API Addition](#1-api-addition) + * [2. Metadata Request](#2-metadata-request) + * [3. Composability](#3-composability) + * [Affected Projects](#affected-projects) + * [Compatibility](#compatibility) + * [Rejected Alternatives](#rejected-alternatives) + * [Name-to-id lookup API](#name-to-id-lookup-api) + * [Separate Resource Request API for obtaining metadata](#separate-resource-request-api-for-obtaining-metadata) + * [Future Extension](#future-extension) + -This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to resolve topic names from their corresponding topic IDs. This will enable filters to implement business logic based on human-readable topic names, rather than the underlying Kafka implementation detail of topic IDs. +## Summary ---- +This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to resolve topic names from +their corresponding topic IDs. This will give Filter Authors a simple API that composes neatly with other Filters. 
## Current Situation

-Currently, the Kroxylicious framework does not provide a direct way for filters to translate a topic ID back into its topic name. Filters that receive requests containing only topic IDs have no simple, framework-supported method to determine the names of the topics they are interacting with.
+Apache Kafka increasingly uses **Topic IDs** as unique and stable identifiers for topics, especially in newer RPCs. This
+approach robustly handles edge cases, such as when a topic is deleted and later recreated with the same name. You can
+find more details
+in [KIP-516: Topic Identifiers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers). In
+Kafka 4.1.0 Produce Requests have begun using topic ids when the client supports it, impacting multiple Filters offered
+by the project.

----

+Currently, the Kroxylicious framework does not provide a direct way for filters to translate a topic ID back into its
+topic name. We have worked around this by intercepting the Api Versions response in those filters, and downgrading the
+maximum Produce version to 12, before the introduction of topic ids to that RPC, meaning clients will send topic names.

## Motivation

-Apache Kafka increasingly uses **Topic IDs** as unique and stable identifiers for topics, especially in newer RPCs. This approach robustly handles edge cases, such as when a topic is deleted and later recreated with the same name. You can find more details in [KIP-516: Topic Identifiers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers). In Kafka 4.1.0 Produce Requests will begin using
-topic ids, impacting multiple Filters offered by the project.
+Numerous Proxy Filters are topic-oriented: they need to select which topics to operate on and apply logic per topic.
+Currently they identify topics by their names, so if they receive topic ids in the RPCs they will need to convert them
+to names.
This would require each individual Filter to make out-of-band Metadata requests, or intercept
+downstream-client-initiated Metadata responses to obtain the names, and likely implement their own caching strategy to
+reduce load on the upstream cluster.

-However, humans and business rules operate on **topic names**, not opaque IDs. For example, a policy is typically defined as "Encrypt data for the `sensitive_data` topic" or "Validate schemas for the `invoices` topic." Kroxylicious filters, which enforce these policies, are naturally designed to work with topic names.
+Theoretically, in future, Filters could have the opposite problem if they are configured in terms of topic id, but
+receive messages containing the name.

-To bridge this gap and allow filters to operate effectively, the framework must provide a reliable way to convert topic IDs back to their corresponding names.
+Filters in the chain are able to modify topic names and identifiers; therefore the names/identifiers visible to a Filter
+instance depend on the manipulations of these other Filters.

----

+To bridge this gap and allow filters to operate effectively, the framework must provide a reliable way to convert topic
+IDs back to their corresponding names, one that composes with other Filters that manipulate the topic identifiers.

## Proposal

I propose:
-1. adding a new method to the `FilterContext` interface that allows filters to look up topic names for a given set of topic IDs.
-2. This will drive a request to the upstream to learn the topic names.
-3. We will cache the topic names at the edge of the proxy/upstream per channel.
-4. Existing Filters that mutate MetadataResponse will compose with this API without modification.
+
+1. adding a [new method](#1-api-addition) to the `FilterContext` interface that allows filters to look up topic names
+   for a given set of topic IDs.
+2. This will drive a [Metadata request](#2-metadata-request) to the upstream to learn the topic names.
+3. 
If upstream Filters mutate the topic names, these mutations [will be reflected](#3-composability) in the topic names + returned by the new API ### 1. API Addition The following method will be added to the `FilterContext`: ```java +class TopicMappingException extends RuntimeException { +} + +/** +* topicName XOR exception will be non-null +*/ +record TopicNameResult(@Nullable String topicName, @Nullable TopicMappingException exception){} + /** * Asynchronously resolves a set of topic UUIDs to their corresponding topic names. * * @param topicUuids A Set of topic UUIDs to resolve. * @return A CompletionStage that will complete with an unmodifiable Map, - * mapping each topic UUID to its topic name. The stage will complete - * exceptionally if any UUID in the set cannot be resolved. + * mapping each topic UUID to its topic name. Every input topicUuid must have an entry + * in the result map. */ -CompletionStage> getTopicNames(Set topicUuids); +CompletionStage> getTopicNames(Set topicUuids); ``` -This method will internally send a `Metadata` request to the upstream Kafka cluster to fetch the required topic information. - -The operation will fail atomically to ensure filters do not operate on a partial or incomplete mapping of IDs to names. - -### 2. Composability - -A core value of Kroxylicious is filter composability. Other filters with virtualization capabilities (e.g., a multitenancy filter) may need to alter the topic names presented to downstream filters. - -To support this, the underlying `Metadata` request generated by this new method will flow through the standard filter chain. This allows any filter to intercept and mutate the `MetadataResponse`, ensuring seamless composition with existing filters like `Multitenant` without requiring any changes to them. +### 2. Metadata Request -### 3. Performance and Caching +This method will internally send a `Metadata` request to the upstream Kafka cluster to fetch the required topic +information. 
We will send the highest version supported by both the proxy and upstream broker. Ideally we would send +the same version that the downstream client would pick, but the proxy may not have observed a Metadata request from +a client on any given channel, in which case the Proxy must pick a version to use. -To avoid overwhelming the upstream cluster with frequent `Metadata` requests, the responses from this lookup should be cached. Equally we do not wish to impede client and other internal requests, we -should forward them to the upstream. +The operation will return as many mappings as it can, and describe any issues with obtaining individual UUIDs, such as +non-existence or authorization errors. -I propose marking these framework-generated requests as a special class of "edge-cacheable" requests. This tells the framework that it is safe to learn the topic names and produce short-circuit responses without calling out to the upstream cluster, reducing redundant network traffic. +### 3. Composability -The 'marking' can be implemented by tagging the RequestFrame and propagating it through the correlation manager. Then we can -configure a caching filter to only intercept requests and responses with that tag. This would be a framework only feature, not surfaced in the Filter API. +A core value of Kroxylicious is filter composability. Other filters with virtualization capabilities (e.g., a +multitenancy filter) may need to alter the topic names or ids presented to downstream filters. -The 'edge' aspect is saying that we want to cache what the upstream said the topic names are, at the edge of the Proxy -> Upstream, but they must then traverse the Filter chain to maintain composability. +To support this, the underlying `Metadata` request generated by this new method will flow through the standard filter +chain. 
This allows any filter to intercept and mutate the `MetadataResponse`, ensuring seamless composition with +existing filters like `Multitenant` without requiring any changes to them. -We can implement a very long lived cache invalidation strategy as Kafkatopics cannot be renamed. Inconsistencies will arise if the upstream topics are deleted but we should be able to rely on the upstream failing if we forward any interactions with deleted topics. - -### 4. Security - -Kafka ACLs may restrict a user's access to topic metadata. We must not introduce a security vulnerability that leaks information about topics to unauthorized users. +## Affected Projects -Therefore, the initial implementation of the cache will be **per-channel**. This approach scopes the cache 1:1 with an upstream connection and its authenticated principal, ensuring that a user can only receive metadata they are authorized to see. In the future, we could explore a virtual-cluster-level cache, but this would require users to explicitly opt-in after being warned of the potential security trade-offs. +* **kroxylicious-api**: Affected by the addition of the new method to the `FilterContext` interface. +* **kroxylicious-runtime**: Affected by the implementation of the topic name resolution logic ---- +## Compatibility -## Affected Projects +The change to `FilterContext` is **backwards compatible**, as it involves adding a new default method and result class -* **kroxylicious-api**: Affected by the addition of the new method to the `FilterContext` interface. -* **kroxylicious-runtime**: Affected by the implementation of the topic name resolution logic, including caching and request handling. +## Rejected Alternatives ---- +### Name-to-id lookup API -## Compatibility +We considered also implementing the inverse, looking up topic IDs when we only know the topic names. However, this +direction has more consistency issues, the topic id for a topic name can change if that topic is deleted and re-created. 
+Filters caching topic ids may not be exposed to the RPCs that could drive cache invalidation and could continue +operating with old topic ids. So we think starting with topic id -> topic name has fewer risks around cache +invalidation. -The change to `FilterContext` is **backwards compatible**, as it involves adding a new default method. +### Separate Resource Request API for obtaining metadata -The underlying `Metadata` request will use the lowest API version capable of looking up topics by ID. The rationale is that if a client has received topic IDs, the upstream broker must be capable of resolving them via a `Metadata` request. +An alternative considered was to introduce a new, specialized API for these resource requests instead of using the +existing `Metadata` RPC flow. Filters like `Multitenant` would then need to implement this new API to intercept and +modify the name lookup. ---- +* **Downside**: This approach complicates development for plugin authors by requiring them to implement an additional + API. The existing filter chain is already well-suited for intercepting Kafka RPCs, and the short-circuiting logic for + caching is already in place. +* **Upside**: A custom API could potentially be more optimized, as it would avoid the overhead of creating Kafka RPC + objects. -## Rejected Alternatives +Ultimately, reusing the existing, familiar `Metadata` filter mechanism was chosen as it provides better composability +and a simpler developer experience, which outweighs the minor potential for optimization. -An alternative considered was to introduce a new, specialized API for these resource requests instead of using the existing `Metadata` RPC flow. Filters like `Multitenant` would then need to implement this new API to intercept and modify the name lookup. +## Future Extension -* **Downside**: This approach complicates development for plugin authors by requiring them to implement an additional API. 
The existing filter chain is already well-suited for intercepting Kafka RPCs, and the short-circuiting logic for
+  caching is already in place.
+* **Upside**: A custom API could potentially be more optimized, as it would avoid the overhead of creating Kafka RPC
+  objects.

+Ultimately, reusing the existing, familiar `Metadata` filter mechanism was chosen as it provides better composability
+and a simpler developer experience, which outweighs the minor potential for optimization.

+## Future Extension

+In the future we could add Framework-level caching. We could consider options like:

+1. caching the response from the target cluster, so that all Filters for a given channel would use a single cache. Their
+   requests would still traverse the Filter chain, but a component could short-circuit if it knows all the names
+   for the desired topic ids.
+2. caching per FilterHandler: we could cache per Filter instance that wants to learn topic names.
+3. caching across channels: this would be complex if we wanted to ensure that users' ACLs are honoured so that a user
+   cannot learn the names of topics they are not allowed to access. It is more achievable if the proxy is also 100%
+   responsible for authorization.
\ No newline at end of file

From e8b8aaa84eff0a58d2c6dd1c236921d9caf12a91 Mon Sep 17 00:00:00 2001
From: Robert Young
Date: Wed, 8 Oct 2025 16:28:17 +1300
Subject: [PATCH 3/9] Apply feedback

Signed-off-by: Robert Young
---
 proposals/008-topic-name-lookup-facility.md | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md
index 5c0ce88..15ab62f 100644
--- a/proposals/008-topic-name-lookup-facility.md
+++ b/proposals/008-topic-name-lookup-facility.md
@@ -29,7 +29,7 @@ approach robustly handles edge cases, such as when a topic is deleted and later
 find more details
 in [KIP-516: Topic Identifiers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers).
In Kafka 4.1.0 Produce Requests have begun using topic ids when the client supports it, impacting multiple Filters offered
-by the project.
+by the Kroxylicious project.
 Currently, the Kroxylicious framework does not provide a direct way for filters to translate a topic ID back into its
 topic name. We have worked around this by intercepting the Api Versions response in those filters, and downgrading the
@@ -70,20 +70,26 @@ The following method will be added to the `FilterContext`:
 class TopicMappingException extends RuntimeException {
 }
+// the server responded with an error code
+class KafkaServerErrorException extends TopicMappingException {
+    private final org.apache.kafka.common.protocol.Errors error;
+
+    KafkaServerErrorException(org.apache.kafka.common.protocol.Errors error) {
+        this.error = error;
+    }
+}
+
 /**
 * topicName XOR exception will be non-null
 */
 record TopicNameResult(@Nullable String topicName, @Nullable TopicMappingException exception){}
 /**
- * Asynchronously resolves a set of topic UUIDs to their corresponding topic names.
+ * Asynchronously resolves a collection of topic UUIDs to their corresponding topic names.
 *
 * @param topicUuids A Set of topic UUIDs to resolve.
- * @return A CompletionStage that will complete with an unmodifiable Map,
- * mapping each topic UUID to its topic name. Every input topicUuid must have an entry
- * in the result map.
+ * @return A CompletionStage that will complete with an unmodifiable Map,
+ * mapping each topic UUID to its topic name result. Every input topicUuid must have an entry
+ * in the result map. The stage may be completed exceptionally with a {@link TimeoutException}
+ * if the Server takes too long to respond.
 */
-CompletionStage<Map<Uuid, String>> getTopicNames(Set<Uuid> topicUuids);
+CompletionStage<Map<Uuid, TopicNameResult>> topicNames(Collection<Uuid> topicUuids);
```
### 2. 
Metadata Request From c69ac95117e0a8177c5e82b0a7d69384da6e58d6 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 8 Oct 2025 16:34:51 +1300 Subject: [PATCH 4/9] Add javadoc for chaining guarantees Signed-off-by: Robert Young --- proposals/008-topic-name-lookup-facility.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md index 15ab62f..75ca1a7 100644 --- a/proposals/008-topic-name-lookup-facility.md +++ b/proposals/008-topic-name-lookup-facility.md @@ -88,6 +88,10 @@ record TopicNameResult(@Nullable String topicName, @Nullable TopicMappingExcepti * mapping each topic UUID to its topic name result. Every input topicUuid must have an entry * in the result map. The stage may be completed exceptionally with an {@link TimeoutException} * if the Server takes too long to respond. + *

<h4>Chained Computation stages</h4>
+ *
Default and asynchronous default computation stages chained to the returned
+ * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread associated with the
+ * connection. See {@link io.kroxylicious.proxy.filter} for more details.
 */
CompletionStage<Map<Uuid, TopicNameResult>> topicNames(Collection<Uuid> topicUuids);
```

From 6345e5629657f80a76e839244959cbed55c85241 Mon Sep 17 00:00:00 2001
From: Robert Young
Date: Tue, 14 Oct 2025 15:22:46 +1300
Subject: [PATCH 5/9] Add topic name mapping

Signed-off-by: Robert Young
---
 proposals/008-topic-name-lookup-facility.md | 38 ++++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md
index 75ca1a7..d86574f 100644
--- a/proposals/008-topic-name-lookup-facility.md
+++ b/proposals/008-topic-name-lookup-facility.md
@@ -80,20 +80,50 @@ class KafkaServerErrorException(org.apache.kafka.common.protocol.Errors error) e
 */
 record TopicNameResult(@Nullable String topicName, @Nullable TopicMappingException exception){}
+/**
+ * The result of discovering the topic names for a collection of topic ids
+ * @param topicNameResults
+ */
+public record TopicNameMapping(Map<Uuid, TopicNameResult> topicNameResults) {
+
+    /**
+     * @return a map from topic id to topic name lookup exception for all failed lookups
+     */
+    Map<Uuid, TopicMappingException> failedResults() {
+        return topicNameResults.entrySet().stream().filter(e -> e.getValue().exception() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().exception()));
+    }
+
+    /**
+     * @return a map of all successfully discovered topic names
+     */
+    Map<Uuid, String> successfulResults() {
+        return topicNameResults.entrySet().stream().filter(e -> e.getValue().topicName() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().topicName()));
+    }
+
+    /**
+     * @return true if any of the results were failures
+     */
+    boolean anyFailedResults() {
+        return topicNameResults.values().stream().anyMatch(topicNameResult ->
topicNameResult.exception() != null); + } +} + /** * Asynchronously resolves a collection of topic UUIDs to their corresponding topic names. * - * @param topicUuids A Set of topic UUIDs to resolve. - * @return A CompletionStage that will complete with an unmodifiable Map, + * @param topicUuids A collection of topic UUIDs to resolve. + * @return A CompletionStage that will complete with a TopicNameMapping, * mapping each topic UUID to its topic name result. Every input topicUuid must have an entry - * in the result map. The stage may be completed exceptionally with an {@link TimeoutException} + * in the TopicNameMapping. The stage may be completed exceptionally with an {@link TimeoutException} * if the Server takes too long to respond. *

<h4>Chained Computation stages</h4>
 *
Default and asynchronous default computation stages chained to the returned
 * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread associated with the
 * connection. See {@link io.kroxylicious.proxy.filter} for more details.
 */
-CompletionStage<Map<Uuid, TopicNameResult>> topicNames(Collection<Uuid> topicUuids);
+CompletionStage<TopicNameMapping> topicNames(Collection<Uuid> topicUuids);
```

### 2. Metadata Request

From 4f33d0d61598803f88c629482351c3ec7a4e5aad Mon Sep 17 00:00:00 2001
From: Robert Young
Date: Wed, 15 Oct 2025 15:23:01 +1300
Subject: [PATCH 6/9] Simplify interfaces

Signed-off-by: Robert Young
---
 proposals/008-topic-name-lookup-facility.md | 70 +++++++++------------
 1 file changed, 28 insertions(+), 42 deletions(-)

diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md
index d86574f..835baa7 100644
--- a/proposals/008-topic-name-lookup-facility.md
+++ b/proposals/008-topic-name-lookup-facility.md
@@ -19,7 +19,7 @@
 ## Summary
-This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to resolve topic names from
+This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to map topic names from
 their corresponding topic IDs. This will give Filter Authors a simple API that composes neatly with other Filters.
 ## Current Situation
@@ -49,7 +49,7 @@ receive messages containing the name.
 Filters in the chain are able to modify topic names and identifiers, therefore the names/identifiers visible to a Filter
 instance depend on the manipulations of these other Filters.
-To bridge this gap and allow filters to operate effectively, the framework must provide a reliable way to convert topic
+To bridge this gap and allow filters to operate effectively, the framework must provide a reliable way to map topic
 IDs back to their corresponding names, that composes with other Filters that manipulate the topic identifiers.
 ## Proposal
@@ -59,71 +59,55 @@ I propose: 
adding a [new method](#1-api-addition) to the `FilterContext` interface that allows filters to look up topic names for a given set of topic IDs. 2. This will drive a [Metadata request](#2-metadata-request) to the upstream to learn the topic names. -3. If upstream Filters mutate the topic names, these mutations [will be reflected](#3-composability) in the topic names - returned by the new API +3. If upstream Filters mutate the topic names and or errors, these mutations [will be reflected](#3-composability) in + the topic names and errors returned by the new API ### 1. API Addition -The following method will be added to the `FilterContext`: +The following members will be added to the `io.kroxylicious.proxy.filter`: ```java -class TopicMappingException extends RuntimeException { +class TopicNameMappingException extends RuntimeException { } -// the server responded with an error code -class KafkaServerErrorException(org.apache.kafka.common.protocol.Errors error) extends TopicMappingException { - -} - -/** -* topicName XOR exception will be non-null -*/ -record TopicNameResult(@Nullable String topicName, @Nullable TopicMappingException exception){} - /** * The result of discovering the topic names for a collection of topic ids - * @param topicNameResults */ -public record TopicNameMapping(Map topicNameResults) { - +public interface TopicNameMapping { /** - * @return a map from topic id to topic name lookup exception for all failed lookups + * @return true if there are any failures */ - Map failedResults() { - return topicNameResults.entrySet().stream().filter(e -> e.getValue().exception() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().exception())); - } + boolean anyFailures(); /** - * @return a map of all successfully discovered topic names + * @return map from topic id to successfully mapped topic name */ - Map successfulResults() { - return topicNameResults.entrySet().stream().filter(e -> e.getValue().topicName() != null) - 
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().topicName())); - } + Map topicNames(); /** - * @return true if any of the results were failures + * @return map from topic id to kafka server error */ - boolean anyFailedResults() { - return topicNameResults.values().stream().anyMatch(topicNameResult -> topicNameResult.exception() != null); - } + Map failures(); } +``` +The following method will be added to the `io.kroxylicious.proxy.filter.FilterContext`: + +```java /** - * Asynchronously resolves a collection of topic UUIDs to their corresponding topic names. - * - * @param topicUuids A collection of topic UUIDs to resolve. - * @return A CompletionStage that will complete with a TopicNameMapping, - * mapping each topic UUID to its topic name result. Every input topicUuid must have an entry - * in the TopicNameMapping. The stage may be completed exceptionally with an {@link TimeoutException} - * if the Server takes too long to respond. + * Maps all of the given {@code topicIds} to the current corresponding topic names. + * @param topicIds topic ids to resolve + * @return CompletionStage for a TopicNameMapping. The TopicNameMapping is guaranteed to contain either + * a name or {@link org.apache.kafka.common.protocol.Errors} for each topic id requested. If a name or + * error cannot be determined for any topic id then this stage will be completed exceptionally with a + * {@link TopicNameMappingException}. *

Chained Computation stages

*

Default and asynchronous default computation stages chained to the returned * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread associated with the * connection. See {@link io.kroxylicious.proxy.filter} for more details. + *

*/ -CompletionStage topicNames(Collection topicUuids); +CompletionStage topicNames(Collection topicIds); ``` ### 2. Metadata Request @@ -134,7 +118,9 @@ the same version that the downstream client would pick, but the proxy may not ha a client on any given channel, in which case the Proxy must pick a version to use. The operation will return as many mappings as it can, and describe any issues with obtaining individual UUIDs, such as -non-existence or authorization errors. +non-existence or authorization errors. The TopicNameMapping will contain either a name or a kafka `Errors` for each +requested topic id. If we could not obtain Metadata for all the topic ids for any reason, then the `CompletionStage` will +be completed exceptionally with a `TopicNameMappingException`. ### 3. Composability From 9c6f346baafcf59bc0424202cebe0cba396d95a4 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 15 Oct 2025 15:24:45 +1300 Subject: [PATCH 7/9] Correct TopicNameMapping description Signed-off-by: Robert Young --- proposals/008-topic-name-lookup-facility.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/008-topic-name-lookup-facility.md index 835baa7..e6a3a2d 100644 --- a/proposals/008-topic-name-lookup-facility.md +++ b/proposals/008-topic-name-lookup-facility.md @@ -117,10 +117,9 @@ information. We will send the highest version supported by both the proxy and up the same version that the downstream client would pick, but the proxy may not have observed a Metadata request from a client on any given channel, in which case the Proxy must pick a version to use. -The operation will return as many mappings as it can, and describe any issues with obtaining individual UUIDs, such as -non-existence or authorization errors. The TopicNameMapping will contain either a name or a kafka `Errors` for each -requested topic id. 
If we could not obtain Metadata for all the topic ids for any reason, then the `CompletionStage` will -be completed exceptionally with a `TopicNameMappingException`. +The TopicNameMapping will contain either a name or a kafka `Errors` for each requested topic id. If we could not obtain +a name or Error from the MetadataResponse, for all the requested topic ids, then the `CompletionStage` will be completed +exceptionally with a `TopicNameMappingException`. ### 3. Composability From 35b73511a2d0e332cc22b5d236fe4ea95c196a3b Mon Sep 17 00:00:00 2001 From: Robert Young Date: Fri, 31 Oct 2025 16:50:52 +1300 Subject: [PATCH 8/9] Update and renumber Signed-off-by: Robert Young --- ...y.md => 007-topic-name-lookup-facility.md} | 75 +++++++++++++------ 1 file changed, 52 insertions(+), 23 deletions(-) rename proposals/{008-topic-name-lookup-facility.md => 007-topic-name-lookup-facility.md} (70%) diff --git a/proposals/008-topic-name-lookup-facility.md b/proposals/007-topic-name-lookup-facility.md similarity index 70% rename from proposals/008-topic-name-lookup-facility.md rename to proposals/007-topic-name-lookup-facility.md index e6a3a2d..285dabb 100644 --- a/proposals/008-topic-name-lookup-facility.md +++ b/proposals/007-topic-name-lookup-facility.md @@ -1,4 +1,4 @@ -# Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework +# Proposal 007: Resolve Topic Names from Topic IDs in the Filter Framework * [Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework](#proposal-008-resolve-topic-names-from-topic-ids-in-the-filter-framework) @@ -67,7 +67,28 @@ I propose: The following members will be added to the `io.kroxylicious.proxy.filter`: ```java -class TopicNameMappingException extends RuntimeException { +/** + * Indicates there was some problem obtaining a name for a topic id + */ +public class TopicNameMappingException extends RuntimeException { + private final Errors error; + + public TopicNameMappingException(Errors error) { + this(error, 
error.message(), error.exception()); + } + + public TopicNameMappingException(Errors error, String message) { + this(error, message, error.exception()); + } + + public TopicNameMappingException(Errors error, String message, Throwable cause) { + super(message, cause); + this.error = Objects.requireNonNull(error); + } + + public Errors getError() { + return error; + } } /** @@ -80,34 +101,41 @@ public interface TopicNameMapping { boolean anyFailures(); /** - * @return map from topic id to successfully mapped topic name + * @return immutable map from topic id to successfully mapped topic name */ Map topicNames(); /** - * @return map from topic id to kafka server error + * Describes the reason for every failed mapping. Expected exception types are: + *
    + *
  • {@link TopLevelMetadataErrorException} indicates that we attempted to obtain Metadata from upstream, but received a top-level error in the response
  • + *
  • {@link TopicLevelMetadataErrorException} indicates that we attempted to obtain Metadata from upstream, but received a topic-level error in the response
  • + *
  • {@link TopicNameMappingException} can be used to convey any other exception
  • + *
+ * All the exception types offer {@link TopicNameMappingException#getError()} for conveniently determining the cause. Unhandled + * exceptions will be mapped to an {@link Errors#UNKNOWN_SERVER_ERROR}. Callers will be able to use this to detect expected + * cases like {@link Errors#UNKNOWN_TOPIC_ID}. + * @return immutable map from topic id to kafka server error */ - Map failures(); + Map failures(); } - ``` The following method will be added to the `io.kroxylicious.proxy.filter.FilterContext`: ```java -/** - * Maps all of the given {@code topicIds} to the current corresponding topic names. - * @param topicIds topic ids to resolve - * @return CompletionStage for a TopicNameMapping. The TopicNameMapping is guaranteed to contain either - * a name or {@link org.apache.kafka.common.protocol.Errors} for each topic id requested. If a name or - * error cannot be determined for any topic id then this stage will be completed exceptionally with a - * {@link TopicNameMappingException}. - *

Chained Computation stages

- *

Default and asynchronous default computation stages chained to the returned - * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread associated with the - * connection. See {@link io.kroxylicious.proxy.filter} for more details. - *

- */ -CompletionStage topicNames(Collection topicIds); + /** + * Attempts to map all the given {@code topicIds} to the current corresponding topic names. + * @param topicIds topic ids to map to names + * @return a CompletionStage that will be completed with a complete mapping, with every requested topic id mapped to either an + * {@link TopicNameMappingException} or a name. All failure modes should complete the stage with a TopicNameMapping, with the + * TopicNameMapping used to convey the reason for failure, rather than failing the Stage. + *

Chained Computation stages

+ *

Default and asynchronous default computation stages chained to the returned + * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread + * associated with the connection. See {@link io.kroxylicious.proxy.filter} for more details. + *

+ */ + CompletionStage topicNames(Collection topicIds); ``` ### 2. Metadata Request @@ -117,9 +145,10 @@ information. We will send the highest version supported by both the proxy and up the same version that the downstream client would pick, but the proxy may not have observed a Metadata request from a client on any given channel, in which case the Proxy must pick a version to use. -The TopicNameMapping will contain either a name or a kafka `Errors` for each requested topic id. If we could not obtain -a name or Error from the MetadataResponse, for all the requested topic ids, then the `CompletionStage` will be completed -exceptionally with a `TopicNameMappingException`. +The TopicNameMapping will contain either a name or a kafka `TopicNameMappingException` for each requested topic id. +We strive to always provide a `TopicNameMapping` under the various failure scenarios, preferring to Map every +id to an exception, rather than fail the stage. The TopicNameMappingException offers a Kafka Errors enum for conveniently +detecting expected cases like `UNKNOWN_TOPIC_ID`, internal exceptions will be mapped to `UNKNOWN_SERVER_ERROR`. ### 3. 
Composability From a476d1831a26b69463db64fb62caf7c517d231df Mon Sep 17 00:00:00 2001 From: Robert Young Date: Fri, 31 Oct 2025 16:52:28 +1300 Subject: [PATCH 9/9] fixup number Signed-off-by: Robert Young --- proposals/007-topic-name-lookup-facility.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proposals/007-topic-name-lookup-facility.md b/proposals/007-topic-name-lookup-facility.md index 285dabb..47bd207 100644 --- a/proposals/007-topic-name-lookup-facility.md +++ b/proposals/007-topic-name-lookup-facility.md @@ -1,7 +1,7 @@ # Proposal 007: Resolve Topic Names from Topic IDs in the Filter Framework -* [Proposal 008: Resolve Topic Names from Topic IDs in the Filter Framework](#proposal-008-resolve-topic-names-from-topic-ids-in-the-filter-framework) +* [Proposal 007: Resolve Topic Names from Topic IDs in the Filter Framework](#proposal-007-resolve-topic-names-from-topic-ids-in-the-filter-framework) * [Summary](#summary) * [Current Situation](#current-situation) * [Motivation](#motivation)
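
To make the final API shape concrete, here is a sketch of how a filter author might consume the `topicNames` facility as it stands after patch 8. This is illustrative only: `TopicNameMapping` and `TopicNameMappingException` below are minimal local stand-ins so the sketch compiles on its own (the real types would live in `io.kroxylicious.proxy.filter`), and the `Errors` enum is a two-constant stand-in for Kafka's `org.apache.kafka.common.protocol.Errors`. The `idsToEncrypt` policy helper is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class TopicNameLookupSketch {

    // Stand-in for org.apache.kafka.common.protocol.Errors (only the cases used here).
    enum Errors { UNKNOWN_TOPIC_ID, UNKNOWN_SERVER_ERROR }

    // Stand-in for the proposed exception type: carries the Kafka error code so
    // callers can branch on expected cases like UNKNOWN_TOPIC_ID.
    static class TopicNameMappingException extends RuntimeException {
        private final Errors error;
        TopicNameMappingException(Errors error) { this.error = error; }
        Errors getError() { return error; }
    }

    // Stand-in for the proposed TopicNameMapping (patch 8 shape): every requested
    // topic id appears in exactly one of the two maps.
    record TopicNameMapping(Map<UUID, String> topicNames,
                            Map<UUID, TopicNameMappingException> failures) {
        boolean anyFailures() { return !failures().isEmpty(); }
    }

    // How a policy filter might branch on the mapping: apply the policy only to
    // ids whose names resolved to a configured sensitive topic; ids that failed
    // with UNKNOWN_TOPIC_ID can be treated as "topic gone, nothing to do".
    static List<UUID> idsToEncrypt(TopicNameMapping mapping, List<String> sensitiveTopics) {
        return mapping.topicNames().entrySet().stream()
                .filter(e -> sensitiveTopics.contains(e.getValue()))
                .map(Map.Entry::getKey)
                .toList();
    }

    public static void main(String[] args) {
        UUID sensitiveId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        UUID deletedId = UUID.fromString("00000000-0000-0000-0000-000000000002");

        // Simulate the stage the proposed FilterContext#topicNames would return.
        CompletionStage<TopicNameMapping> stage = CompletableFuture.completedFuture(
                new TopicNameMapping(
                        Map.of(sensitiveId, "sensitive_data"),
                        Map.of(deletedId, new TopicNameMappingException(Errors.UNKNOWN_TOPIC_ID))));

        TopicNameMapping mapping = stage.toCompletableFuture().join();
        System.out.println(idsToEncrypt(mapping, List.of("sensitive_data"))); // [00000000-0000-0000-0000-000000000001]
        System.out.println(mapping.failures().get(deletedId).getError());    // UNKNOWN_TOPIC_ID
    }
}
```

Note the contrast with the pre-patch-8 design: the filter never catches a stage failure to learn about an unknown topic; it inspects `failures()` on a always-delivered `TopicNameMapping`, which keeps per-topic error handling on the happy path of the completion chain.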