From f8a3e53ef1ffeeaa0db1e7af8b983702788be5f6 Mon Sep 17 00:00:00 2001
From: Urjit Patel <105218041+Uzziee@users.noreply.github.com>
Date: Tue, 18 Nov 2025 12:18:40 +0530
Subject: [PATCH 1/2] hot reload feature proposal

Signed-off-by: Urjit Patel <105218041+Uzziee@users.noreply.github.com>
---
 proposals/012-hot-reload-feature.md | 905 ++++++++++++++++++++++++++++
 1 file changed, 905 insertions(+)
 create mode 100644 proposals/012-hot-reload-feature.md

diff --git a/proposals/012-hot-reload-feature.md b/proposals/012-hot-reload-feature.md
new file mode 100644
index 0000000..ca760fe
--- /dev/null
+++ b/proposals/012-hot-reload-feature.md
@@ -0,0 +1,905 @@
+# Hot Reload Feature
+
+As of today, any change to the virtual cluster configs (addition/removal/modification) requires a full restart of the Kroxylicious app. This proposal adds a dynamic reload feature, which will enable operators to modify virtual cluster configurations (add/remove/modify clusters) while **maintaining service availability for unaffected clusters**, without the need for full application restarts. This feature will transform Kroxylicious from a **"restart-to-configure"** system into a **"live-reconfiguration"** system.
+
+This proposal is structured as a multi-part implementation to ensure clear separation of concerns and manageable development phases.
+
+- Part 1: Configuration Change Detection - This part focuses on monitoring configuration files, parsing changes, and comparing old vs new configurations to identify exactly which virtual clusters need to be restarted. It provides a clean interface that returns structured change operations (additions, removals, modifications) without actually performing any restart operations.
+
+- Part 2: Graceful Virtual Cluster Restart - This part handles the actual restart operations, including graceful connection draining, in-flight message completion, and rollback mechanisms. It takes the change decisions from Part 1 and executes them safely while ensuring minimal service disruption.
+
+# Part 1: Configuration Change Detection Framework
+
+With this framework, Kroxylicious will detect config file changes (using a standard file-watcher service) and, using various detector interfaces, work out which virtual clusters have been added, removed, or modified. The list of affected clusters is then passed on to Part 2 of this feature, where the clusters are gracefully restarted (or rolled back to the previous stable state in case of failure).
+
+POC PR - https://github.com/kroxylicious/kroxylicious/pull/2901
+
+## Core Classes & Structure
+
+1. **ConfigWatcherService** - File system monitoring and configuration loading orchestrator
+   - Monitors configuration file changes using the Java NIO WatchService
+   - Parses YAML configuration files using the existing ConfigParser when changes are detected
+   - Provides graceful shutdown of executor services and watch resources
+   - Triggers configuration change callbacks asynchronously to initiate the hot-reload process
+   - Handles file parsing errors and continues monitoring
+   - `KafkaProxy.java` - As this is the entry point to the proxy app, this class configures the callback to be triggered when there is a config change. It is also responsible for setting up the ConfigurationChangeHandler and the ConfigWatcherService.
+
+```
+public final class KafkaProxy implements AutoCloseable {
+    .....
+    public KafkaProxy(PluginFactoryRegistry pfr, Configuration config, Features features, Path configFilePath) {
+        .....
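+        // NOTE: the elided code above is assumed (in this sketch) to construct the
+        // VirtualClusterManager and the other collaborators referenced below.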
+        // Initialize the configuration change handler with a direct list of detectors
+        this.configurationChangeHandler = new ConfigurationChangeHandler(
+                List.of(
+                        new VirtualClusterChangeDetector(),
+                        new FilterChangeDetector()),
+                virtualClusterManager);
+    }
+
+    ...
+
+    public CompletableFuture<Void> startConfigurationWatcher(Path configFilePath) {
+        .....
+        this.configWatcherService = new ConfigWatcherService(
+                configFilePath,
+                this::handleConfigurationChange,
+                Duration.ofMillis(500) // 500ms debounce delay
+        );
+
+        return configWatcherService.start();
+    }
+    ...
+
+    public CompletableFuture<Void> stopConfigurationWatcher() {
+        if (configWatcherService == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return configWatcherService.stop().thenRun(() -> {
+            configWatcherService = null;
+        });
+    }
+
+    private void handleConfigurationChange(Configuration newConfig) {
+        try {
+            Configuration newValidatedConfig = validate(newConfig, features);
+            Configuration oldConfig = this.config;
+
+            // Create the models once, to avoid excessive logging during change detection
+            List<VirtualClusterModel> oldModels = oldConfig.virtualClusterModel(pfr);
+            List<VirtualClusterModel> newModels = newValidatedConfig.virtualClusterModel(pfr);
+            ConfigurationChangeContext changeContext = new ConfigurationChangeContext(
+                    oldConfig, newValidatedConfig, oldModels, newModels);
+
+            // Delegate to the configuration change handler
+            configurationChangeHandler.handleConfigurationChange(changeContext)
+                    .thenRun(() -> {
+                        // Update the stored configuration after a successful hot-reload
+                        this.config = newValidatedConfig;
+                        // Synchronize the virtualClusterModels with the new configuration to ensure consistency
+                        this.virtualClusterModels = newModels;
+                        LOGGER.info("Configuration and virtual cluster models successfully updated");
+                    });
+        }
+        catch (Exception e) {
+            LOGGER.error("Failed to validate or process configuration change", e);
+        }
+    }
+}
+```
+```
+public class ConfigWatcherService {
+    ...
+    public ConfigWatcherService(Path configFilePath,
+                                Consumer<Configuration> onConfigurationChanged) {
+
+    }
+    ...
+    public CompletableFuture<Void> start() {}
+    public CompletableFuture<Void> stop() {}
+
+    // There will be further methods that schedule a file watcher on the config path
+    // and trigger handleConfigurationChange() whenever there is a valid change
+
+    private void handleConfigurationChange() {
+        ...
+        onConfigurationChanged.accept(newConfiguration);
+        ...
+    }
+
+}
+```
+
+2. **ConfigurationChangeHandler** - Orchestrates the entire configuration change process from detection to execution, with rollback capability.
+   - This handler accepts a list of detector implementations, which run and identify which virtual clusters are affected.
+   - Once the lists of clusters to add/remove/restart are known, this class calls the VirtualClusterManager methods to perform the additions/deletions/restarts (that class is discussed in Part 2).
+   - This class also creates an instance of `ConfigurationChangeRollbackTracker`, which tracks the operations being applied, so that in case of any failure the operations already performed can be reversed to the previous stable state.
+```
+public class ConfigurationChangeHandler {
+
+    public ConfigurationChangeHandler(List<ChangeDetector> changeDetectors,
+                                      VirtualClusterManager virtualClusterManager) {
+        this.changeDetectors = List.copyOf(changeDetectors);
+        this.virtualClusterManager = virtualClusterManager;
+        ...
+
+    /**
+     * Main entry point for handling configuration changes.
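+     * @param changeContext the old and new configurations, together with their pre-built
+     *                      virtual cluster models
+     * @return a future that completes once all cluster operations (or the rollback) have finished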
+     */
+    public CompletableFuture<Void> handleConfigurationChange(
+            ConfigurationChangeContext changeContext) {
+
+        // 1. Detect changes using all registered detectors
+        ChangeResult changes = detectChanges(changeContext);
+
+        if (!changes.hasChanges()) {
+            LOGGER.info("No changes detected - hot-reload not needed");
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // 2. Process the changes with rollback tracking
+        ConfigurationChangeRollbackTracker rollbackTracker = new ConfigurationChangeRollbackTracker();
+
+        return processConfigurationChanges(changes, changeContext, rollbackTracker)
+                .thenRun(() -> {
+                    LOGGER.info("Configuration hot-reload completed successfully - {} operations processed",
+                            changes.getTotalOperations());
+                })
+                .whenComplete((result, throwable) -> {
+                    if (throwable != null) {
+                        LOGGER.error("Configuration change failed - initiating rollback", throwable);
+                        performRollback(rollbackTracker);
+                    }
+                });
+    }
+
+    /**
+     * Coordinates multiple change detectors and aggregates their results.
+     */
+    private ChangeResult detectChanges(ConfigurationChangeContext context) {
+        Set<String> allClustersToRemove = new LinkedHashSet<>();
+        Set<String> allClustersToAdd = new LinkedHashSet<>();
+        Set<String> allClustersToModify = new LinkedHashSet<>();
+
+        changeDetectors.forEach(detector -> {
+            try {
+                ChangeResult detectorResult = detector.detectChanges(context);
+                allClustersToRemove.addAll(detectorResult.clustersToRemove());
+                allClustersToAdd.addAll(detectorResult.clustersToAdd());
+                allClustersToModify.addAll(detectorResult.clustersToModify());
+            }
+            catch (Exception e) {
+                LOGGER.error("Error in change detector '{}': {}", detector.getName(), e.getMessage(), e);
+                // Continue with the other detectors even if one fails
+            }
+        });
+
+        return new ChangeResult(
+                new ArrayList<>(allClustersToRemove),
+                new ArrayList<>(allClustersToAdd),
+                new ArrayList<>(allClustersToModify));
+    }
+
+    /**
+     * Processes configuration changes in the correct order: Remove -> Modify -> Add
+     */
+    private CompletableFuture<Void> processConfigurationChanges(
+            ChangeResult changes,
+            ConfigurationChangeContext context,
+            ConfigurationChangeRollbackTracker rollbackTracker) {
+
+        // Sequential processing using stream.reduce() with CompletableFuture chaining
+        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+
+        // All the operations below happen by calling methods of `VirtualClusterManager`
+        // 1. Remove clusters first (to free up ports/resources)
+        // 2. Restart modified existing clusters
+        // 3. Add new clusters last
+
+        return chain;
+    }
+
+    /**
+     * Performs rollback of all successful operations, in reverse order, on failure.
+     */
+    private CompletableFuture<Void> performRollback(ConfigurationChangeRollbackTracker tracker) {
+        // Rollback in reverse order: Added -> Modified -> Removed
+        // Remove clusters that were successfully added
+        // Restore clusters that were modified (revert to the old configuration)
+        // Re-add clusters that were removed
+    }
+
+}
+```
+
+3. **ConfigurationChangeRollbackTracker** - This class maintains a record of all cluster operations (removals, modifications, additions) so that they can be reversed if the overall configuration change fails.
+```
+public class ConfigurationChangeRollbackTracker {
+
+    /**
+     * Tracks a cluster removal operation.
+     */
+    public void trackRemoval(String clusterName, VirtualClusterModel removedModel) {}
+
+    /**
+     * Tracks a cluster modification operation.
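+     * The original model is retained so that a later rollback can restore the cluster
+     * to its pre-change configuration.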
+     */
+    public void trackModification(String clusterName, VirtualClusterModel originalModel, VirtualClusterModel newModel) {}
+
+    /**
+     * Tracks a cluster addition operation.
+     */
+    public void trackAddition(String clusterName, VirtualClusterModel addedModel) {}
+}
+```
+
+4. **ChangeDetector (Interface)** - Strategy pattern interface for different types of change detection.
+   - Currently we only have one implementation - `VirtualClusterChangeDetector`, which detects changes in virtual cluster models; in future we can add a detector for filter changes, i.e. `FilterChangeDetector`
+   - Provides a consistent API for comparing old vs new configurations via detectChanges()
+   - Returns structured ChangeResult objects describing the specific operations needed
+```
+public interface ChangeDetector {
+    /**
+     * Name of this change detector for logging and debugging.
+     */
+    String getName();
+
+    /**
+     * Detect configuration changes and return structured change information.
+     * @param context The configuration context containing old and new configurations
+     * @return ChangeResult containing categorized cluster operations
+     */
+    ChangeResult detectChanges(ConfigurationChangeContext context);
+}
+```
+```
+public class VirtualClusterChangeDetector implements ChangeDetector {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(VirtualClusterChangeDetector.class);
+
+    @Override
+    public String getName() {
+        return "VirtualClusterChangeDetector";
+    }
+
+    @Override
+    public ChangeResult detectChanges(ConfigurationChangeContext context) {
+        // Check for modified clusters using equals() comparison
+        List<String> modifiedClusters = findModifiedClusters(context);
+
+        // Check for new clusters (exist in new but not old)
+        List<String> newClusters = findNewClusters(context);
+
+        // Check for removed clusters (exist in old but not new)
+        List<String> removedClusters = findRemovedClusters(context);
+
+        return new ChangeResult(removedClusters, newClusters, modifiedClusters);
+    }
+}
+```
+
+5. **ConfigurationChangeContext (Record)** - Immutable data container providing context for change detection.
+   - Holds the old and new Configuration objects (plus their virtual cluster models) for comparison
+```
+public record ConfigurationChangeContext(
+        Configuration oldConfig,
+        Configuration newConfig,
+        List<VirtualClusterModel> oldModels,
+        List<VirtualClusterModel> newModels
+) {}
+```
+
+6. **ChangeResult (Record)** - Contains lists of cluster names for each operation type (remove/add/modify)
+```
+public record ChangeResult(
+        List<String> clustersToRemove,
+        List<String> clustersToAdd,
+        List<String> clustersToModify
+) {}
+```
+
+## Flow diagram
+
+_(Flow diagram image not reproduced in this patch.)_
+
+# Part 2: Graceful Virtual Cluster Restart
+
+Part 2 of the hot-reload implementation focuses on gracefully restarting virtual clusters. This component receives structured change operations from Part 1 and executes them in a carefully orchestrated sequence: **connection draining → resource deregistration → new resource registration → connection restoration.**
+
+The design emphasizes minimal service disruption by ensuring that all in-flight Kafka requests complete before connections are closed (or are force-closed when a timeout is hit).
+
+## Core Classes & Structure
+
+1. **VirtualClusterManager**
+   - **What it does** - Acts as the high-level orchestrator for all virtual cluster lifecycle operations during hot-reload. `ConfigurationChangeHandler` calls the `VirtualClusterManager` to restart/add/remove clusters when there is a config change.
+   - **Key Responsibilities:**
+     - **Cluster Addition**: Takes a new `VirtualClusterModel` and brings it online by registering it with the `EndpointRegistry`
+     - **Cluster Removal**: Safely takes down an existing cluster by first draining all connections gracefully, then deregistering it via the EndpointRegistry
+     - **Cluster Restart**: Performs a complete cluster reconfiguration by removing the old version and adding the new version with the updated settings
+     - **Rollback Integration**: Automatically tracks all successful operations so that they can be undone if later operations fail
+```
+public class VirtualClusterManager {
+
+    ...
+    public VirtualClusterManager(EndpointRegistry endpointRegistry,
+                                 ConnectionDrainManager connectionDrainManager) {
+        this.endpointRegistry = endpointRegistry;
+        this.connectionDrainManager = connectionDrainManager;
+    }
+
+    /**
+     * Gracefully removes a virtual cluster by draining connections and deregistering endpoints.
+     */
+    public CompletableFuture<Void> removeVirtualCluster(String clusterName,
+                                                        List<VirtualClusterModel> oldModels,
+                                                        ConfigurationChangeRollbackTracker rollbackTracker) {
+        // 1. Find the cluster model to remove
+        VirtualClusterModel clusterToRemove = findClusterModel(oldModels, clusterName);
+
+        // 2. Drain connections gracefully (30s timeout)
+        return connectionDrainManager.gracefullyDrainConnections(clusterName, Duration.ofSeconds(30))
+                .thenCompose(v -> {
+                    // 3. Deregister all gateways from the endpoint registry
+                    var deregistrationFutures = clusterToRemove.gateways().values().stream()
+                            .map(gateway -> endpointRegistry.deregisterVirtualCluster(gateway))
+                            .toArray(CompletableFuture[]::new);
+
+                    return CompletableFuture.allOf(deregistrationFutures);
+                })
+                .thenRun(() -> {
+                    // 4. Track the removal for potential rollback
+                    rollbackTracker.trackRemoval(clusterName, clusterToRemove);
+                    LOGGER.info("Successfully removed virtual cluster '{}'", clusterName);
+                });
+    }
+
+    /**
+     * Restarts a virtual cluster with a new configuration (remove + add).
+     */
+    public CompletableFuture<Void> restartVirtualCluster(String clusterName,
+                                                         List<VirtualClusterModel> oldModels,
+                                                         List<VirtualClusterModel> newModels,
+                                                         ConfigurationChangeRollbackTracker rollbackTracker) {
+        VirtualClusterModel oldModel = findClusterModel(oldModels, clusterName);
+        VirtualClusterModel newModel = findClusterModel(newModels, clusterName);
+
+        // Step 1: Remove the existing cluster (drain + deregister)
+        return removeVirtualCluster(clusterName, oldModels, rollbackTracker)
+                .thenCompose(v -> {
+                    // Step 2: Add the new cluster with the updated configuration
+                    return addVirtualCluster(clusterName, List.of(newModel), rollbackTracker);
+                })
+                .thenRun(() -> {
+                    // Step 3: Track the modification and stop draining
+                    rollbackTracker.trackModification(clusterName, oldModel, newModel);
+                    connectionDrainManager.stopDraining(clusterName);
+                    LOGGER.info("Successfully restarted virtual cluster '{}' with new configuration", clusterName);
+                });
+    }
+
+    /**
+     * Adds a new virtual cluster by registering endpoints and enabling connections.
+     */
+    public CompletableFuture<Void> addVirtualCluster(String clusterName,
+                                                     List<VirtualClusterModel> newModels,
+                                                     ConfigurationChangeRollbackTracker rollbackTracker) {
+        VirtualClusterModel newModel = findClusterModel(newModels, clusterName);
+
+        return registerVirtualCluster(newModel)
+                .thenRun(() -> {
+                    // Stop draining, to allow new connections
+                    connectionDrainManager.stopDraining(clusterName);
+                    rollbackTracker.trackAddition(clusterName, newModel);
+                    LOGGER.info("Successfully added new virtual cluster '{}'", clusterName);
+                });
+    }
+
+    /**
+     * Registers all gateways for a virtual cluster with the endpoint registry.
+     */
+    private CompletableFuture<Void> registerVirtualCluster(VirtualClusterModel model) {
+        LOGGER.info("Registering virtual cluster '{}' with {} gateways",
+                model.getClusterName(), model.gateways().size());
+
+        var registrationFutures = model.gateways().values().stream()
+                .map(gateway -> endpointRegistry.registerVirtualCluster(gateway))
+                .toArray(CompletableFuture[]::new);
+
+        return CompletableFuture.allOf(registrationFutures)
+                .thenRun(() -> LOGGER.info("Successfully registered virtual cluster '{}' with all gateways",
+                        model.getClusterName()));
+    }
+}
+```
+
+2. **ConnectionDrainManager**
+   - **What it does** - Implements the graceful connection draining strategy during cluster restarts. This is what makes hot-reload "graceful" - it ensures that client requests already in progress are completed rather than dropped.
+   - **Key Responsibilities:**
+     - **Draining Mode Control**: Starts/stops "draining mode", in which new connections are rejected but existing ones continue
+     - **Backpressure Strategy**: Applies backpressure by disabling `autoRead` on downstream channels while keeping upstream channels active. This is done so that any "new" client messages are rejected, while the upstream channel is kept open so that existing in-flight requests are delivered to Kafka and their responses are successfully delivered back to the client.
+     - **In-Flight Monitoring**: Continuously monitors pending Kafka requests and waits for them to complete before closing connections. This is done using the `InFlightMessageTracker` class.
+   - **Explanation of the draining strategy**
+     - **Phase 1: Initiate Draining Mode** - Set the cluster to "draining mode", in which any new connection attempt is rejected. Then we proceed to gracefully closing the existing connections.
+    ```
+    public CompletableFuture<Void> gracefullyDrainConnections(String clusterName, Duration totalTimeout) {
+        // 1. Get the current connection and message state
+        int totalConnections = connectionTracker.getTotalConnectionCount(clusterName);
+        int totalInFlight = inFlightTracker.getTotalPendingRequestCount(clusterName);
+
+        LOGGER.info("Starting graceful drain for cluster '{}' with {} connections and {} in-flight requests ({}s timeout)",
+                clusterName, totalConnections, totalInFlight, totalTimeout.getSeconds());
+
+        // 2. Enter draining mode - reject new connections
+        return startDraining(clusterName)
+                .thenCompose(v -> {
+                    if (totalConnections == 0) {
+                        // Fast path: no connections to drain
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        // Proceed with connection closure
+                        return gracefullyCloseConnections(clusterName, totalTimeout);
+                    }
+                });
+    }
+
+    public CompletableFuture<Void> startDraining(String clusterName) {
+        drainingClusters.put(clusterName, new AtomicBoolean(true));
+        return CompletableFuture.completedFuture(null);
+    }
+    ```
+     - **Phase 2: Apply Backpressure Strategy** - We set `autoRead = false` only on the downstream channel, to reject any new client messages. The `ConnectionTracker` class tracks which downstream/upstream channels are active for a given cluster name.
+       - **Downstream (Client→Proxy)** - `autoRead = false` - Prevents clients from sending NEW requests while allowing existing requests to complete
+       - **Upstream (Proxy→Kafka)** - `autoRead = true` - Allows Kafka responses to flow back to complete pending requests. The in-flight request count decreases naturally as responses arrive
+    ```
+    public CompletableFuture<Void> gracefullyCloseConnections(String clusterName, Duration timeout) {
+        // 1. Get the separate channel collections
+        Set<Channel> downstreamChannels = connectionTracker.getDownstreamActiveChannels(clusterName);
+        Set<Channel> upstreamChannels = connectionTracker.getUpstreamActiveChannels(clusterName);
+
+        // 2. Apply different strategies to the different channel types
+        var allCloseFutures = new ArrayList<CompletableFuture<Void>>();
+
+        // Add downstream channel close futures
+        downstreamChannels.stream()
+                .map(this::disableAutoReadOnDownstreamChannel)
+                .map(channel -> gracefullyCloseChannel(channel, clusterName, "DOWNSTREAM", timeout))
+                .forEach(allCloseFutures::add);
+
+        // Add upstream channel close futures
+        upstreamChannels.stream()
+                .map(channel -> gracefullyCloseChannel(channel, clusterName, "UPSTREAM", timeout))
+                .forEach(allCloseFutures::add);
+
+        return CompletableFuture.allOf(allCloseFutures.toArray(new CompletableFuture[0]));
+    }
+
+    private Channel disableAutoReadOnDownstreamChannel(Channel downstreamChannel) {
+        try {
+            if (downstreamChannel.isActive()) {
+                // Get the KafkaProxyFrontendHandler from the channel pipeline
+                KafkaProxyFrontendHandler frontendHandler = downstreamChannel.pipeline().get(KafkaProxyFrontendHandler.class);
+                if (frontendHandler != null) {
+                    frontendHandler.applyBackpressure();
+                    LOGGER.debug("Applied backpressure via frontend handler for channel: L:/{}, R:/{}",
+                            downstreamChannel.localAddress(), downstreamChannel.remoteAddress());
+                }
+                else {
+                    LOGGER.debug("Manually applying backpressure for channel: L:/{}, R:/{}",
+                            downstreamChannel.localAddress(), downstreamChannel.remoteAddress());
+                    // Fall back to the manual method if the handler is not found
+                    downstreamChannel.config().setAutoRead(false);
+                }
+            }
+        }
+        catch (Exception e) {
+            LOGGER.warn("Failed to disable autoRead for downstream channel L:/{}, R:/{} - continuing with drain",
+                    downstreamChannel.localAddress(), downstreamChannel.remoteAddress(), e);
+        }
+        return downstreamChannel;
+    }
+    ```
+     - **Phase 3: Monitor In-Flight Message Completion and Close the Channel** - Monitor the in-flight count every 100ms while draining, waiting for it to reach zero naturally. If the in-flight count never reaches zero (a hang, possibly because the underlying Kafka cluster has gone down), force-close after the timeout to prevent an indefinite hang. Once the in-flight count reaches zero (or the timeout expires), close the channel immediately.
+    ```
+    private CompletableFuture<Void> gracefullyCloseChannel(Channel channel, String clusterName,
+                                                           String channelType, Duration timeout) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        long startTime = System.currentTimeMillis();
+
+        // Schedule the timeout
+        ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
+            if (!future.isDone()) {
+                LOGGER.warn("Graceful shutdown timeout exceeded for {} channel L:/{}, R:/{} in cluster '{}' - forcing immediate closure",
+                        channelType, channel.localAddress(), channel.remoteAddress(), clusterName);
+                closeChannelImmediately(channel, future);
+            }
+        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        // Schedule periodic checks for in-flight messages
+        ScheduledFuture<?> checkTask = scheduler.scheduleAtFixedRate(() -> {
+            try {
+                if (future.isDone()) {
+                    return;
+                }
+
+                int pendingRequests = inFlightTracker.getPendingRequestCount(clusterName, channel);
+                long elapsed = System.currentTimeMillis() - startTime;
+
+                if (pendingRequests == 0) {
+                    LOGGER.info("In-flight messages cleared for {} channel L:/{}, R:/{} in cluster '{}' - proceeding with connection closure ({}ms elapsed)",
+                            channelType, channel.localAddress(), channel.remoteAddress(), clusterName, elapsed);
+                    closeChannelImmediately(channel, future);
+                }
+                else {
+                    // Just wait for the existing in-flight messages to complete naturally.
+                    // Do NOT call channel.read(), as that would trigger processing of new messages
+                    int totalPending = inFlightTracker.getTotalPendingRequestCount(clusterName);
+                    LOGGER.debug("Waiting for {} channel L:/{}, R:/{} in cluster '{}' to drain: {} pending requests (cluster total: {}, {}ms elapsed)",
+                            channelType, channel.localAddress(), channel.remoteAddress(), clusterName, pendingRequests, totalPending, elapsed);
+                }
+            }
+            catch (Exception e) {
+                LOGGER.error("Unexpected error during graceful shutdown monitoring for channel L:/{}, R:/{} in cluster '{}'",
+                        channel.localAddress(), channel.remoteAddress(), clusterName, e);
+                future.completeExceptionally(e);
+            }
+        }, 50, 100, TimeUnit.MILLISECONDS); // Check every 100ms for a faster response
+
+        // Cancel the scheduled tasks when the future completes, and log the final result
+        future.whenComplete((result, throwable) -> {
+            timeoutTask.cancel(false);
+            checkTask.cancel(false);
+
+            if (throwable == null) {
+                LOGGER.info("Successfully completed graceful shutdown of {} channel L:/{}, R:/{} in cluster '{}'",
+                        channelType, channel.localAddress(), channel.remoteAddress(), clusterName);
+            }
+            else {
+                LOGGER.error("Graceful shutdown failed for {} channel L:/{}, R:/{} in cluster '{}': {}",
+                        channelType, channel.localAddress(), channel.remoteAddress(), clusterName, throwable.getMessage());
+            }
+        });
+
+        return future;
+    }
+
+    private void closeChannelImmediately(Channel channel, CompletableFuture<Void> future) {
+        if (future.isDone()) {
+            return;
+        }
+
+        channel.close().addListener(channelFuture -> {
+            if (channelFuture.isSuccess()) {
+                future.complete(null);
+            }
+            else {
+                future.completeExceptionally(channelFuture.cause());
+            }
+        });
+    }
+    ```
+   - **How will drain mode reject new client connections?** - For this, we put a check in the KafkaProxyFrontendHandler#channelActive method to reject new connections if the particular cluster is in drain mode.
+    ```
+    public class KafkaProxyFrontendHandler
+            extends ChannelInboundHandlerAdapter
+            implements NetFilter.NetFilterContext {
+        ....
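+        // Assumed fields for this sketch: virtualClusterModel identifies the cluster this
+        // connection belongs to, and connectionDrainManager is consulted before accepting it.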
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            this.clientCtx = ctx;
+
+            // Check if we should accept this connection (not draining)
+            String clusterName = virtualClusterModel.getClusterName();
+            if (connectionDrainManager != null && !connectionDrainManager.shouldAcceptConnection(clusterName)) {
+                LOGGER.info("Rejecting new connection for draining cluster '{}'", clusterName);
+                ctx.close();
+                return;
+            }
+
+            this.proxyChannelStateMachine.onClientActive(this);
+            super.channelActive(this.clientCtx);
+        }
+        ....
+    }
+    ```
+
+3. **ConnectionTracker**
+   - **What it does** - Maintains a real-time inventory of all active network connections per virtual cluster. You can't gracefully drain connections if you don't know what connections exist - this class provides that visibility.
+   - **Key Responsibilities:**
+     - **Bidirectional Tracking**: Separately tracks downstream connections (client→proxy) and upstream connections (proxy→Kafka)
+     - **Channel Management**: Maintains collections of active `Channel` objects for bulk operations like graceful closure
+     - **Lifecycle Integration**: Integrates with `ProxyChannelStateMachine` to automatically track connection establishment and closure
+     - **Cleanup Logic**: Automatically removes references to closed channels and cleans up empty cluster entries
+```
+public class ConnectionTracker {
+
+    // Downstream connections (client → proxy)
+    private final Map<String, AtomicInteger> downstreamConnections = new ConcurrentHashMap<>();
+    private final Map<String, Set<Channel>> downstreamChannelsByCluster = new ConcurrentHashMap<>();
+
+    // Upstream connections (proxy → target Kafka cluster)
+    private final Map<String, AtomicInteger> upstreamConnections = new ConcurrentHashMap<>();
+    private final Map<String, Set<Channel>> upstreamChannelsByCluster = new ConcurrentHashMap<>();
+
+    public void onDownstreamConnectionEstablished(String clusterName, Channel channel) {
+        downstreamConnections.computeIfAbsent(clusterName, k -> new AtomicInteger(0)).incrementAndGet();
+        downstreamChannelsByCluster.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()).add(channel);
+    }
+
+    public void onDownstreamConnectionClosed(String clusterName, Channel channel) {
+        onConnectionClosed(clusterName, channel, downstreamConnections, downstreamChannelsByCluster);
+    }
+
+    /**
+     Called by ConnectionDrainManager
+     */
+    public Set<Channel> getDownstreamActiveChannels(String clusterName) {
+        Set<Channel> channels = downstreamChannelsByCluster.get(clusterName);
+        return channels != null ? Set.copyOf(channels) : Set.of();
+    }
+
+    // === UPSTREAM CONNECTION TRACKING ===
+    public void onUpstreamConnectionEstablished(String clusterName, Channel channel) {
+        upstreamConnections.computeIfAbsent(clusterName, k -> new AtomicInteger(0)).incrementAndGet();
+        upstreamChannelsByCluster.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()).add(channel);
+    }
+
+    public void onUpstreamConnectionClosed(String clusterName, Channel channel) {
+        onConnectionClosed(clusterName, channel, upstreamConnections, upstreamChannelsByCluster);
+    }
+
+    /**
+     Called by ConnectionDrainManager
+     */
+    public Set<Channel> getUpstreamActiveChannels(String clusterName) {
+        Set<Channel> channels = upstreamChannelsByCluster.get(clusterName);
+        return channels != null ? Set.copyOf(channels) : Set.of();
+    }
+
+    /**
+     Called by ConnectionDrainManager
+     */
+    public int getTotalConnectionCount(String clusterName) {
+        return getDownstreamActiveConnectionCount(clusterName) + getUpstreamActiveConnectionCount(clusterName);
+    }
+
+    /**
+     * Common method to remove a connection and clean up empty entries.
+     * This method decrements the connection counter and removes the channel from the set,
+     * cleaning up empty entries to prevent memory leaks.
+     */
+    private void onConnectionClosed(String clusterName, Channel channel,
+                                    Map<String, AtomicInteger> connectionCounters,
+                                    Map<String, Set<Channel>> channelsByCluster) {
+        // Decrement the counter and remove it if zero or negative
+        AtomicInteger counter = connectionCounters.get(clusterName);
+        if (counter != null) {
+            counter.decrementAndGet();
+            if (counter.get() <= 0) {
+                connectionCounters.remove(clusterName);
+            }
+        }
+
+        // Remove the channel from the set, and remove empty sets
+        Set<Channel> channels = channelsByCluster.get(clusterName);
+        if (channels != null) {
+            channels.remove(channel);
+            if (channels.isEmpty()) {
+                channelsByCluster.remove(clusterName);
+            }
+        }
+    }
+}
+```
+
+4. **InFlightMessageTracker**
+   - **What it does** - Tracks **pending Kafka requests** to ensure no messages are lost during connection closure. This enables the "wait for completion" strategy - connections are only closed after all pending requests have received responses.
+   - **Key Responsibilities:**
+     - **Request Tracking**: Increments counters when Kafka requests are sent upstream in `ProxyChannelStateMachine`
+     - **Response Tracking**: Decrements counters when Kafka responses are received in `ProxyChannelStateMachine`
+     - **Channel Cleanup**: Handles cleanup when channels close unexpectedly, adjusting the counts appropriately
+```
+public class InFlightMessageTracker {
+
+    // Map from cluster name to channel to pending request count
+    private final Map<String, Map<Channel, AtomicInteger>> pendingRequests = new ConcurrentHashMap<>();
+
+    // Map from cluster name to total pending requests, for quick lookup
+    private final Map<String, AtomicInteger> totalPendingByCluster = new ConcurrentHashMap<>();
+
+    /**
+     * Records that a request has been sent to the upstream cluster.
+     *
+     * @param clusterName The name of the virtual cluster.
+     * @param channel The channel handling the request.
+     */
+    public void onRequestSent(String clusterName, Channel channel) {
+        pendingRequests.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>())
+                .computeIfAbsent(channel, k -> new AtomicInteger(0))
+                .incrementAndGet();
+
+        totalPendingByCluster.computeIfAbsent(clusterName, k -> new AtomicInteger(0))
+                .incrementAndGet();
+    }
+
+    /**
+     * Records that a response has been received from the upstream cluster.
+     *
+     * @param clusterName The name of the virtual cluster.
+     * @param channel The channel handling the response.
+     */
+    public void onResponseReceived(String clusterName, Channel channel) {
+        Map<Channel, AtomicInteger> clusterRequests = pendingRequests.get(clusterName);
+        if (clusterRequests != null) {
+            AtomicInteger channelCounter = clusterRequests.get(channel);
+            if (channelCounter != null) {
+                int remaining = channelCounter.decrementAndGet();
+                if (remaining <= 0) {
+                    clusterRequests.remove(channel);
+                    if (clusterRequests.isEmpty()) {
+                        pendingRequests.remove(clusterName);
+                    }
+                }
+
+                AtomicInteger totalCounter = totalPendingByCluster.get(clusterName);
+                if (totalCounter != null) {
+                    int totalRemaining = totalCounter.decrementAndGet();
+                    if (totalRemaining <= 0) {
+                        totalPendingByCluster.remove(clusterName);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Records that a channel has been closed, clearing all pending requests for that channel.
+     *
+     * @param clusterName The name of the virtual cluster.
+     * @param channel The channel that was closed.
+     */
+    public void onChannelClosed(String clusterName, Channel channel) {
+        Map<Channel, AtomicInteger> clusterRequests = pendingRequests.get(clusterName);
+        if (clusterRequests != null) {
+            AtomicInteger channelCounter = clusterRequests.remove(channel);
+            if (channelCounter != null) {
+                int pendingCount = channelCounter.get();
+                if (pendingCount > 0) {
+                    // Subtract from the total
+                    AtomicInteger totalCounter = totalPendingByCluster.get(clusterName);
+                    if (totalCounter != null) {
+                        int newTotal = totalCounter.addAndGet(-pendingCount);
+                        if (newTotal <= 0) {
+                            totalPendingByCluster.remove(clusterName);
+                        }
+                    }
+                }
+            }
+
+            if (clusterRequests.isEmpty()) {
+                pendingRequests.remove(clusterName);
+            }
+        }
+    }
+
+    /**
+     * Gets the number of pending requests for a specific channel in a virtual cluster.
+     *
+     * @param clusterName The name of the virtual cluster.
+     * @param channel The channel.
+     * @return The number of pending requests.
+     */
+    public int getPendingRequestCount(String clusterName, Channel channel) {
+        Map<Channel, AtomicInteger> clusterRequests = pendingRequests.get(clusterName);
+        if (clusterRequests != null) {
+            AtomicInteger counter = clusterRequests.get(channel);
+            return counter != null ? Math.max(0, counter.get()) : 0;
+        }
+        return 0;
+    }
+
+    /**
+     * Gets the total number of pending requests for a virtual cluster across all channels.
+     *
+     * @param clusterName The name of the virtual cluster.
+     * @return The total number of pending requests.
+     */
+    public int getTotalPendingRequestCount(String clusterName) {
+        AtomicInteger counter = totalPendingByCluster.get(clusterName);
+        return counter != null ? Math.max(0, counter.get()) : 0;
+    }
+}
+```
+
+5. **Changes in ProxyChannelStateMachine** - We need to enhance the existing state machine for:
+   - **Connection Lifecycle**: Automatically notifies ConnectionTracker when connections are established/closed
+   - **In-flight Message Tracking**: Automatically notifies InFlightMessageTracker when requests/responses flow through
+
+Example code changes for the existing ProxyChannelStateMachine methods:
+```
+void messageFromServer(Object msg) {
+    // Track responses received from upstream Kafka (completing in-flight requests)
+    if (inFlightTracker != null && msg instanceof ResponseFrame && backendHandler != null) {
+        inFlightTracker.onResponseReceived(clusterName, backendHandler.serverCtx().channel());
+    }
+
+    ....
+
+    // Track responses being sent to the client on the downstream channel
+    if (inFlightTracker != null && msg instanceof ResponseFrame) {
+        inFlightTracker.onResponseReceived(clusterName, frontendHandler.clientCtx().channel());
+    }
+}
+
+void messageFromClient(Object msg) {
+    // Track requests being sent upstream (creating in-flight messages)
+    if (inFlightTracker != null && msg instanceof RequestFrame && backendHandler != null) {
+        inFlightTracker.onRequestSent(clusterName, backendHandler.serverCtx().channel());
+    }
+
+    ....
+}
+
+void onClientRequest(SaslDecodePredicate dp,
+                     Object msg) {
+    ....
+
+    // Track requests received from the client on the downstream channel
+    if (inFlightTracker != null && msg instanceof RequestFrame) {
+        inFlightTracker.onRequestSent(clusterName, frontendHandler.clientCtx().channel());
+    }
+
+    ....
+}
+
+void onServerInactive() {
+    // Track upstream connection closure
+    if (connectionTracker != null && backendHandler != null) {
+        connectionTracker.onUpstreamConnectionClosed(clusterName, backendHandler.serverCtx().channel());
+    }
+    // Clear any pending in-flight messages for this upstream channel
+    if (inFlightTracker != null && backendHandler != null) {
+        inFlightTracker.onChannelClosed(clusterName, backendHandler.serverCtx().channel());
+    }
+
+    ....
+}
+
+void onClientInactive() {
+    // Track downstream connection closure
+    if (connectionTracker != null && frontendHandler != null) {
+        connectionTracker.onDownstreamConnectionClosed(clusterName, frontendHandler.clientCtx().channel());
+    }
+    // Clear any pending in-flight messages for this downstream channel
+    if (inFlightTracker != null && frontendHandler != null) {
+        inFlightTracker.onChannelClosed(clusterName, frontendHandler.clientCtx().channel());
+    }
+
+    ....
+}
+
+private void toClientActive(ProxyChannelState.ClientActive clientActive,
+                            KafkaProxyFrontendHandler frontendHandler) {
+    ....
+    // Track downstream connection establishment
+    if (connectionTracker != null) {
+        connectionTracker.onDownstreamConnectionEstablished(clusterName, frontendHandler.clientCtx().channel());
+    }
+}
+
+private void toForwarding(Forwarding forwarding) {
+    ....
+    // Track upstream connection establishment
+    if (connectionTracker != null && backendHandler != null) {
+        connectionTracker.onUpstreamConnectionEstablished(clusterName, backendHandler.serverCtx().channel());
+    }
+}
+```
+
+# Challenges/Open questions
+- If loading the new cluster configs fails for some reason, the code will automatically roll back to the previous state. However, this leaves the app in a state where the current config file content no longer matches the actually running cluster config.
+- If the rollback itself fails (for some unforeseen reason), the only way for the operator to find out is via the logs. In such cases, a full app restart might be required.
+- If there are multiple gateway nodes running and a few of them fail, we may have to introduce some sort of status coordinator, rather than relying on all instances behaving the same.

From 2a9747e2c8e0e1fc6a53d50ddc48ea271b82d29f Mon Sep 17 00:00:00 2001
From: Urjit Patel <105218041+Uzziee@users.noreply.github.com>
Date: Thu, 29 Jan 2026 09:45:49 +0530
Subject: [PATCH 2/2] replace file watcher with HTTP based approach

Signed-off-by: Urjit Patel <105218041+Uzziee@users.noreply.github.com>
---
 proposals/012-hot-reload-feature.md | 1810 +++++++++++++++++++--------
 1 file changed, 1296 insertions(+), 514 deletions(-)

diff --git a/proposals/012-hot-reload-feature.md b/proposals/012-hot-reload-feature.md
index ca760fe..588714c 100644
--- a/proposals/012-hot-reload-feature.md
+++ b/proposals/012-hot-reload-feature.md
@@ -1,166 +1,438 @@
-# Hot Reload Feature
+# Hot Reload Feature - HTTP-Based Approach

-As of today, any change to the virtual cluster configs (addition/removal/modification) requires a full restart of the Kroxylicious app. This proposal adds a dynamic reload feature, which will enable operators to modify virtual cluster configurations (add/remove/modify clusters) while **maintaining service availability for unaffected clusters**, without the need for full application restarts. This feature will transform Kroxylicious from a **"restart-to-configure"** system into a **"live-reconfiguration"** system.
+As of today, any change to the virtual cluster configs (addition/removal/modification) requires a full restart of the Kroxylicious app.
+This proposal describes the dynamic reload feature, which enables operators to modify virtual cluster configurations (add/remove/modify clusters) while **maintaining service availability for unaffected clusters**, without the need for full application restarts.
+This feature transforms Kroxylicious from a **"restart-to-configure"** system into a **"live-reconfiguration"** system.

-This proposal is structured as a multi-part implementation to ensure clear separation of concerns and manageable development phases.
+## HTTP-Based vs File Watcher Approach

-- Part 1: Configuration Change Detection - This part focuses on monitoring configuration files, parsing changes, and comparing old vs new configurations to identify exactly which virtual clusters need to be restarted. It provides a clean interface that returns structured change operations (additions, removals, modifications) without actually performing any restart operations.
+The original proposal used a file-based watcher mechanism to detect configuration changes. This has been replaced with an **HTTP-based trigger mechanism** for the following reasons:

-- Part 2: Graceful Virtual Cluster Restart - This part handles the actual restart operations, including graceful connection draining, in-flight message completion, and rollback mechanisms. It takes the change decisions from Part 1 and executes them safely while ensuring minimal service disruption.
+| Aspect | File Watcher | HTTP Endpoint |
+|--------|--------------|---------------|
+| **Trigger** | Automatic on file change | Explicit HTTP POST request |
+| **Control** | Passive monitoring | Active, operator-controlled |
+| **Configuration Delivery** | Read from filesystem | Sent in request body |
+| **Response** | Asynchronous (via logs) | Synchronous HTTP response |
+| **Validation** | After file is saved | Before applying changes |
+| **Rollback** | Manual file restore | Automatic on failure |

-# Part 1: Configuration Change Detection Framework
+This proposal is structured as a multi-part implementation to ensure clear separation of concerns:

-With this framework, Kroxylicious will detect config file changes (using a standard file-watcher service) and, using various detector interfaces, work out which virtual clusters have been added, removed, or modified. The list of affected clusters is then passed on to Part 2 of this feature, where the clusters are gracefully restarted (or rolled back to the previous stable state in case of failure).
+- **Part 1: HTTP-Based Configuration Reload Endpoint** - This part focuses on the HTTP endpoint that receives new configurations, validates them, and triggers the hot-reload process. It provides synchronous feedback via the HTTP response, with detailed reload results.

-POC PR - https://github.com/kroxylicious/kroxylicious/pull/2901
+- **Part 2: Graceful Virtual Cluster Restart** - This part handles the actual restart operations, including graceful connection draining, in-flight message completion, and rollback mechanisms. It takes the change decisions from Part 1 and executes them safely while ensuring minimal service disruption.
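+
+For illustration, the reload endpoint described in Part 1 below can be exercised with any HTTP client; a minimal sketch using the JDK's built-in `java.net.http.HttpClient` follows (the host, port, and file name here are hypothetical):
+
+```java
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+
+public class ReloadClient {
+    public static void main(String[] args) throws Exception {
+        // POST the new YAML configuration to the management endpoint
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create("http://localhost:9190/admin/config/reload")) // hypothetical management address
+                .header("Content-Type", "application/yaml")
+                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("proxy-config.yaml")))
+                .build();
+
+        HttpResponse<String> response = HttpClient.newHttpClient()
+                .send(request, HttpResponse.BodyHandlers.ofString());
+
+        // 200 = reloaded, 400 = validation error, 409 = reload in progress, 500 = failure
+        System.out.println(response.statusCode() + ": " + response.body());
+    }
+}
+```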
-## Core Classes & Structure +**POC PR** - https://github.com/kroxylicious/kroxylicious/pull/3176 +--- + +# Part 1: HTTP-Based Configuration Reload Endpoint + +With this framework, operators can trigger configuration reloads by sending an HTTP POST request to `/admin/config/reload` with the new YAML configuration in the request body. The endpoint validates the configuration, detects changes, and orchestrates the reload process with full rollback support. + +## Endpoint Configuration -1. **ConfigWatcherService** - File system monitoring and configuration loading orchestrator - - Monitors configuration file changes using Java NIO WatchService - - Parses YAML configuration files using the existing ConfigParser when changes are detected - - Provides graceful shutdown of executor services and watch resources - - Triggers configuration change callbacks asynchronously to initiate the hot-reload process - - Handles file parsing errors and continues monitoring - - `KafkaProxy.java` - As this is the entry point to the proxy app, this class will configure the callback which needs to be triggered when there is a config change. This class will also be responsible for setting up the ConfigurationChangeHandler and ConfigWatcherService +To enable the reload endpoint, add the following to your kroxylicious configuration: +```yaml +management: + endpoints: + prometheus: {} + configReload: + enabled: true + timeout: 60s # Optional, defaults to 60s ``` -public final class KafkaProxy implements AutoCloseable { - ..... - public KafkaProxy(PluginFactoryRegistry pfr, Configuration config, Features features, Path configFilePath) { - ..... - // Initialize configuration change handler with direct list of detectors - this.configurationChangeHandler = new ConfigurationChangeHandler( - List.of( - new VirtualClusterChangeDetector(), - new FilterChangeDetector()), - virtualClusterManager); - } - ... +## HTTP API - public CompletableFuture startConfigurationWatcher(Path configFilePath) { - ..... - this.configWatcherService = new ConfigWatcherService( - configFilePath, - this::handleConfigurationChange, - Duration.ofMillis(500) // 500ms debounce delay - ); +**Endpoint:** `POST /admin/config/reload` - return configWatcherService.start(); - } - ... +**Request:** +- **Method:** POST +- **Content-Type:** `application/yaml`, `text/yaml`, or `application/x-yaml` +- **Body:** Complete YAML configuration - public CompletableFuture stopConfigurationWatcher() { - if (configWatcherService == null) { - return CompletableFuture.completedFuture(null); - } - return configWatcherService.stop().thenRun(() -> { - configWatcherService = null; - }); +**Response:** +- **Content-Type:** `application/json` +- **Status:** 200 OK (success), 400 Bad Request (validation error), 409 Conflict (concurrent reload), 500 Internal Server Error (failure) + +**Example Response (Success):** +```json +{ + "success": true, + "message": "Configuration reloaded successfully", + "clustersModified": 1, + "clustersAdded": 0, + "clustersRemoved": 0, + "timestamp": "2024-01-15T10:30:00Z" +} +``` + +**Example Response (Failure):** +```json +{ + "success": false, + "message": "Configuration validation failed: invalid bootstrap servers", + "clustersModified": 0, + "clustersAdded": 0, + "clustersRemoved": 0, + "timestamp": "2024-01-15T10:30:00Z" +} +``` + +> **WARNING:** This endpoint has NO authentication and is INSECURE by design. Use network policies or firewalls to restrict access. + +## Core Classes & Structure + +### 1. 
ConfigurationReloadEndpoint + +HTTP POST endpoint handler for triggering configuration reload at `/admin/config/reload`. + +- **What it does**: Receives HTTP POST requests containing YAML configuration and initiates the reload process +- **Key Responsibilities:** + - Extracts the YAML configuration from the HTTP request body + - Delegates request processing to `ReloadRequestProcessor` + - Formats successful responses using `ResponseFormatter` + - Handles different exception types and returns appropriate HTTP status codes: + - `400 Bad Request` for validation errors (invalid YAML, wrong content-type) + - `409 Conflict` for concurrent reload attempts + - `500 Internal Server Error` for reload failures + - Provides structured JSON response with reload results + +```java +public class ConfigurationReloadEndpoint implements Function { + + public static final String PATH = "/admin/config/reload"; + + private final ReloadRequestProcessor requestProcessor; + private final ResponseFormatter responseFormatter; + + public ConfigurationReloadEndpoint( + ReloadRequestProcessor requestProcessor, + ResponseFormatter responseFormatter) { + this.requestProcessor = Objects.requireNonNull(requestProcessor); + this.responseFormatter = Objects.requireNonNull(responseFormatter); } - private void handleConfigurationChange(Configuration newConfig) { + @Override + public HttpResponse apply(HttpRequest request) { try { - Configuration newValidatedConfig = validate(newConfig, features); - Configuration oldConfig = this.config; - - // Create models once to avoid excessive logging during change detection - List oldModels = oldConfig.virtualClusterModel(pfr); - List newModels = newValidatedConfig.virtualClusterModel(pfr); - ConfigurationChangeContext changeContext = new ConfigurationChangeContext( - oldConfig, newValidatedConfig, oldModels, newModels); - - // Delegate to the configuration change handler - configurationChangeHandler.handleConfigurationChange(changeContext) - .thenRun(() -> { - // Update the stored configuration after successful hot-reload - this.config = newValidatedConfig; - // Synchronize the virtualClusterModels with the new configuration to ensure consistency - this.virtualClusterModels = newModels; - LOGGER.info("Configuration and virtual cluster models successfully updated"); - }); + // Create context from request + ReloadRequestContext context = ReloadRequestContext.from(request); + + // Process request through handler chain + ReloadResponse response = requestProcessor.process(context); + + // Format and return response + return responseFormatter.format(response, request); } - catch (Exception e) { - LOGGER.error("Failed to validate or process configuration change", e); + catch (ValidationException e) { + return createErrorResponse(request, HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } + catch (ConcurrentReloadException e) { + return createErrorResponse(request, HttpResponseStatus.CONFLICT, e.getMessage()); + } + catch (ReloadException e) { + return createErrorResponse(request, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } +} +``` + +### 2. ReloadRequestProcessor + +Processes reload requests using the **Chain of Responsibility** pattern. Each handler performs a specific task and passes the context to the next handler. 
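+
+The per-handler contract is not spelled out in this proposal; a minimal sketch of the assumed interface shape:
+
+```java
+public interface ReloadRequestHandler {
+
+    /**
+     * Processes the context and returns it (possibly enriched, e.g. with the parsed
+     * Configuration), or throws a ReloadException subtype to abort the chain.
+     */
+    ReloadRequestContext handle(ReloadRequestContext context) throws ReloadException;
+}
+```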
+ +- **What it does**: Orchestrates the request processing pipeline by chaining multiple handlers that validate, parse, and execute the reload +- **Key Responsibilities:** + - Builds the handler chain in the correct order: validation → parsing → execution + - Passes an immutable `ReloadRequestContext` through each handler + - Each handler can enrich the context (e.g., add parsed Configuration) or throw exceptions + - Returns the final `ReloadResponse` from the context after all handlers complete + - Enforces maximum content length (10MB) to prevent memory exhaustion + +```java +public class ReloadRequestProcessor { + + private static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024; // 10MB + + private final List handlers; + + public ReloadRequestProcessor( + ConfigParser parser, + ConfigurationReloadOrchestrator orchestrator, + long timeoutSeconds) { + this.handlers = List.of( + new ContentTypeValidationHandler(), // 1. Validates Content-Type header + new ContentLengthValidationHandler(MAX_CONTENT_LENGTH), // 2. Validates body size + new ConfigurationParsingHandler(parser), // 3. Parses YAML to Configuration + new ConfigurationReloadHandler(orchestrator, timeoutSeconds)); // 4. Executes reload + } + + public ReloadResponse process(ReloadRequestContext context) throws ReloadException { + ReloadRequestContext currentContext = context; + + for (ReloadRequestHandler handler : handlers) { + currentContext = handler.handle(currentContext); } + + return currentContext.getResponse(); } } ``` + +**Handler Chain:** + +``` +┌─────────────────────────────────┐ +│ ContentTypeValidationHandler │ Validates Content-Type: application/yaml +└───────────────┬─────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ ContentLengthValidationHandler │ Validates body size <= 10MB +└───────────────┬─────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ ConfigurationParsingHandler │ Parses YAML → Configuration object +└───────────────┬─────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ ConfigurationReloadHandler │ Executes reload via orchestrator +└─────────────────────────────────┘ ``` -public class ConfigWatcherService { - ... - public ConfigWatcherService(Path configFilePath, - Consumer onConfigurationChanged) { - + +### 3. ReloadRequestContext + +Immutable context object passed through the request processing chain. Uses the builder pattern for creating modified contexts. + +- **What it does**: Carries request data and processing results through the handler chain without mutation +- **Key Responsibilities:** + - Holds the original `HttpRequest` and extracted request body + - Stores the parsed `Configuration` after parsing handler completes + - Stores the final `ReloadResponse` after reload handler completes + - Provides immutable "with" methods that return new context instances with updated fields + - Uses `Builder` pattern for clean construction and modification + +```java +public class ReloadRequestContext { + + private final HttpRequest httpRequest; + private final String requestBody; + private final Configuration parsedConfiguration; + private final ReloadResponse response; + + public static ReloadRequestContext from(HttpRequest request) { + String body = null; + if (request instanceof FullHttpRequest fullRequest) { + ByteBuf content = fullRequest.content(); + if (content.readableBytes() > 0) { + body = content.toString(StandardCharsets.UTF_8); + } + } + + return new Builder() + .withHttpRequest(request) + .withRequestBody(body) + .build(); } - ... 
- public CompletableFuture start() {} - public CompletableFuture stop() {} - //There will be more methods which will schedule a FileWatcher on the configpath - // and trigger the handleConfigurationChange() whenever there is a valid change + // Immutable "with" methods return new context instances + public ReloadRequestContext withParsedConfiguration(Configuration config) { + return new Builder(this).withParsedConfiguration(config).build(); + } - private void handleConfigurationChange() { - ... - onConfigurationChanged.accept(newConfiguration); - ... - } - + public ReloadRequestContext withResponse(ReloadResponse response) { + return new Builder(this).withResponse(response).build(); + } } ``` -2. **ConfigurationChangeHandler** - Orchestrates the entire configuration change process from detection to execution with rollback capability. - - This handler accepts a list of detector interfaces which run and identify which virtual clusters are affected. - - Once we get to know the list of clusters that need to be added/removed/restarted, this class will call the VirtualClusterManager methods to perform addition/deletion/restarts (This class will be discussed in part 2) - - This class also creates an instance of `ConfigurationChangeRollbackTracker`, which tracks what operations are being applied. So in case of any failures, the operations performed can be reversed to previous stable state. +### 4. ConfigurationReloadOrchestrator + +Orchestrates configuration reload operations with **concurrency control**, **validation**, and **state tracking**. Uses `ReentrantLock` to prevent concurrent reloads. + +- **What it does**: Acts as the main coordinator for the entire reload workflow, from validation through execution to state management +- **Key Responsibilities:** + - **Concurrency Control**: Uses `ReentrantLock.tryLock()` to prevent concurrent reloads and returns `ConcurrentReloadException` if a reload is already in progress + - **Configuration Validation**: Validates the new configuration using the `Features` framework before applying + - **FilterChainFactory Management**: Creates a new `FilterChainFactory` with updated filter definitions and performs atomic swap on success + - **Rollback on Failure**: If reload fails, closes the new factory and keeps the old factory active + - **State Tracking**: Maintains reload state (IDLE/IN_PROGRESS) via `ReloadStateManager` + - **Disk Persistence**: Persists successful configuration to disk by replacing the existing config file with the new one. 
A backup of the old config is also taken (.bak extension) + +```java +public class ConfigurationReloadOrchestrator { + + private final ConfigurationChangeHandler configurationChangeHandler; + private final PluginFactoryRegistry pluginFactoryRegistry; + private final Features features; + private final ReloadStateManager stateManager; + private final ReentrantLock reloadLock; + + private Configuration currentConfiguration; + private final @Nullable Path configFilePath; + + // Shared mutable reference to FilterChainFactory - enables atomic swaps during hot reload + private final AtomicReference filterChainFactoryRef; + + public ConfigurationReloadOrchestrator( + Configuration initialConfiguration, + ConfigurationChangeHandler configurationChangeHandler, + PluginFactoryRegistry pluginFactoryRegistry, + Features features, + @Nullable Path configFilePath, + AtomicReference filterChainFactoryRef) { + this.currentConfiguration = Objects.requireNonNull(initialConfiguration); + this.configurationChangeHandler = Objects.requireNonNull(configurationChangeHandler); + this.pluginFactoryRegistry = Objects.requireNonNull(pluginFactoryRegistry); + this.features = Objects.requireNonNull(features); + this.filterChainFactoryRef = Objects.requireNonNull(filterChainFactoryRef); + this.configFilePath = configFilePath; + this.stateManager = new ReloadStateManager(); + this.reloadLock = new ReentrantLock(); + } + + /** + * Reload configuration with concurrency control. + * This method implements the Template Method pattern - it defines the reload algorithm + * skeleton with fixed steps. + */ + public CompletableFuture reload(Configuration newConfig) { + // 1. Check if reload already in progress + if (!reloadLock.tryLock()) { + return CompletableFuture.failedFuture( + new ConcurrentReloadException("A reload operation is already in progress")); + } + + Instant startTime = Instant.now(); + + try { + // 2. Mark reload as started + stateManager.startReload(); + + // 3. Validate configuration + Configuration validatedConfig = validateConfiguration(newConfig); + + // 4. Execute reload + return executeReload(validatedConfig, startTime) + .whenComplete((result, error) -> { + if (error != null) { + stateManager.recordFailure(error); + } + else { + stateManager.recordSuccess(result); + this.currentConfiguration = validatedConfig; + persistConfigurationToDisk(validatedConfig); + } + }); + } + finally { + reloadLock.unlock(); + } + } + + /** + * Execute the configuration reload by creating a new FilterChainFactory, + * building a change context, and delegating to ConfigurationChangeHandler. + */ + private CompletableFuture executeReload(Configuration newConfig, Instant startTime) { + // 1. Create new FilterChainFactory with updated filter definitions + FilterChainFactory newFactory = new FilterChainFactory(pluginFactoryRegistry, newConfig.filterDefinitions()); + + // 2. Get old factory for rollback capability + FilterChainFactory oldFactory = filterChainFactoryRef.get(); + + // 3. Build change context with both old and new factories + List oldModels = currentConfiguration.virtualClusterModel(pluginFactoryRegistry); + List newModels = newConfig.virtualClusterModel(pluginFactoryRegistry); + + ConfigurationChangeContext changeContext = new ConfigurationChangeContext( + currentConfiguration, newConfig, + oldModels, newModels, + oldFactory, newFactory); + + // 4. 
Execute configuration changes + return configurationChangeHandler.handleConfigurationChange(changeContext) + .thenApply(v -> { + // SUCCESS: Atomically swap to new factory + filterChainFactoryRef.set(newFactory); + if (oldFactory != null) { + oldFactory.close(); + } + return buildReloadResult(changeContext, startTime); + }) + .exceptionally(error -> { + // FAILURE: Rollback - close new factory, keep old factory + newFactory.close(); + throw new CompletionException("Configuration reload failed", error); + }); + } +} ``` + +### 5. ConfigurationChangeHandler + +Orchestrates the entire configuration change process from detection to execution with rollback capability. + +- **What it does**: Coordinates multiple change detectors, aggregates their results, and executes cluster operations in the correct order +- **Key Responsibilities:** + - **Detector Coordination**: Accepts a list of `ChangeDetector` implementations and runs all of them to identify changes + - **Result Aggregation**: Uses `LinkedHashSet` to merge results from all detectors, removing duplicates while maintaining order + - **Ordered Execution**: Processes changes in the correct order: Remove → Modify → Add (to free up ports/resources first) + - **Rollback Tracking**: Creates a `ConfigurationChangeRollbackTracker` to track all successful operations for potential rollback + - **Rollback on Failure**: If any operation fails, initiates rollback of all previously successful operations in reverse order + +```java public class ConfigurationChangeHandler { - + + private final List changeDetectors; + private final VirtualClusterManager virtualClusterManager; + public ConfigurationChangeHandler(List changeDetectors, VirtualClusterManager virtualClusterManager) { this.changeDetectors = List.copyOf(changeDetectors); this.virtualClusterManager = virtualClusterManager; - ... + } /** * Main entry point for handling configuration changes. */ - public CompletableFuture handleConfigurationChange( - ConfigurationChangeContext changeContext) { - - // 1. Detect changes using all registered detectors - ChangeResult changes = detectChanges(changeContext); - - if (!changes.hasChanges()) { - LOGGER.info("No changes detected - hot-reload not needed"); - return CompletableFuture.completedFuture(null); - } - - // 2. Process changes with rollback tracking - ConfigurationChangeRollbackTracker rollbackTracker = new ConfigurationChangeRollbackTracker(); - - return processConfigurationChanges(changes, changeContext, rollbackTracker) - .thenRun(() -> { - LOGGER.info("Configuration hot-reload completed successfully - {} operations processed", - changes.getTotalOperations()); - }) - .whenComplete((result, throwable) -> { - if (throwable != null) { - LOGGER.error("Configuration change failed - initiating rollback", throwable); - performRollback(rollbackTracker); - } - }); - } - - /** + public CompletableFuture handleConfigurationChange(ConfigurationChangeContext changeContext) { + // 1. Detect changes using all registered detectors + ChangeResult changes = detectChanges(changeContext); + + if (!changes.hasChanges()) { + LOGGER.info("No changes detected - hot-reload not needed"); + return CompletableFuture.completedFuture(null); + } + + // 2. 
Process changes with rollback tracking + ConfigurationChangeRollbackTracker rollbackTracker = new ConfigurationChangeRollbackTracker(); + + return processConfigurationChanges(changes, changeContext, rollbackTracker) + .whenComplete((result, throwable) -> { + if (throwable != null) { + LOGGER.error("Configuration change failed - initiating rollback", throwable); + performRollback(rollbackTracker); + } + else { + LOGGER.info("Configuration hot-reload completed successfully - {} operations processed", + changes.getTotalOperations()); + } + }); + } + + /** * Coordinates multiple change detectors and aggregates their results. */ - private ChangeResult detectChanges(ConfigurationChangeContext context) { + private ChangeResult detectChanges(ConfigurationChangeContext context) { Set allClustersToRemove = new LinkedHashSet<>(); Set allClustersToAdd = new LinkedHashSet<>(); Set allClustersToModify = new LinkedHashSet<>(); - + changeDetectors.forEach(detector -> { try { ChangeResult detectorResult = detector.detectChanges(context); @@ -170,81 +442,52 @@ public class ConfigurationChangeHandler { } catch (Exception e) { LOGGER.error("Error in change detector '{}': {}", detector.getName(), e.getMessage(), e); - // Continue with other detectors even if one fails } }); - + return new ChangeResult( new ArrayList<>(allClustersToRemove), new ArrayList<>(allClustersToAdd), new ArrayList<>(allClustersToModify)); } - /** - * Processes configuration changes in the correct order: Remove -> Modify -> Add + /** + * Processes configuration changes in the correct order: Remove → Modify → Add */ private CompletableFuture processConfigurationChanges( - ChangeResult changes, + ChangeResult changes, ConfigurationChangeContext context, ConfigurationChangeRollbackTracker rollbackTracker) { - - // Sequential processing using stream.reduce() with CompletableFuture chaining + CompletableFuture chain = CompletableFuture.completedFuture(null); - // All the below operations will happen by calling methods of `VirtualClusterManager` // 1. Remove clusters first (to free up ports/resources) // 2. Restart modified existing clusters // 3. Add new clusters last - + return chain; } - - /** - * Performs rollback of all successful operations in reverse order in failure - */ - private CompletableFuture performRollback(ConfigurationChangeRollbackTracker tracker) { - // Rollback in reverse order: Added -> Modified -> Removed - // Remove clusters that were successfully added - // Restore clusters that were modified (revert to old configuration) - // Re-add clusters that were removed - } - - } ``` -3. **ConfigurationChangeRollbackTracker** - This class maintains a record of all cluster operations (removals, modifications, additions) so they can be reversed if the overall configuration change fails. -``` -public class ConfigurationChangeRollbackTracker { - - /** - * Tracks a cluster removal operation. - */ - public void trackRemoval(String clusterName, VirtualClusterModel removedModel) {} +### 6. ChangeDetector Interface - /** - * Tracks a cluster modification operation. - */ - public void trackModification(String clusterName, VirtualClusterModel originalModel, VirtualClusterModel newModel) {} +Strategy pattern interface for different types of change detection. - /** - * Tracks a cluster addition operation. 
- */ - public void trackAddition(String clusterName, VirtualClusterModel addedModel) {} -} -``` +- **What it does**: Defines a contract for components that detect specific types of configuration changes +- **Key Responsibilities:** + - Provides a consistent API for comparing old vs new configurations via `detectChanges()` + - Returns structured `ChangeResult` objects with specific operations needed (add/remove/modify) + - Enables extensibility - new detectors can be added without modifying existing code + - Currently has two implementations: `VirtualClusterChangeDetector` and `FilterChangeDetector` -4. **ChangeDetector (Interface)** - Strategy pattern interface for different types of change detection. - - Currently we have only have one implementation - `VirtualClusterChangeDetector` which will detect changes in virtual cluster models, in future we can add a detector for filter changes aka `FilterChangeDetector` - - Provides a consistent API for comparing old vs new configurations via detectChanges() - - Returns structured ChangeResult objects with specific operations needed -``` +```java public interface ChangeDetector { /** * Name of this change detector for logging and debugging. */ String getName(); - + /** * Detect configuration changes and return structured change information. * @param context The configuration context containing old and new configurations @@ -253,123 +496,552 @@ public interface ChangeDetector { ChangeResult detectChanges(ConfigurationChangeContext context); } ``` -``` + +### 7. VirtualClusterChangeDetector + +Identifies virtual clusters needing restart due to model changes (new, removed, modified). + +- **What it does**: Compares old and new `VirtualClusterModel` collections to detect cluster-level changes +- **Key Responsibilities:** + - **New Cluster Detection**: Finds clusters that exist in new configuration but not in old (additions) + - **Removed Cluster Detection**: Finds clusters that exist in old configuration but not in new (deletions) + - **Modified Cluster Detection**: Finds clusters that exist in both but have different `VirtualClusterModel` (using `equals()` comparison) + - Uses cluster name as the unique identifier for comparison + +```java public class VirtualClusterChangeDetector implements ChangeDetector { - - private static final Logger LOGGER = LoggerFactory.getLogger(VirtualClusterChangeDetector.class); - + @Override public String getName() { return "VirtualClusterChangeDetector"; } - + @Override public ChangeResult detectChanges(ConfigurationChangeContext context) { // Check for modified clusters using equals() comparison List modifiedClusters = findModifiedClusters(context); - + // Check for new clusters (exist in new but not old) List newClusters = findNewClusters(context); - + // Check for removed clusters (exist in old but not new) List removedClusters = findRemovedClusters(context); - + return new ChangeResult(removedClusters, newClusters, modifiedClusters); } + + private List findModifiedClusters(ConfigurationChangeContext context) { + Map oldModelMap = context.oldModels().stream() + .collect(Collectors.toMap(VirtualClusterModel::getClusterName, model -> model)); + + return context.newModels().stream() + .filter(newModel -> { + VirtualClusterModel oldModel = oldModelMap.get(newModel.getClusterName()); + return oldModel != null && !oldModel.equals(newModel); + }) + .map(VirtualClusterModel::getClusterName) + .collect(Collectors.toList()); + } + + private List findNewClusters(ConfigurationChangeContext context) { + Set oldClusterNames = 
context.oldModels().stream() + .map(VirtualClusterModel::getClusterName) + .collect(Collectors.toSet()); + + return context.newModels().stream() + .map(VirtualClusterModel::getClusterName) + .filter(name -> !oldClusterNames.contains(name)) + .collect(Collectors.toList()); + } + + private List findRemovedClusters(ConfigurationChangeContext context) { + Set newClusterNames = context.newModels().stream() + .map(VirtualClusterModel::getClusterName) + .collect(Collectors.toSet()); + + return context.oldModels().stream() + .map(VirtualClusterModel::getClusterName) + .filter(name -> !newClusterNames.contains(name)) + .collect(Collectors.toList()); + } +} ``` -5. **ConfigurationChangeContext (Record)** - Immutable data container providing context for change detection. - - Holds old and new Configuration objects for comparison +### 8. FilterChangeDetector + +Identifies clusters needing restart due to filter configuration changes. + +- **What it does**: Detects changes in filter definitions and identifies which virtual clusters are affected +- **Key Responsibilities:** + - **Filter Definition Changes**: Compares `NamedFilterDefinition` objects to find filters where type or config changed (additions/removals are handled by Configuration validation) + - **Default Filters Changes**: Detects changes to the `defaultFilters` list (order matters for filter chain execution) + - Returns only `clustersToModify` - filter changes don't cause cluster additions/removals +- **Cluster Impact Rules**: A cluster is impacted if: + - It uses a filter definition that was modified (either from explicit filters or defaults), OR + - It doesn't specify `cluster.filters()` AND the `defaultFilters` list changed + +```java +public class FilterChangeDetector implements ChangeDetector { + + @Override + public String getName() { + return "FilterChangeDetector"; + } + + @Override + public ChangeResult detectChanges(ConfigurationChangeContext context) { + // Detect filter definition changes + Set modifiedFilterNames = findModifiedFilterDefinitions(context); + + // Detect default filters changes (order matters for filter chain execution) + boolean defaultFiltersChanged = hasDefaultFiltersChanged(context); + + // Find impacted clusters + List clustersToModify = findImpactedClusters(modifiedFilterNames, defaultFiltersChanged, context); + + return new ChangeResult(List.of(), List.of(), clustersToModify); + } + + /** + * Find filter definitions that have been modified. + * A filter is considered modified if the type or config changed. + * Note: Filter additions/removals are not tracked here as they're handled by Configuration validation. + */ + private Set findModifiedFilterDefinitions(ConfigurationChangeContext context) { + Map oldDefs = buildFilterDefMap(context.oldConfig()); + Map newDefs = buildFilterDefMap(context.newConfig()); + + Set modifiedFilterNames = new HashSet<>(); + + // Check each new definition to see if it differs from the old one + for (Map.Entry entry : newDefs.entrySet()) { + String filterName = entry.getKey(); + NamedFilterDefinition newDef = entry.getValue(); + NamedFilterDefinition oldDef = oldDefs.get(filterName); + + // Filter exists in both configs - check if it changed + if (oldDef != null && !oldDef.equals(newDef)) { + modifiedFilterNames.add(filterName); + } + } + + return modifiedFilterNames; + } + + /** + * Check if the default filters list has changed. + * Order matters because filter chain execution is sequential. 
+ */ + private boolean hasDefaultFiltersChanged(ConfigurationChangeContext context) { + List oldDefaults = context.oldConfig().defaultFilters(); + List newDefaults = context.newConfig().defaultFilters(); + // Use Objects.equals for null-safe comparison - checks both content AND order + return !Objects.equals(oldDefaults, newDefaults); + } + + /** + * Find virtual clusters that are impacted by filter changes. + * Uses a simple single-pass approach: iterate through each cluster and check if it's + * affected by any filter change. Prioritizes code clarity over optimization. + */ + private List findImpactedClusters( + Set modifiedFilterNames, + boolean defaultFiltersChanged, + ConfigurationChangeContext context) { + + // Early return if nothing changed + if (modifiedFilterNames.isEmpty() && !defaultFiltersChanged) { + return List.of(); + } + + List impactedClusters = new ArrayList<>(); + + // Simple approach: check each cluster's resolved filters + for (VirtualClusterModel cluster : context.newModels()) { + String clusterName = cluster.getClusterName(); + + // Get this cluster's resolved filters (either explicit or from defaults) + List clusterFilterNames = cluster.getFilters() + .stream() + .map(NamedFilterDefinition::name) + .toList(); + + // Check if cluster uses any modified filter OR uses defaults and defaults changed + boolean usesModifiedFilter = clusterFilterNames.stream() + .anyMatch(modifiedFilterNames::contains); + + boolean usesChangedDefaults = defaultFiltersChanged && + clusterUsesDefaults(cluster, context.newConfig()); + + if (usesModifiedFilter || usesChangedDefaults) { + impactedClusters.add(clusterName); + } + } + + return impactedClusters; + } + + /** + * Check if a cluster uses default filters. + * A cluster uses defaults if it doesn't specify its own filters list. + */ + private boolean clusterUsesDefaults(VirtualClusterModel cluster, Configuration config) { + VirtualCluster vc = config.virtualClusters().stream() + .filter(v -> v.name().equals(cluster.getClusterName())) + .findFirst() + .orElse(null); + + // Cluster uses defaults if it doesn't specify its own filters + return vc != null && vc.filters() == null; + } +} ``` + +### 9. Supporting Records + +**ConfigurationChangeContext** - Immutable context for change detection. +- **What it does**: Provides a single object containing all the data needed for change detection, including both old and new configurations and their pre-computed models +- **Key fields**: `oldConfig`, `newConfig`, `oldModels`, `newModels`, `oldFilterChainFactory`, `newFilterChainFactory` +- **Why FilterChainFactory is included**: Enables filter-related change detectors to reference the factories for comparison + +```java public record ConfigurationChangeContext( - Configuration oldConfig, - Configuration newConfig, - List oldModels, - List newModels -) {} + Configuration oldConfig, + Configuration newConfig, + List oldModels, + List newModels, + @Nullable FilterChainFactory oldFilterChainFactory, + @Nullable FilterChainFactory newFilterChainFactory) {} ``` -6. **ChangeResult (Record)** - Contains lists of cluster names for each operation type (remove/add/modify) -``` +**ChangeResult** - Result of change detection. 
+- **What it does**: Contains categorized lists of cluster names for each required operation type
+- **Key fields**: `clustersToRemove`, `clustersToAdd`, `clustersToModify`
+- **Utility methods**: `hasChanges()` to check whether any changes were detected, `getTotalOperations()` to get the total operation count
+
+```java
 public record ChangeResult(
-        List clustersToRemove,
-        List clustersToAdd,
-        List clustersToModify
-) {}
+        List clustersToRemove,
+        List clustersToAdd,
+        List clustersToModify) {
+
+    public boolean hasChanges() {
+        return !clustersToRemove.isEmpty() || !clustersToAdd.isEmpty() || !clustersToModify.isEmpty();
+    }
+
+    public int getTotalOperations() {
+        return clustersToRemove.size() + clustersToAdd.size() + clustersToModify.size();
+    }
+}
 ```
-## Flow diagram
+**ReloadResponse** - HTTP response payload.
+- **What it does**: Serializable record that represents the JSON response sent back to HTTP clients
+- **Key fields**: `success`, `message`, `clustersModified`, `clustersAdded`, `clustersRemoved`, `timestamp`
+- **Factory methods**: `from(ReloadResult)` to convert the internal result, `error(message)` for error responses
+
+```java
+public record ReloadResponse(
+        boolean success,
+        String message,
+        int clustersModified,
+        int clustersAdded,
+        int clustersRemoved,
+        String timestamp) {
+
+    public static ReloadResponse from(ReloadResult result) {
+        return new ReloadResponse(
+                result.isSuccess(),
+                result.getMessage(),
+                result.getClustersModified(),
+                result.getClustersAdded(),
+                result.getClustersRemoved(),
+                result.getTimestamp().toString());
+    }
+
+    public static ReloadResponse error(String message) {
+        return new ReloadResponse(false, message, 0, 0, 0, Instant.now().toString());
+    }
+}
+```
+
+**ReloadStateManager** - Tracks reload state and history.
+- **What it does**: Maintains the current reload state and a history of recent reload operations for observability
+- **Key responsibilities**: Tracks `IDLE`/`IN_PROGRESS` state, records success/failure with `ReloadResult`, maintains bounded history (max 10 entries)
+- **Thread safety**: Uses `AtomicReference` for state and `synchronized` blocks for history access
+
+```java
+public class ReloadStateManager {
+
+    private static final int MAX_HISTORY_SIZE = 10;
+
+    private final AtomicReference currentState = new AtomicReference<>(ReloadState.IDLE);
+    private final Deque reloadHistory = new ArrayDeque<>();
+
+    public enum ReloadState {
+        IDLE,
+        IN_PROGRESS
+    }
+
+    public void startReload() {
+        currentState.set(ReloadState.IN_PROGRESS);
+    }
+
+    public void recordSuccess(ReloadResult result) {
+        currentState.set(ReloadState.IDLE);
+        addToHistory(result);
+    }
+
+    public void recordFailure(Throwable error) {
+        currentState.set(ReloadState.IDLE);
+        addToHistory(ReloadResult.failure(error.getMessage()));
+    }
+
+    private void addToHistory(ReloadResult result) {
+        synchronized (reloadHistory) {
+            reloadHistory.addLast(result);
+            // Keep the history bounded to the most recent MAX_HISTORY_SIZE results
+            if (reloadHistory.size() > MAX_HISTORY_SIZE) {
+                reloadHistory.removeFirst();
+            }
+        }
+    }
+
+    public ReloadState getCurrentState() {
+        return currentState.get();
+    }
+
+    public Optional getLastResult() {
+        synchronized (reloadHistory) {
+            return reloadHistory.isEmpty() ? Optional.empty() : Optional.of(reloadHistory.peekLast());
+        }
+    }
+}
+```
+
+## Integration with KafkaProxy
+
+The `KafkaProxy` class initializes the reload orchestrator and passes it to the management endpoint. 
+ +- **What it does**: `KafkaProxy` is the entry point to the proxy app and is responsible for setting up all hot-reload components +- **Key Responsibilities:** + - Creates `ConnectionTracker` and `InFlightMessageTracker` for connection management + - Creates `ConnectionDrainManager` and `VirtualClusterManager` for cluster lifecycle operations + - Creates `ConfigurationChangeHandler` with list of change detectors (`VirtualClusterChangeDetector`, `FilterChangeDetector`) + - Creates `AtomicReference` that is shared between `KafkaProxyInitializer` and `ConfigurationReloadOrchestrator` for atomic factory swaps + - Creates `ConfigurationReloadOrchestrator` and passes it to `ManagementInitializer` for HTTP endpoint registration +- **Why AtomicReference is used**: Both the initializers (which create filter chains for new connections) and the orchestrator (which swaps factories on reload) need access to the current factory. Using `AtomicReference` enables atomic, thread-safe swaps. + +```java +public final class KafkaProxy implements AutoCloseable { + + // Shared mutable reference to FilterChainFactory - enables atomic swaps during hot reload + private AtomicReference filterChainFactoryRef; + + private final ConfigurationChangeHandler configurationChangeHandler; + private final @Nullable ConfigurationReloadOrchestrator reloadOrchestrator; + + public KafkaProxy(PluginFactoryRegistry pfr, Configuration config, Features features, @Nullable Path configFilePath) { + // Initialize connection management components + this.connectionDrainManager = new ConnectionDrainManager(connectionTracker, inFlightTracker); + this.virtualClusterManager = new VirtualClusterManager(endpointRegistry, connectionDrainManager); + + // Initialize configuration change handler with detectors + this.configurationChangeHandler = new ConfigurationChangeHandler( + List.of( + new VirtualClusterChangeDetector(), + new FilterChangeDetector()), + virtualClusterManager); + + // Create AtomicReference for FilterChainFactory + this.filterChainFactoryRef = new AtomicReference<>(); + + // Initialize reload orchestrator for HTTP endpoint + this.reloadOrchestrator = new ConfigurationReloadOrchestrator( + config, + configurationChangeHandler, + pfr, + features, + configFilePath, + filterChainFactoryRef); + } + + public CompletableFuture startup() { + // Create initial FilterChainFactory and store in shared atomic reference + FilterChainFactory initialFactory = new FilterChainFactory(pfr, config.filterDefinitions()); + this.filterChainFactoryRef.set(initialFactory); + + // Pass atomic reference to initializers for dynamic factory swaps + var tlsServerBootstrap = buildServerBootstrap(proxyEventGroup, + new KafkaProxyInitializer(filterChainFactoryRef, ...)); + var plainServerBootstrap = buildServerBootstrap(proxyEventGroup, + new KafkaProxyInitializer(filterChainFactoryRef, ...)); + + // Start management listener with reload orchestrator + var managementFuture = maybeStartManagementListener(managementEventGroup, meterRegistries, reloadOrchestrator); + + // ... 
+ } +} +``` -Image +## Flow Diagram +``` +┌───────────────────────────────────────────────────────────────────────────┐ +│ HTTP POST /admin/config/reload │ +│ Content-Type: application/yaml │ +│ Body: │ +└───────────────────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ ConfigurationReloadEndpoint │ +│ - Creates ReloadRequestContext │ +│ - Delegates to ReloadRequestProcessor │ +└───────────────────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ ReloadRequestProcessor │ +│ - Chain of Responsibility pattern │ +│ │ +│ ┌─────────────────┐ ┌──────────────────┐ ┌────────────────────────┐ │ +│ │ ContentType │ → │ ContentLength │ → │ ConfigurationParsing │ │ +│ │ Validation │ │ Validation │ │ Handler │ │ +│ └─────────────────┘ └──────────────────┘ └────────────┬───────────┘ │ +│ │ │ +│ ▼ │ +│ ┌────────────────────────────┐ │ +│ │ ConfigurationReload │ │ +│ │ Handler │ │ +│ └────────────┬───────────────┘ │ +└────────────────────────────────────────────────────────┼──────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ ConfigurationReloadOrchestrator │ +│ - Concurrency control (ReentrantLock) │ +│ - Configuration validation │ +│ - Creates new FilterChainFactory │ +│ - Builds ConfigurationChangeContext │ +└───────────────────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ ConfigurationChangeHandler │ +│ - Coordinates change detectors │ +│ - Aggregates change results │ +│ │ +│ ┌──────────────────────────┐ ┌────────────────────────┐ │ +│ │ VirtualClusterChange │ │ FilterChangeDetector │ │ +│ │ Detector │ │ │ │ +│ │ - New clusters │ │ - Modified filters │ │ +│ │ - Removed clusters │ │ - Default filters │ │ +│ │ - Modified clusters │ │ - Impacted clusters │ │ +│ └──────────────┬───────────┘ └───────────┬────────────┘ │ +│ │ │ │ +│ └─────────────┬──────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────────┐ │ +│ │ ChangeResult │ │ +│ │ - clustersToRemove │ │ +│ │ - clustersToAdd │ │ +│ │ - clustersToModify │ │ +│ └──────────┬───────────┘ │ +└────────────────────────────────┼──────────────────────────────────────────┘ + │ + ▼ + ┌────────────────────────┐ + │ VirtualClusterManager │ ────► Part 2: Graceful Restart + │ - Remove clusters │ + │ - Add clusters │ + │ - Restart clusters │ + └────────────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ ON SUCCESS │ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ filterChainFactoryRef.set(newFactory) ◄── Atomic swap! │ │ +│ │ oldFactory.close() │ │ +│ │ currentConfiguration = newConfig │ │ +│ │ persistConfigurationToDisk(newConfig) │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +└───────────────────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌───────────────────────────────────────────────────────────────────────────┐ +│ HTTP 200 OK │ +│ Content-Type: application/json │ +│ {"success": true, "clustersModified": 1, ...} │ +└───────────────────────────────────────────────────────────────────────────┘ +``` +--- # Part 2: Graceful Virtual Cluster Restart -Part 2 of the hot-reload implementation focuses on gracefully restarting of virtual clusters. 
This component receives structured change operations from Part 1 and executes them in a carefully orchestrated sequence: **connection draining → resource deregistration → new resource registration → connection restoration.** +Part 2 of the hot-reload implementation focuses on gracefully restarting virtual clusters. This component receives structured change operations from Part 1 and executes them in a carefully orchestrated sequence: **connection draining → resource deregistration → new resource registration → connection restoration.** The design emphasizes minimal service disruption by ensuring all in-flight Kafka requests complete before closing connections (or when a timeout is hit). ## Core Classes & Structure -1. **VirtualClusterManager** - - **What it does** - Acts as the high-level orchestrator for all virtual cluster lifecycle operations during hot-reload. `ConfigurationChangeHandler` calls the `VirtualClusterManager` to restart/add/remove clusters when there is a config change - - **Key Responsibilities:** - - **Cluster Addition**: Takes a new `VirtualClusterModel` and brings it online by registering it using `EndpointRegistry` - - **Cluster Removal**: Safely takes down an existing cluster by first draining all connections gracefully, then deregistering it using EndpointRegistry - - **Cluster Restart**: Performs a complete cluster reconfiguration by removing the old version and adding the new version with updated settings - - **Rollback Integration**: Automatically tracks all successful operations so they can be undone if later operations fail -``` +### 1. VirtualClusterManager + +Acts as the high-level orchestrator for all virtual cluster lifecycle operations during hot-reload. `ConfigurationChangeHandler` calls the `VirtualClusterManager` to restart/add/remove clusters when there is a config change. + +- **What it does**: Manages the complete lifecycle of virtual clusters including addition, removal, and restart operations +- **Key Responsibilities:** + - **Cluster Addition**: Takes a new `VirtualClusterModel` and brings it online by registering all gateways with `EndpointRegistry` + - **Cluster Removal**: Safely takes down an existing cluster by first draining all connections gracefully via `ConnectionDrainManager`, then deregistering from `EndpointRegistry` + - **Cluster Restart**: Performs a complete cluster reconfiguration by orchestrating remove → add sequence with updated settings + - **Rollback Integration**: Automatically tracks all successful operations via `ConfigurationChangeRollbackTracker` so they can be undone if later operations fail + +```java public class VirtualClusterManager { - - ... + + private final EndpointRegistry endpointRegistry; + private final ConnectionDrainManager connectionDrainManager; + public VirtualClusterManager(EndpointRegistry endpointRegistry, ConnectionDrainManager connectionDrainManager) { this.endpointRegistry = endpointRegistry; this.connectionDrainManager = connectionDrainManager; } - + /** * Gracefully removes a virtual cluster by draining connections and deregistering endpoints. */ public CompletableFuture removeVirtualCluster(String clusterName, List oldModels, ConfigurationChangeRollbackTracker rollbackTracker) { - // 1. Find cluster model to remove VirtualClusterModel clusterToRemove = findClusterModel(oldModels, clusterName); - - // 2. Drain connections gracefully (30s timeout) + + // 1. 
Drain connections gracefully (30s timeout) return connectionDrainManager.gracefullyDrainConnections(clusterName, Duration.ofSeconds(30)) .thenCompose(v -> { - // 3. Deregister all gateways from endpoint registry + // 2. Deregister all gateways from endpoint registry var deregistrationFutures = clusterToRemove.gateways().values().stream() .map(gateway -> endpointRegistry.deregisterVirtualCluster(gateway)) .toArray(CompletableFuture[]::new); - + return CompletableFuture.allOf(deregistrationFutures); }) .thenRun(() -> { - // 4. Track removal for potential rollback + // 3. Track removal for potential rollback rollbackTracker.trackRemoval(clusterName, clusterToRemove); LOGGER.info("Successfully removed virtual cluster '{}'", clusterName); }); } - + /** * Restarts a virtual cluster with new configuration (remove + add). */ public CompletableFuture restartVirtualCluster(String clusterName, + VirtualClusterModel newModel, List oldModels, - List newModels, ConfigurationChangeRollbackTracker rollbackTracker) { VirtualClusterModel oldModel = findClusterModel(oldModels, clusterName); - VirtualClusterModel newModel = findClusterModel(newModels, clusterName); - + // Step 1: Remove existing cluster (drain + deregister) return removeVirtualCluster(clusterName, oldModels, rollbackTracker) .thenCompose(v -> { // Step 2: Add new cluster with updated configuration - return addVirtualCluster(clusterName, List.of(newModel), rollbackTracker); + return addVirtualCluster(newModel, rollbackTracker); }) .thenRun(() -> { // Step 3: Track modification and stop draining @@ -378,15 +1050,14 @@ public class VirtualClusterManager { LOGGER.info("Successfully restarted virtual cluster '{}' with new configuration", clusterName); }); } - + /** * Adds a new virtual cluster by registering endpoints and enabling connections. */ - public CompletableFuture addVirtualCluster(String clusterName, - List newModels, + public CompletableFuture addVirtualCluster(VirtualClusterModel newModel, ConfigurationChangeRollbackTracker rollbackTracker) { - VirtualClusterModel newModel = findClusterModel(newModels, clusterName); - + String clusterName = newModel.getClusterName(); + return registerVirtualCluster(newModel) .thenRun(() -> { // Stop draining to allow new connections @@ -395,224 +1066,205 @@ public class VirtualClusterManager { LOGGER.info("Successfully added new virtual cluster '{}'", clusterName); }); } - + /** * Registers all gateways for a virtual cluster with the endpoint registry. */ private CompletableFuture registerVirtualCluster(VirtualClusterModel model) { - LOGGER.info("Registering virtual cluster '{}' with {} gateways", - model.getClusterName(), model.gateways().size()); - var registrationFutures = model.gateways().values().stream() .map(gateway -> endpointRegistry.registerVirtualCluster(gateway)) .toArray(CompletableFuture[]::new); - - return CompletableFuture.allOf(registrationFutures) - .thenRun(() -> LOGGER.info("Successfully registered virtual cluster '{}' with all gateways", - model.getClusterName())); + + return CompletableFuture.allOf(registrationFutures); } } ``` -2. **ConnectionDrainManager** - - **What it does** - Implements the graceful connection draining strategy during cluster restarts. This is what makes hot-reload "graceful" - it ensures that client requests in progress are completed rather than dropped. 
- - **Key Responsibilities:** - - **Draining Mode Control**: Starts/stops "draining mode" where new connections are rejected but existing ones continue - - **Backpressure Strategy**: Applies intelligent backpressure by disabling the channel`autoRead` on downstream channels while keeping upstream channels active. This is done so that any “new” client messages are rejected, while the upstream channel is kept open so that the existing inflight requests are delivered to kafka and their response are successfully delivered back to the client. - - **In-Flight Monitoring**: Continuously monitors pending Kafka requests and waits for them to complete before closing connections. This is done using `InFlightMessageTracker` class. - - **Explanation of the draining strategy** - - **Phase 1: Initiate Draining Mode** - Set cluster to "draining mode" in which any new connection attempts will be rejected. Then we proceed to gracefully closing the connection. - ``` - public CompletableFuture gracefullyDrainConnections(String clusterName, Duration totalTimeout) { - // 1. Get current connection and message state - int totalConnections = connectionTracker.getTotalConnectionCount(clusterName); - int totalInFlight = inFlightTracker.getTotalPendingRequestCount(clusterName); - - LOGGER.info("Starting graceful drain for cluster '{}' with {} connections and {} in-flight requests ({}s timeout)", - clusterName, totalConnections, totalInFlight, totalTimeout.getSeconds()); - - // 2. Enter draining mode - reject new connections - return startDraining(clusterName) - .thenCompose(v -> { - if (totalConnections == 0) { - // Fast path: no connections to drain - return CompletableFuture.completedFuture(null); - } else { - // Proceed with connection closure - return gracefullyCloseConnections(clusterName, totalTimeout); - } - }); - } - - public CompletableFuture startDraining(String clusterName) { - drainingClusters.put(clusterName, new AtomicBoolean(true)); - return CompletableFuture.completedFuture(null); - } - ``` - - **Phase 2: Apply Backpressure Strategy** - we set `autoRead = false` only on the downstream channel to reject any new client messages. `ConnectionTracker` class tracks which downstream/upstream channels are active for a given cluster name. - - **Downstream (Client→Proxy)** - `autoRead = false` - Prevents clients from sending NEW requests while allowing existing requests to complete - - **Upstream (Proxy→Kafka)** - `autoRead = true` - Allows Kafka responses to flow back to complete pending requests. In-flight request count decreases naturally as responses arrive - ``` - public CompletableFuture gracefullyCloseConnections(String clusterName, Duration timeout) { - // 1. Get separate channel collections - Set downstreamChannels = connectionTracker.getDownstreamActiveChannels(clusterName); - Set upstreamChannels = connectionTracker.getUpstreamActiveChannels(clusterName); - - // 2. 
Apply different strategies to different channel types
-        var allCloseFutures = new ArrayList>();
-
-        // Add downstream channel close futures
-        downstreamChannels.stream()
-                .map(this::disableAutoReadOnDownstreamChannel)
-                .map(channel -> gracefullyCloseChannel(channel, clusterName, timeout, "DOWNSTREAM"))
-                .forEach(allCloseFutures::add);
-
-        // Add upstream channel close futures
-        upstreamChannels.stream()
-                .map(channel -> gracefullyCloseChannel(channel, clusterName, timeout, "UPSTREAM"))
-                .forEach(allCloseFutures::add);
-
-        return CompletableFuture.allOf(allCloseFutures.toArray(new CompletableFuture[0]));
-    }
-
-    private Channel disableAutoReadOnDownstreamChannel(Channel downstreamChannel) {
-        try {
-            if (downstreamChannel.isActive()) {
-                // Get the KafkaProxyFrontendHandler from the channel pipeline
-                KafkaProxyFrontendHandler frontendHandler = downstreamChannel.pipeline().get(KafkaProxyFrontendHandler.class);
-                if (frontendHandler != null) {
-                    frontendHandler.applyBackpressure();
-                    LOGGER.debug("Applied backpressure via frontend handler for channel: L:/{}, R:/{}",
-                            downstreamChannel.localAddress(), downstreamChannel.remoteAddress());
-                }
-                else {
-                    LOGGER.debug("Manually applying backpressure for channel: L:/{}, R:/{}",
-                            downstreamChannel.localAddress(), downstreamChannel.remoteAddress());
-                    // Fallback to manual method if handler not found
-                    downstreamChannel.config().setAutoRead(false);
-                }
-            }
-        }
-        catch (Exception e) {
-            LOGGER.warn("Failed to disable autoRead for downstream channel L:/{}, R:/{} - continuing with drain",
-                    downstreamChannel.localAddress(), downstreamChannel.remoteAddress(), e);
-        }
-        return downstreamChannel;
-    }
+### 2. ConnectionDrainManager
+
+Implements the graceful connection draining strategy during cluster restarts. This is what makes hot-reload "graceful" - it ensures that client requests in progress are completed rather than dropped.
+
+- **What it does**: Manages the graceful shutdown of connections during cluster restart, ensuring no in-flight messages are lost
+- **Key Responsibilities:**
+  - **Draining Mode Control**: Maintains a map of draining clusters; when a cluster enters drain mode, `shouldAcceptConnection()` returns false to reject new connections
+  - **Backpressure Strategy**: Sets `autoRead = false` only on downstream (client→proxy) channels to prevent new requests, while keeping upstream (proxy→Kafka) channels reading to allow responses to complete naturally
+  - **In-Flight Monitoring**: Uses a scheduled executor to periodically check `InFlightMessageTracker` (every 100ms) and closes channels when pending requests reach zero
+  - **Timeout Handling**: If the in-flight count doesn't reach zero within the timeout (default 30s), force-closes the channel to prevent indefinite hangs
+  - **Resource Cleanup**: Implements `AutoCloseable` to properly shut down the scheduler on proxy shutdown
+- **Explanation of the Draining Strategy:**
+  - **Phase 1**: Enter draining mode → new connection attempts are rejected
+  - **Phase 2**: Apply backpressure → downstream `autoRead=false`, upstream `autoRead=true`
+  - **Phase 3**: Monitor in-flight messages → wait for the count to reach zero or time out, then close the channel
+
+```java
+public class ConnectionDrainManager implements AutoCloseable {
+
+    private final ConnectionTracker connectionTracker;
+    private final InFlightMessageTracker inFlightTracker;
+    private final Map drainingClusters = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService scheduler;
+
+    public ConnectionDrainManager(ConnectionTracker connectionTracker,
+                                  InFlightMessageTracker inFlightTracker) {
+        this.connectionTracker = connectionTracker;
+        this.inFlightTracker = inFlightTracker;
+        this.scheduler = new ScheduledThreadPoolExecutor(2, r -> {
+            Thread t = new Thread(r, "connection-drain-manager");
+            t.setDaemon(true);
+            return t;
+        });
+    }
+
+    /**
+     * Determines if a new connection should be accepted for the specified virtual cluster.
+     */
+    public boolean shouldAcceptConnection(String clusterName) {
+        return !isDraining(clusterName);
+    }
+
+    private boolean isDraining(String clusterName) {
+        // A cluster is draining while its flag is present and set
+        AtomicBoolean draining = drainingClusters.get(clusterName);
+        return draining != null && draining.get();
+    }
+
+    /**
+     * Performs a complete graceful drain operation by stopping new connections
+     * and immediately closing existing connections after in-flight messages complete.
+     */
+    public CompletableFuture gracefullyDrainConnections(String clusterName, Duration totalTimeout) {
+        int totalActiveConnections = connectionTracker.getTotalConnectionCount(clusterName);
+        int totalInFlightRequests = inFlightTracker.getTotalPendingRequestCount(clusterName);
+
+        LOGGER.info("Starting graceful drain for cluster '{}' with {} connections and {} in-flight requests",
+                clusterName, totalActiveConnections, totalInFlightRequests);
+
+        return startDraining(clusterName)
+                .thenCompose(v -> {
+                    if (totalActiveConnections == 0) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    else {
+                        return gracefullyCloseConnections(clusterName, totalTimeout);
+                    }
+                });
+    }
+
+    /**
+     * Starts draining - new connections will be rejected.
+     */
+    public CompletableFuture startDraining(String clusterName) {
+        drainingClusters.put(clusterName, new AtomicBoolean(true));
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Gracefully closes all active connections for the specified virtual cluster.
+     * Strategy: Disable autoRead on downstream channels to prevent new requests,
+     * but keep upstream channels reading to allow responses to complete naturally.
+     */
+    public CompletableFuture gracefullyCloseConnections(String clusterName, Duration timeout) {
+        Set downstreamChannels = connectionTracker.getDownstreamActiveChannels(clusterName);
+        Set upstreamChannels = connectionTracker.getUpstreamActiveChannels(clusterName);
+
+        var allCloseFutures = new ArrayList<CompletableFuture<Void>>();
+
+        // STRATEGY:
+        // - Downstream (autoRead=false): Prevents new client requests from being processed
+        // - Upstream (autoRead=true): Allows Kafka responses to be processed normally
+
+        // Add downstream channel close futures
+        downstreamChannels.stream()
+                .map(this::disableAutoReadOnDownstreamChannel)
+                .map(channel -> gracefullyCloseChannel(channel, clusterName, timeout, "DOWNSTREAM"))
+                .forEach(allCloseFutures::add);
+
+        // Add upstream channel close futures
+        upstreamChannels.stream()
+                .map(channel -> gracefullyCloseChannel(channel, clusterName, timeout, "UPSTREAM"))
+                .forEach(allCloseFutures::add);
+
+        return CompletableFuture.allOf(allCloseFutures.toArray(new CompletableFuture[0]));
+    }
+
+    private Channel disableAutoReadOnDownstreamChannel(Channel downstreamChannel) {
+        try {
+            if (downstreamChannel.isActive()) {
+                KafkaProxyFrontendHandler frontendHandler =
+                        downstreamChannel.pipeline().get(KafkaProxyFrontendHandler.class);
+                if (frontendHandler != null) {
+                    frontendHandler.applyBackpressure();
+                }
-    - **Phase 3: Monitor In-Flight Message Completion and close channel** - Monitor in-flight count every 100ms for draining while waiting for in-flight count to reach zero naturally. If for some reason, the in-flight count does not reach zero (hangs, could be due to underlying kafka going down), force close after timeout to prevent indefinite hangs. 
Once in-flight count reaches zero (or after the timeout), close the channel immediately. - ``` - private CompletableFuture gracefullyCloseChannel(Channel channel, String clusterName, - String channelType, Duration timeout) { - CompletableFuture future = new CompletableFuture<>(); - long startTime = System.currentTimeMillis(); - - // Schedule timeout - ScheduledFuture timeoutTask = scheduler.schedule(() -> { - if (!future.isDone()) { - LOGGER.warn("Graceful shutdown timeout exceeded for {} channel L:/{}, R:/{} in cluster '{}' - forcing immediate closure", - channelType, channel.localAddress(), channel.remoteAddress(), clusterName); - closeChannelImmediately(channel, future); - } - }, timeoutMillis, TimeUnit.MILLISECONDS); - - // Schedule periodic checks for in-flight messages - ScheduledFuture checkTask = scheduler.scheduleAtFixedRate(() -> { - try { - if (future.isDone()) { - return; - } - - int pendingRequests = inFlightTracker.getPendingRequestCount(clusterName, channel); - long elapsed = System.currentTimeMillis() - startTime; - - if (pendingRequests == 0) { - LOGGER.info("In-flight messages cleared for {} channel L:/{}, R:/{} in cluster '{}' - proceeding with connection closure ({}ms elapsed)", - channelType, channel.localAddress(), channel.remoteAddress(), clusterName, elapsed); - closeChannelImmediately(channel, future); - } - else { - // Just wait for existing in-flight messages to complete naturally - // Do NOT call channel.read() as it would trigger processing of new messages - int totalPending = inFlightTracker.getTotalPendingRequestCount(clusterName); - LOGGER.debug("Waiting for {} channel L:/{}, R:/{} in cluster '{}' to drain: {} pending requests (cluster total: {}, {}ms elapsed)", - channelType, channel.localAddress(), channel.remoteAddress(), clusterName, pendingRequests, totalPending, elapsed); - } - } - catch (Exception e) { - LOGGER.error("Unexpected error during graceful shutdown monitoring for channel L:/{}, R:/{} in cluster '{}'", - channel.localAddress(), channel.remoteAddress(), clusterName, e); - future.completeExceptionally(e); - } - }, 50, 100, TimeUnit.MILLISECONDS); // Check every 100ms for faster response - - // Cancel scheduled tasks when future completes and log final result - future.whenComplete((result, throwable) -> { - timeoutTask.cancel(false); - checkTask.cancel(false); - - if (throwable == null) { - LOGGER.info("Successfully completed graceful shutdown of {} channel L:/{}, R:/{} in cluster '{}'", - channelType, channel.localAddress(), channel.remoteAddress(), clusterName); - } - else { - LOGGER.error("Graceful shutdown failed for {} channel L:/{}, R:/{} in cluster '{}': {}", - channelType, channel.localAddress(), channel.remoteAddress(), clusterName, throwable.getMessage()); - } - }); - - return future; - } - - private void closeChannelImmediately(Channel channel, CompletableFuture future) { - if (future.isDone()) { - return; - } - - channel.close().addListener(channelFuture -> { - if (channelFuture.isSuccess()) { - future.complete(null); - } - else { - future.completeExceptionally(channelFuture.cause()); - } - }); - } - ``` - - **How will drain mode reject new client connections ?** - For this, we will put a check in KafkaProxyFrontendHandler#channelActive method to reject new connections, if the particular cluster is in drain mode. - ``` - public class KafkaProxyFrontendHandler - extends ChannelInboundHandlerAdapter - implements NetFilter.NetFilterContext { - .... 
- @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - this.clientCtx = ctx; - - // Check if we should accept this connection (not draining) - String clusterName = virtualClusterModel.getClusterName(); - if (connectionDrainManager != null && !connectionDrainManager.shouldAcceptConnection(clusterName)) { - LOGGER.info("Rejecting new connection for draining cluster '{}'", clusterName); - ctx.close(); - return; + else { + downstreamChannel.config().setAutoRead(false); } - - this.proxyChannelStateMachine.onClientActive(this); - super.channelActive(this.clientCtx); } - .... } - ``` + catch (Exception e) { + LOGGER.warn("Failed to disable autoRead for downstream channel - continuing", e); + } + return downstreamChannel; + } + + /** + * Gracefully closes a single channel. + * Monitors in-flight count every 100ms, closes when zero or on timeout. + */ + private CompletableFuture gracefullyCloseChannel(Channel channel, String clusterName, + Duration timeout, String channelType) { + CompletableFuture future = new CompletableFuture<>(); + long timeoutMillis = timeout.toMillis(); + long startTime = System.currentTimeMillis(); + + // Schedule timeout + ScheduledFuture timeoutTask = scheduler.schedule(() -> { + if (!future.isDone()) { + LOGGER.warn("Graceful shutdown timeout - forcing closure"); + closeChannelImmediately(channel, future); + } + }, timeoutMillis, TimeUnit.MILLISECONDS); + + // Schedule periodic checks for in-flight messages + ScheduledFuture checkTask = scheduler.scheduleAtFixedRate(() -> { + if (future.isDone()) return; + + int pendingRequests = inFlightTracker.getPendingRequestCount(clusterName, channel); + if (pendingRequests == 0) { + closeChannelImmediately(channel, future); + } + }, 50, 100, TimeUnit.MILLISECONDS); + + // Cancel tasks when done + future.whenComplete((result, throwable) -> { + timeoutTask.cancel(false); + checkTask.cancel(false); + }); + + return future; + } + + private void closeChannelImmediately(Channel channel, CompletableFuture future) { + if (future.isDone()) return; -3. **ConnectionTracker** - - **What it does** - Maintains real-time inventory of all active network connections per virtual cluster. You can't gracefully drain connections if you don't know what connections exist - this class provides that visibility. - - **Key Responsibilities:** - - **Bidirectional Tracking**: Separately tracks downstream connections (client→proxy) and upstream connections (proxy→Kafka) - - **Channel Management**: Maintains collections of active `Channel` objects for bulk operations like graceful closure - - **Lifecycle Integration**: Integrates with `ProxyChannelStateMachine` to automatically track connection establishment and closure - - **Cleanup Logic**: Automatically removes references to closed channels and cleans up empty cluster entries + channel.close().addListener(channelFuture -> { + if (channelFuture.isSuccess()) { + future.complete(null); + } + else { + future.completeExceptionally(channelFuture.cause()); + } + }); + } +} ``` + +### 3. ConnectionTracker + +Maintains real-time inventory of all active network connections per virtual cluster. 
+ +- **What it does**: Provides real-time visibility into all active connections (downstream and upstream) for each virtual cluster +- **Key Responsibilities:** + - **Bidirectional Tracking**: Separately tracks downstream connections (client→proxy) and upstream connections (proxy→Kafka) using `ConcurrentHashMap` + - **Channel Management**: Maintains collections of active `Channel` objects for bulk operations like graceful closure + - **Lifecycle Integration**: Integrates with `ProxyChannelStateMachine` to automatically track connection establishment and closure events + - **Cleanup Logic**: Automatically removes references to closed channels and cleans up empty cluster entries to prevent memory leaks + - **Thread Safety**: Uses `ConcurrentHashMap` and `AtomicInteger` for thread-safe operations from multiple Netty event loops + +```java public class ConnectionTracker { // Downstream connections (client → proxy) @@ -632,49 +1284,20 @@ public class ConnectionTracker { onConnectionClosed(clusterName, channel, downstreamConnections, downstreamChannelsByCluster); } - - /** - Called by ConnectionDrainManager - */ public Set getDownstreamActiveChannels(String clusterName) { Set channels = downstreamChannelsByCluster.get(clusterName); return channels != null ? Set.copyOf(channels) : Set.of(); } - // === UPSTREAM CONNECTION TRACKING === - public void onUpstreamConnectionEstablished(String clusterName, Channel channel) { - upstreamConnections.computeIfAbsent(clusterName, k -> new AtomicInteger(0)).incrementAndGet(); - upstreamChannelsByCluster.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()).add(channel); - } - - public void onUpstreamConnectionClosed(String clusterName, Channel channel) { - onConnectionClosed(clusterName, channel, upstreamConnections, upstreamChannelsByCluster); - } + // Similar methods for upstream connections... - /** - Called by ConnectionDrainManager - */ - public Set getUpstreamActiveChannels(String clusterName) { - Set channels = upstreamChannelsByCluster.get(clusterName); - return channels != null ? Set.copyOf(channels) : Set.of(); - } - - /** - Called by ConnectionDrainManager - */ public int getTotalConnectionCount(String clusterName) { return getDownstreamActiveConnectionCount(clusterName) + getUpstreamActiveConnectionCount(clusterName); } - /** - * Common method to remove a connection and clean up empty entries. - * This method decrements the connection counter and removes the channel from the set, - * cleaning up empty entries to prevent memory leaks. - */ private void onConnectionClosed(String clusterName, Channel channel, Map connectionCounters, Map> channelsByCluster) { - // Decrement counter and remove if zero or negative AtomicInteger counter = connectionCounters.get(clusterName); if (counter != null) { counter.decrementAndGet(); @@ -683,7 +1306,6 @@ public class ConnectionTracker { } } - // Remove channel from set and remove empty sets Set channels = channelsByCluster.get(clusterName); if (channels != null) { channels.remove(channel); @@ -695,13 +1317,20 @@ public class ConnectionTracker { } ``` -4. **InFlightMessageTracker** - - **What it does** - Tracks **pending Kafka requests** to ensure no messages are lost during connection closure. This enables the "wait for completion" strategy - connections are only closed after all pending requests have received responses. 
- - **Key Responsibilities:** - - **Request Tracking**: Increments counters when Kafka requests are sent upstream in `ProxyChannelStateMachine` - - **Response Tracking**: Decrements counters when Kafka responses are received in `ProxyChannelStateMachine` - - **Channel Cleanup**: Handles cleanup when channels close unexpectedly, adjusting counts appropriately -``` +### 4. InFlightMessageTracker + +Tracks pending Kafka requests to ensure no messages are lost during connection closure. + +- **What it does**: Maintains counters of pending Kafka requests per channel and cluster to enable "wait for completion" strategy during graceful shutdown +- **Key Responsibilities:** + - **Request Tracking**: Increments counters when Kafka requests are sent upstream (called from `ProxyChannelStateMachine.messageFromClient()`) + - **Response Tracking**: Decrements counters when Kafka responses are received (called from `ProxyChannelStateMachine.messageFromServer()`) + - **Per-Channel Counts**: Maintains a two-level map: `cluster name → channel → pending count` for granular tracking + - **Cluster Totals**: Maintains a separate map for quick cluster-wide total lookup without iterating all channels + - **Channel Cleanup**: When a channel closes unexpectedly, adjusts counts appropriately to prevent stuck counters + - **Thread Safety**: Uses `ConcurrentHashMap` and `AtomicInteger` for thread-safe concurrent access + +```java public class InFlightMessageTracker { // Map from cluster name to channel to pending request count @@ -712,9 +1341,6 @@ public class InFlightMessageTracker { /** * Records that a request has been sent to the upstream cluster. - * - * @param clusterName The name of the virtual cluster. - * @param channel The channel handling the request. */ public void onRequestSent(String clusterName, Channel channel) { pendingRequests.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()) @@ -727,9 +1353,6 @@ public class InFlightMessageTracker { /** * Records that a response has been received from the upstream cluster. - * - * @param clusterName The name of the virtual cluster. - * @param channel The channel handling the response. */ public void onResponseReceived(String clusterName, Channel channel) { Map clusterRequests = pendingRequests.get(clusterName); @@ -756,10 +1379,7 @@ public class InFlightMessageTracker { } /** - * Records that a channel has been closed, clearing all pending requests for that channel. - * - * @param clusterName The name of the virtual cluster. - * @param channel The channel that was closed. + * Records that a channel has been closed, clearing all pending requests. */ public void onChannelClosed(String clusterName, Channel channel) { Map clusterRequests = pendingRequests.get(clusterName); @@ -768,7 +1388,6 @@ public class InFlightMessageTracker { if (channelCounter != null) { int pendingCount = channelCounter.get(); if (pendingCount > 0) { - // Subtract from total AtomicInteger totalCounter = totalPendingByCluster.get(clusterName); if (totalCounter != null) { int newTotal = totalCounter.addAndGet(-pendingCount); @@ -785,13 +1404,6 @@ public class InFlightMessageTracker { } } - /** - * Gets the number of pending requests for a specific channel in a virtual cluster. - * - * @param clusterName The name of the virtual cluster. - * @param channel The channel. - * @return The number of pending requests. 
- */ public int getPendingRequestCount(String clusterName, Channel channel) { Map clusterRequests = pendingRequests.get(clusterName); if (clusterRequests != null) { @@ -801,12 +1413,6 @@ public class InFlightMessageTracker { return 0; } - /** - * Gets the total number of pending requests for a virtual cluster across all channels. - * - * @param clusterName The name of the virtual cluster. - * @return The total number of pending requests. - */ public int getTotalPendingRequestCount(String clusterName) { AtomicInteger counter = totalPendingByCluster.get(clusterName); return counter != null ? Math.max(0, counter.get()) : 0; @@ -814,45 +1420,77 @@ public class InFlightMessageTracker { } ``` -5. **Changes in ProxyChannelStateMachine** - We need to enhance the existing state machine for - - **Connection Lifecycle**: Automatically notifies ConnectionTracker when connections are established/closed - - **In-flight Message Tracking**: Automatically notifies InFlightMessageTracker when requests/responses flow through +### 5. ConfigurationChangeRollbackTracker -Example code changes for existing ProxyChannelStateMachine methods -``` -void messageFromServer(Object msg) { - // Track responses received from upstream Kafka (completing in-flight requests) - if (inFlightTracker != null && msg instanceof ResponseFrame && backendHandler != null) { - inFlightTracker.onResponseReceived(clusterName, backendHandler.serverCtx().channel()); - } +Maintains a record of all cluster operations so they can be reversed if the overall configuration change fails. + +- **What it does**: Records all successful cluster operations during a configuration change so they can be undone if a later operation fails +- **Key Responsibilities:** + - **Removal Tracking**: Stores the cluster name and original `VirtualClusterModel` for each removed cluster, enabling re-addition on rollback + - **Modification Tracking**: Stores both the original and new `VirtualClusterModel` for each modified cluster, enabling revert to original state + - **Addition Tracking**: Stores the cluster name and `VirtualClusterModel` for each added cluster, enabling removal on rollback + - **Rollback Order**: Provides ordered lists to enable rollback in reverse order: Added → Modified → Removed + +```java +public class ConfigurationChangeRollbackTracker { - .... 
### 6. Integration with ProxyChannelStateMachine

The existing `ProxyChannelStateMachine` is enhanced to integrate with connection tracking and in-flight message tracking.

- **What it does**: Adds hooks into the existing state machine to notify `ConnectionTracker` and `InFlightMessageTracker` of connection and message lifecycle events
- **Key Responsibilities:**
  - **Connection Lifecycle**: Automatically notifies `ConnectionTracker` when connections are established (`toClientActive`, `toForwarding`) and closed (`onServerInactive`, `onClientInactive`)
  - **In-flight Message Tracking**: Automatically notifies `InFlightMessageTracker` when requests are sent upstream (`messageFromClient`) and responses are received (`messageFromServer`)
  - **Cleanup on Close**: Ensures `InFlightMessageTracker.onChannelClosed()` is called when channels close, clearing any pending counts

```java
// Example integration points in ProxyChannelStateMachine

void messageFromServer(Object msg) {
    // Track responses received from upstream Kafka
    if (inFlightTracker != null && msg instanceof ResponseFrame && backendHandler != null) {
        inFlightTracker.onResponseReceived(clusterName, backendHandler.serverCtx().channel());
    }
    // ... existing logic
}

void messageFromClient(Object msg) {
    // Track requests being sent upstream
    if (inFlightTracker != null && msg instanceof RequestFrame && backendHandler != null) {
        inFlightTracker.onRequestSent(clusterName, backendHandler.serverCtx().channel());
    }
    // ... existing logic
}

void onServerInactive() {
    // Track upstream connection closure
    if (connectionTracker != null && backendHandler != null) {
        connectionTracker.onUpstreamConnectionClosed(clusterName, backendHandler.serverCtx().channel());
    }
    // Clear any pending in-flight messages for this channel
    if (inFlightTracker != null && backendHandler != null) {
        inFlightTracker.onChannelClosed(clusterName, backendHandler.serverCtx().channel());
    }
    // ... existing logic
}

void onClientInactive() {
    // Track downstream connection closure
    if (connectionTracker != null && frontendHandler != null) {
        connectionTracker.onDownstreamConnectionClosed(clusterName, frontendHandler.clientCtx().channel());
    }
    if (inFlightTracker != null && frontendHandler != null) {
        inFlightTracker.onChannelClosed(clusterName, frontendHandler.clientCtx().channel());
    }
    // ... existing logic
}

private void toClientActive(ProxyChannelState.ClientActive clientActive,
                            KafkaProxyFrontendHandler frontendHandler) {
    // Track downstream connection establishment
    if (connectionTracker != null) {
        connectionTracker.onDownstreamConnectionEstablished(clusterName, frontendHandler.clientCtx().channel());
    }
    // ... existing logic
}

private void toForwarding(Forwarding forwarding) {
    // Track upstream connection establishment
    if (connectionTracker != null && backendHandler != null) {
        connectionTracker.onUpstreamConnectionEstablished(clusterName, backendHandler.serverCtx().channel());
    }
    // ... existing logic
}
```
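For the null-guarded tracker fields used above, one possible wiring is to inject both trackers and the owning cluster's name when the state machine is created. This is a sketch only; the constructor shape is an assumption and the real class carries additional state:

```java
// Sketch of how the trackers could reach the state machine; the field names match
// the integration snippets above, but the constructor signature is illustrative.
private final String clusterName;
private final ConnectionTracker connectionTracker;    // null when tracking is disabled
private final InFlightMessageTracker inFlightTracker; // null when tracking is disabled

ProxyChannelStateMachine(String clusterName,
                         ConnectionTracker connectionTracker,
                         InFlightMessageTracker inFlightTracker) {
    this.clusterName = clusterName;
    this.connectionTracker = connectionTracker;
    this.inFlightTracker = inFlightTracker;
}
```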
### 7. Rejecting New Connections During Drain

The `KafkaProxyFrontendHandler` checks whether a cluster is draining before accepting new connections.

- **What it does**: Adds a guard check in `channelActive()` to reject new connections while a cluster is being drained
- **How it works**: Calls `connectionDrainManager.shouldAcceptConnection(clusterName)` before allowing the connection to proceed. If the cluster is draining, the channel is closed immediately with a log message.
- **Why this is needed**: Without this check, new connections could be established while existing connections are being drained, making the drain process take longer or never complete.

```java
public class KafkaProxyFrontendHandler
        extends ChannelInboundHandlerAdapter
        implements NetFilter.NetFilterContext {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.clientCtx = ctx;

        // Check if we should accept this connection (reject while draining)
        String clusterName = virtualClusterModel.getClusterName();
        if (connectionDrainManager != null && !connectionDrainManager.shouldAcceptConnection(clusterName)) {
            LOGGER.info("Rejecting new connection for draining cluster '{}'", clusterName);
            ctx.close();
            return;
        }

        this.proxyChannelStateMachine.onClientActive(this);
        super.channelActive(this.clientCtx);
    }
}
```
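For reference, the accept check needs nothing more than a concurrent map of per-cluster draining flags, matching the `drainingClusters` operations shown in the flow diagram below. The following is a minimal sketch of that bookkeeping only; the complete `ConnectionDrainManager` also owns the backpressure and channel-close logic from Phases 2 and 3:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the draining-state bookkeeping; the complete class also
// implements gracefullyCloseConnections() and the drain monitoring.
public class ConnectionDrainManager {

    private final Map<String, Boolean> drainingClusters = new ConcurrentHashMap<>();

    public void startDraining(String clusterName) {
        drainingClusters.put(clusterName, true); // new connections are rejected from now on
    }

    public void stopDraining(String clusterName) {
        drainingClusters.remove(clusterName); // new connections are accepted again
    }

    public boolean shouldAcceptConnection(String clusterName) {
        return !drainingClusters.containsKey(clusterName);
    }
}
```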
## Graceful Restart Flow Diagram

```
┌────────────────────────────────────────────────────────────────────┐
│ PHASE 1: INITIATE DRAINING                                         │
│                                                                    │
│ VirtualClusterManager.restartVirtualCluster()                      │
│   │                                                                │
│   ▼                                                                │
│ ConnectionDrainManager.startDraining(clusterName)                  │
│   - drainingClusters.put(clusterName, true)                        │
│   - New connections will be REJECTED                               │
└────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌────────────────────────────────────────────────────────────────────┐
│ PHASE 2: APPLY BACKPRESSURE                                        │
│                                                                    │
│ ConnectionDrainManager.gracefullyCloseConnections()                │
│                                                                    │
│ For each DOWNSTREAM channel (client → proxy):                      │
│   - channel.config().setAutoRead(false)                            │
│   - Stops receiving NEW client requests                            │
│                                                                    │
│ For each UPSTREAM channel (proxy → Kafka):                         │
│   - autoRead remains TRUE                                          │
│   - Continues receiving Kafka responses                            │
│                                                                    │
│ Result: in-flight count decreases naturally as responses arrive    │
└────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌────────────────────────────────────────────────────────────────────┐
│ PHASE 3: MONITOR & CLOSE CHANNELS                                  │
│                                                                    │
│ For each channel:                                                  │
│   scheduler.scheduleAtFixedRate(() -> {                            │
│       pendingRequests = inFlightTracker.getPendingRequestCount()   │
│       if (pendingRequests == 0) {                                  │
│           channel.close()   ◄── safe to close                      │
│       }                                                            │
│   }, 50ms, 100ms)                                                  │
│                                                                    │
│   scheduler.schedule(() -> {                                       │
│       if (!done) channel.close()   ◄── force close on timeout      │
│   }, 30 seconds)                                                   │
└────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌────────────────────────────────────────────────────────────────────┐
│ PHASE 4: DEREGISTER & REGISTER                                     │
│                                                                    │
│ endpointRegistry.deregisterVirtualCluster(oldGateway)              │
│   - Unbinds network ports                                          │
│                                                                    │
│ endpointRegistry.registerVirtualCluster(newGateway)                │
│   - Binds network ports with the new configuration                 │
└────────────────────────────────────────────────────────────────────┘
        │
        ▼
┌────────────────────────────────────────────────────────────────────┐
│ PHASE 5: STOP DRAINING                                             │
│                                                                    │
│ ConnectionDrainManager.stopDraining(clusterName)                   │
│   - drainingClusters.remove(clusterName)                           │
│   - New connections now ACCEPTED                                   │
│   - Cluster fully operational with the new configuration           │
└────────────────────────────────────────────────────────────────────┘
```
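The Phase 3 pseudocode above translates directly to a `ScheduledExecutorService`. The sketch below keeps the 50 ms/100 ms polling schedule and the 30-second timeout from the diagram; the helper method name and its enclosing class are illustrative assumptions:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;

// Sketch of the per-channel drain monitor from Phase 3. Only the schedule
// values come from the diagram; the method name is hypothetical.
void closeWhenDrained(ScheduledExecutorService scheduler,
                      InFlightMessageTracker tracker,
                      String clusterName,
                      Channel channel) {
    // Poll the channel's pending count and close as soon as it drains to zero.
    ScheduledFuture<?> poller = scheduler.scheduleAtFixedRate(() -> {
        if (tracker.getPendingRequestCount(clusterName, channel) == 0) {
            channel.close(); // safe: no in-flight requests remain on this channel
        }
    }, 50, 100, TimeUnit.MILLISECONDS);

    // Safety net: stop polling and force-close once the timeout elapses.
    scheduler.schedule(() -> {
        poller.cancel(false);
        if (channel.isOpen()) {
            channel.close();
        }
    }, 30, TimeUnit.SECONDS);
}
```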
---

# Example Usage

## Triggering a Reload with curl

```bash
curl -X POST http://localhost:9190/admin/config/reload \
  -H "Content-Type: application/yaml" \
  --data-binary @new-config.yaml
```

**Response:**
```json
{
  "success": true,
  "message": "Configuration reloaded successfully",
  "clustersModified": 1,
  "clustersAdded": 1,
  "clustersRemoved": 0,
  "timestamp": "2024-01-15T10:30:00.123456Z"
}
```

## Example Configuration with Reload Endpoint

```yaml
management:
  bindAddress: 0.0.0.0
  port: 9190
  endpoints:
    prometheus: {}
    configReload:
      enabled: true
      timeout: 60s

virtualClusters:
- name: "demo-cluster"
  targetCluster:
    bootstrapServers: "broker:9092"
  gateways:
  - name: "default-gateway"
    portIdentifiesNode:
      bootstrapAddress: "localhost:9092"
```

---

# Challenges/Open questions

- If loading of the new cluster configuration fails for some reason, the code automatically rolls back to the previous stable state. However, this leaves the application in a state where the content of the config file on disk no longer matches the configuration the clusters are actually running with.
- If the rollback itself fails (for some unforeseen reason), the only way for the operator to discover this is via the logs. In such cases, a full application restart may be required.
- If multiple gateway nodes are running and the reload fails on only some of them, we may need to introduce some form of status coordinator rather than relying on all instances behaving identically.