From 4ff3b486339e0c2bbf0dd994ffce92e109c376f4 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Wed, 25 Jun 2025 19:02:07 +0900 Subject: [PATCH 1/6] Implement app restart on dynamic config change. Resolves #616 --- .../ui/service/app/ConfigReloadService.java | 114 ++++++++++++++++++ .../ui/util/DynamicConfigOperations.java | 4 +- 2 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java new file mode 100644 index 000000000..970da2253 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -0,0 +1,114 @@ +package io.kafbat.ui.service.app; + +import io.kafbat.ui.util.ApplicationRestarter; +import io.kafbat.ui.util.DynamicConfigOperations; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +@Service +@ConditionalOnProperty(value = {"dynamic.config.enabled", "dynamic.config.autoreload"}, havingValue = "true") +@RequiredArgsConstructor +@Slf4j +public class ConfigReloadService { + + private static final String THREAD_NAME = "config-watcher-thread"; + private static final long STARTUP_SUPPRESSION_MS = 1000; + private final long appStartedAt = System.currentTimeMillis(); + + private final DynamicConfigOperations dynamicConfigOperations; + private final ApplicationRestarter restarter; + + private WatchService watchService; + private Thread watcherThread; + + @PostConstruct + public void init() { + log.debug("Auto reload is enabled, will watch for config changes"); + + try { + registerWatchService(); + startWatching(); + } catch (IOException e) { + log.error("Error while registering watch service", e); + } + } + + @PreDestroy + public void shutdown() { + try { + if (watchService != null) { + watchService.close(); + } + } catch (IOException ignored) { + } + if (watcherThread != null) { + this.watcherThread.interrupt(); + } + } + + private void registerWatchService() throws IOException { + this.watchService = FileSystems.getDefault().newWatchService(); + dynamicConfigOperations.dynamicConfigFilePath() + .getParent() + .register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); + } + + private void startWatching() { + watcherThread = new Thread(this::watchLoop, THREAD_NAME); + watcherThread.start(); + } + + private void watchLoop() { + final var watchedDir = dynamicConfigOperations.dynamicConfigFilePath().getParent(); + + while (true) { + try { + WatchKey key = watchService.take(); + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + Path changed = watchedDir.resolve((Path) event.context()); + + if (kind != StandardWatchEventKinds.ENTRY_MODIFY) { + continue; + } + if (!changed.equals(dynamicConfigOperations.dynamicConfigFilePath())) { + continue; + } + + var now = System.currentTimeMillis(); + if (now - appStartedAt < STARTUP_SUPPRESSION_MS) { + continue; + } + + restart(); + } + key.reset(); + } catch (ClosedWatchServiceException e) { + log.trace("Watch service closed, exiting watcher thread"); + break; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + + private void restart() { + log.info("Application config change detected, restarting"); + restarter.requestRestart(); + } + + +} diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index 0686de2c4..1223266c8 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -30,9 +30,7 @@ import org.springframework.http.codec.multipart.FilePart; import org.springframework.stereotype.Component; import org.yaml.snakeyaml.DumperOptions; -import org.yaml.snakeyaml.LoaderOptions; import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.introspector.BeanAccess; import org.yaml.snakeyaml.introspector.Property; import org.yaml.snakeyaml.introspector.PropertyUtils; @@ -67,7 +65,7 @@ public boolean dynamicConfigEnabled() { return "true".equalsIgnoreCase(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_ENABLED_ENV_PROPERTY)); } - private Path dynamicConfigFilePath() { + public Path dynamicConfigFilePath() { return Paths.get( Optional.ofNullable(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_PATH_ENV_PROPERTY)) .orElse(DYNAMIC_CONFIG_PATH_ENV_PROPERTY_DEFAULT) From 04b4b193af2e6729408fbf9bb29ab7aa099261f2 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Sat, 28 Jun 2025 01:30:53 +0900 Subject: [PATCH 2/6] Allow enabling auto reload without using dynamic config itself --- .../io/kafbat/ui/service/app/ConfigReloadService.java | 9 ++++++++- .../java/io/kafbat/ui/util/DynamicConfigOperations.java | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java index 970da2253..1e07ba076 100644 --- a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.nio.file.ClosedWatchServiceException; import java.nio.file.FileSystems; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; @@ -18,7 +19,7 @@ import org.springframework.stereotype.Service; @Service -@ConditionalOnProperty(value = {"dynamic.config.enabled", "dynamic.config.autoreload"}, havingValue = "true") +@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true") @RequiredArgsConstructor @Slf4j public class ConfigReloadService { @@ -35,6 +36,12 @@ public class ConfigReloadService { @PostConstruct public void init() { + var configPath = dynamicConfigOperations.dynamicConfigFilePath(); + if (!Files.exists(configPath) || !Files.isReadable(configPath)) { + log.warn("Dynamic config file {} doesnt exist or is not readable. Auto reload is disabled", configPath); + return; + } + log.debug("Auto reload is enabled, will watch for config changes"); try { diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index 1223266c8..dd24636a6 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -77,7 +77,7 @@ public Optional> loadDynamicPropertySource() { if (dynamicConfigEnabled()) { Path configPath = dynamicConfigFilePath(); if (!Files.exists(configPath) || !Files.isReadable(configPath)) { - log.warn("Dynamic config file {} doesnt exist or not readable", configPath); + log.warn("Dynamic config file {} doesnt exist or is not readable", configPath); return Optional.empty(); } var propertySource = new CompositePropertySource("dynamicProperties"); From b008d1b0a9fd669fef7a5ea498695faaa3a38aa8 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Thu, 7 Aug 2025 03:19:12 +0300 Subject: [PATCH 3/6] WIP --- .dev/dev.yaml | 30 ++----- .../ui/service/app/ConfigReloadService.java | 79 ++++++++++++++++++- api/src/main/resources/application-local.yml | 5 +- frontend/build.gradle | 11 --- 4 files changed, 86 insertions(+), 39 deletions(-) diff --git a/.dev/dev.yaml b/.dev/dev.yaml index 47149ed92..555e303d6 100644 --- a/.dev/dev.yaml +++ b/.dev/dev.yaml @@ -3,28 +3,6 @@ name: "kafbat-ui-dev" services: - kafbat-ui: - container_name: kafbat-ui - image: ghcr.io/kafbat/kafka-ui:latest - ports: - - 8080:8080 - depends_on: - - kafka0 - - schema-registry0 - - kafka-connect0 - - ksqldb0 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 - KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 - KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088 - DYNAMIC_CONFIG_ENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true' - kafka0: image: confluentinc/cp-kafka:7.8.0 hostname: kafka0 @@ -48,8 +26,8 @@ services: KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_JMX_PORT: 9997 -# KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 + KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' schema-registry0: @@ -91,7 +69,9 @@ services: CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 - CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors" + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/filestream-connectors,/tmp/connectors" + volumes: + - /tmp/connectors:/tmp/connectors ksqldb0: image: confluentinc/cp-ksqldb-server:7.8.0 diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java index 1e07ba076..767653a98 100644 --- a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -13,13 +13,29 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.env.OriginTrackedMapPropertySource; +import org.springframework.boot.origin.Origin; +import org.springframework.boot.origin.OriginLookup; +import org.springframework.context.ApplicationContext; +import org.springframework.core.env.AbstractEnvironment; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.EnumerablePropertySource; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.PropertySource; import org.springframework.stereotype.Service; +import org.stringtemplate.v4.ST; @Service -@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true") +//@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true") @RequiredArgsConstructor @Slf4j public class ConfigReloadService { @@ -34,8 +50,69 @@ public class ConfigReloadService { private WatchService watchService; private Thread watcherThread; + private final ApplicationContext context; + private final ConfigurableEnvironment environment; + @PostConstruct public void init() { + +/* environment.getPropertySources() + .stream() + .filter(ps -> ps instanceof OriginTrackedMapPropertySource) + .map(ps -> (OriginTrackedMapPropertySource)ps) + .map(ps -> ps.getSource()) + .map(source -> source.values()) + .map(values -> { + return (HashMap) values; + }) +// .map(sourceValues -> sourceValues.) + .collect(Collectors.toUnmodifiableList());*/ + + + // ============= + +/* environment.getPropertySources().stream() + .filter(ps -> ps instanceof EnumerablePropertySource) + .filter(ps -> ps instanceof OriginLookup) + .flatMap(ps -> { + EnumerablePropertySource eps = (EnumerablePropertySource) ps; + OriginLookup lookup = (OriginLookup) ps; + return Arrays.stream(eps.getPropertyNames()) + .map(name -> { + Origin origin = lookup.getOrigin(name); + return origin != null ? origin.toString() : null; + }); + }) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toUnmodifiableList());*/ + + // =============== + +/* Map map = new HashMap(); + for(Iterator it = ((AbstractEnvironment) environment).getPropertySources().iterator(); it.hasNext(); ) { + PropertySource propertySource = (PropertySource) it.next(); + if (propertySource instanceof MapPropertySource) { + map.putAll(((MapPropertySource) propertySource).getSource()); + } + }*/ + + // ==== + + SpringConfigurableEnvironment properties = new SpringConfigurableEnvironment(springEnv); + SpringConfigurableEnvironment.PropertyInfo info = properties.get("profile.env"); + assertEquals("default", properties.get(info.getValue()); + assertEquals( + "Config resource 'class path resource [application.properties]' via location 'optional:classpath:/'", + info.getSourceList.get(0)); + + + + + System.out.println(); +// environment.getPropertySources() +// .stream() + var configPath = dynamicConfigOperations.dynamicConfigFilePath(); if (!Files.exists(configPath) || !Files.isReadable(configPath)) { log.warn("Dynamic config file {} doesnt exist or is not readable. Auto reload is disabled", configPath); diff --git a/api/src/main/resources/application-local.yml b/api/src/main/resources/application-local.yml index 0c40ff079..996291a80 100644 --- a/api/src/main/resources/application-local.yml +++ b/api/src/main/resources/application-local.yml @@ -22,7 +22,8 @@ spring: user-filter-search-filter: "(&(uid={0})(objectClass=inetOrgPerson))" group-filter-search-base: "ou=people,dc=planetexpress,dc=com" -kafka: + +kaf1ka: clusters: - name: local bootstrapServers: localhost:9092 @@ -81,7 +82,7 @@ auth: type: github rbac: - roles: + ro1les: - name: "memelords" clusters: - local diff --git a/frontend/build.gradle b/frontend/build.gradle index dc982ad73..3efb5331a 100644 --- a/frontend/build.gradle +++ b/frontend/build.gradle @@ -27,14 +27,3 @@ tasks.register('generateContract', PnpmTask) { outputs.dir(project.layout.projectDirectory.dir("src/generated-sources")) args = ['gen:sources'] } - -tasks.register('buildFrontend', PnpmTask) { - dependsOn generateContract - inputs.files(fileTree("src/")) - outputs.dir(project.layout.buildDirectory.dir("vite/static")) - args = ['build'] - environment = System.getenv() + [ - "VITE_TAG" : project.version, - "VITE_COMMIT": "git rev-parse --short HEAD".execute().text.trim() - ] -} From b99a1d63af03af7b7dc798771019ad3ce1135f2b Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Thu, 7 Aug 2025 12:43:07 +0300 Subject: [PATCH 4/6] wip --- .../ui/service/app/ConfigReloadService.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java index 767653a98..70cedfff6 100644 --- a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -25,6 +25,8 @@ import org.springframework.boot.env.OriginTrackedMapPropertySource; import org.springframework.boot.origin.Origin; import org.springframework.boot.origin.OriginLookup; +import org.springframework.boot.origin.OriginTrackedValue; +import org.springframework.boot.origin.TextResourceOrigin; import org.springframework.context.ApplicationContext; import org.springframework.core.env.AbstractEnvironment; import org.springframework.core.env.ConfigurableEnvironment; @@ -56,7 +58,28 @@ public class ConfigReloadService { @PostConstruct public void init() { -/* environment.getPropertySources() + var o = environment.getPropertySources() + .stream() + .filter(ps -> ps instanceof OriginTrackedMapPropertySource) + .map(ps -> (OriginTrackedMapPropertySource) ps) + .collect(Collectors.toUnmodifiableList()) + .stream() + .findFirst() + .get() + .getSource() + .values() + .stream() + .findFirst() + .map(a -> (OriginTrackedValue) a) + .get() + .getOrigin(); + + var origin = (TextResourceOrigin) o; + + origin.getResource(); + +/* + environment.getPropertySources() .stream() .filter(ps -> ps instanceof OriginTrackedMapPropertySource) .map(ps -> (OriginTrackedMapPropertySource)ps) @@ -66,12 +89,13 @@ public void init() { return (HashMap) values; }) // .map(sourceValues -> sourceValues.) - .collect(Collectors.toUnmodifiableList());*/ + .collect(Collectors.toUnmodifiableList()); +*/ // ============= -/* environment.getPropertySources().stream() +/* environment.getPropertySources().stream() .filter(ps -> ps instanceof EnumerablePropertySource) .filter(ps -> ps instanceof OriginLookup) .flatMap(ps -> { @@ -99,12 +123,12 @@ public void init() { // ==== - SpringConfigurableEnvironment properties = new SpringConfigurableEnvironment(springEnv); +/* SpringConfigurableEnvironment properties = new SpringConfigurableEnvironment(springEnv); SpringConfigurableEnvironment.PropertyInfo info = properties.get("profile.env"); assertEquals("default", properties.get(info.getValue()); assertEquals( "Config resource 'class path resource [application.properties]' via location 'optional:classpath:/'", - info.getSourceList.get(0)); + info.getSourceList.get(0));*/ From 59824d1b9a1d5d2d2cf490c8b098321f8cf4cc3c Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Wed, 13 Aug 2025 04:11:06 +0300 Subject: [PATCH 5/6] wip --- .../ui/service/app/ConfigReloadService.java | 202 ++++-------------- .../io/kafbat/ui/util/MultiFileWatcher.java | 148 +++++++++++++ 2 files changed, 187 insertions(+), 163 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java index 70cedfff6..f7712d489 100644 --- a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -1,40 +1,24 @@ package io.kafbat.ui.service.app; -import io.kafbat.ui.util.ApplicationRestarter; -import io.kafbat.ui.util.DynamicConfigOperations; +import io.kafbat.ui.util.MultiFileWatcher; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import java.io.IOException; -import java.nio.file.ClosedWatchServiceException; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.nio.file.Paths; +import java.util.LinkedHashSet; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.env.OriginTrackedMapPropertySource; import org.springframework.boot.origin.Origin; -import org.springframework.boot.origin.OriginLookup; import org.springframework.boot.origin.OriginTrackedValue; import org.springframework.boot.origin.TextResourceOrigin; -import org.springframework.context.ApplicationContext; -import org.springframework.core.env.AbstractEnvironment; import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.core.env.EnumerablePropertySource; -import org.springframework.core.env.MapPropertySource; -import org.springframework.core.env.PropertySource; +import org.springframework.core.io.Resource; import org.springframework.stereotype.Service; -import org.stringtemplate.v4.ST; @Service //@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true") @@ -43,111 +27,51 @@ public class ConfigReloadService { private static final String THREAD_NAME = "config-watcher-thread"; - private static final long STARTUP_SUPPRESSION_MS = 1000; - private final long appStartedAt = System.currentTimeMillis(); - private final DynamicConfigOperations dynamicConfigOperations; - private final ApplicationRestarter restarter; + private final ConfigurableEnvironment environment; - private WatchService watchService; private Thread watcherThread; - - private final ApplicationContext context; - private final ConfigurableEnvironment environment; + private MultiFileWatcher multiFileWatcher; @PostConstruct public void init() { - - var o = environment.getPropertySources() - .stream() - .filter(ps -> ps instanceof OriginTrackedMapPropertySource) - .map(ps -> (OriginTrackedMapPropertySource) ps) - .collect(Collectors.toUnmodifiableList()) - .stream() - .findFirst() - .get() - .getSource() - .values() - .stream() - .findFirst() - .map(a -> (OriginTrackedValue) a) - .get() - .getOrigin(); - - var origin = (TextResourceOrigin) o; - - origin.getResource(); - -/* - environment.getPropertySources() - .stream() - .filter(ps -> ps instanceof OriginTrackedMapPropertySource) - .map(ps -> (OriginTrackedMapPropertySource)ps) - .map(ps -> ps.getSource()) - .map(source -> source.values()) - .map(values -> { - return (HashMap) values; - }) -// .map(sourceValues -> sourceValues.) - .collect(Collectors.toUnmodifiableList()); -*/ - - - // ============= - -/* environment.getPropertySources().stream() - .filter(ps -> ps instanceof EnumerablePropertySource) - .filter(ps -> ps instanceof OriginLookup) - .flatMap(ps -> { - EnumerablePropertySource eps = (EnumerablePropertySource) ps; - OriginLookup lookup = (OriginLookup) ps; - return Arrays.stream(eps.getPropertyNames()) - .map(name -> { - Origin origin = lookup.getOrigin(name); - return origin != null ? origin.toString() : null; - }); + var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false) + .filter(OriginTrackedMapPropertySource.class::isInstance) + .map(OriginTrackedMapPropertySource.class::cast) + .flatMap(ps -> ps.getSource().values().stream()) + .map(v -> (v instanceof OriginTrackedValue otv) ? otv.getOrigin() : null) + .filter(Objects::nonNull) + .flatMap(o -> Stream.iterate(o, Objects::nonNull, Origin::getParent)) + .filter(TextResourceOrigin.class::isInstance) + .map(TextResourceOrigin.class::cast) + .map(TextResourceOrigin::getResource) + .filter(Objects::nonNull) + .filter(Resource::exists) + .filter(Resource::isReadable) + .filter(Resource::isFile) + .map(r -> { + try { + return r.getURI(); + } catch (IOException e) { + log.error("can't retrieve resource URL", e); + return null; + } }) .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toUnmodifiableList());*/ - - // =============== - -/* Map map = new HashMap(); - for(Iterator it = ((AbstractEnvironment) environment).getPropertySources().iterator(); it.hasNext(); ) { - PropertySource propertySource = (PropertySource) it.next(); - if (propertySource instanceof MapPropertySource) { - map.putAll(((MapPropertySource) propertySource).getSource()); - } - }*/ - - // ==== - -/* SpringConfigurableEnvironment properties = new SpringConfigurableEnvironment(springEnv); - SpringConfigurableEnvironment.PropertyInfo info = properties.get("profile.env"); - assertEquals("default", properties.get(info.getValue()); - assertEquals( - "Config resource 'class path resource [application.properties]' via location 'optional:classpath:/'", - info.getSourceList.get(0));*/ + .map(Paths::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); - - - - System.out.println(); -// environment.getPropertySources() -// .stream() - - var configPath = dynamicConfigOperations.dynamicConfigFilePath(); - if (!Files.exists(configPath) || !Files.isReadable(configPath)) { - log.warn("Dynamic config file {} doesnt exist or is not readable. Auto reload is disabled", configPath); + if (propertySourcePaths.isEmpty()) { + log.debug("No config files found, auto reload is disabled"); return; } log.debug("Auto reload is enabled, will watch for config changes"); try { - registerWatchService(); - startWatching(); + this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, this::reload); + this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME); + this.watcherThread.start(); } catch (IOException e) { log.error("Error while registering watch service", e); } @@ -156,8 +80,8 @@ public void init() { @PreDestroy public void shutdown() { try { - if (watchService != null) { - watchService.close(); + if (multiFileWatcher != null) { + multiFileWatcher.close(); } } catch (IOException ignored) { } @@ -166,56 +90,8 @@ public void shutdown() { } } - private void registerWatchService() throws IOException { - this.watchService = FileSystems.getDefault().newWatchService(); - dynamicConfigOperations.dynamicConfigFilePath() - .getParent() - .register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); - } - - private void startWatching() { - watcherThread = new Thread(this::watchLoop, THREAD_NAME); - watcherThread.start(); - } - - private void watchLoop() { - final var watchedDir = dynamicConfigOperations.dynamicConfigFilePath().getParent(); - - while (true) { - try { - WatchKey key = watchService.take(); - for (WatchEvent event : key.pollEvents()) { - WatchEvent.Kind kind = event.kind(); - Path changed = watchedDir.resolve((Path) event.context()); - - if (kind != StandardWatchEventKinds.ENTRY_MODIFY) { - continue; - } - if (!changed.equals(dynamicConfigOperations.dynamicConfigFilePath())) { - continue; - } - - var now = System.currentTimeMillis(); - if (now - appStartedAt < STARTUP_SUPPRESSION_MS) { - continue; - } - - restart(); - } - key.reset(); - } catch (ClosedWatchServiceException e) { - log.trace("Watch service closed, exiting watcher thread"); - break; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } + private void reload() { - private void restart() { - log.info("Application config change detected, restarting"); - restarter.requestRestart(); } diff --git a/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java new file mode 100644 index 000000000..246732ba5 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java @@ -0,0 +1,148 @@ +package io.kafbat.ui.util; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; + +@Slf4j +public final class MultiFileWatcher implements AutoCloseable { + + private static final long DEBOUNCE_MS = Duration.ofMillis(1000).toMillis(); + + private final WatchService watchService = FileSystems.getDefault().newWatchService(); + private final Set watchedFiles = ConcurrentHashMap.newKeySet(); + private final Map watchDirsByKey = new HashMap<>(); + private final Runnable reloader; + + private long lastTriggerAt = 0; + + public MultiFileWatcher(Collection filesToWatch, Runnable reloader) throws IOException { + Assert.notNull(reloader, "reloader must not be null"); + this.reloader = reloader; + + if (filesToWatch.isEmpty()) { + log.warn("No files to watch, aborting"); + } + + watchedFiles.addAll(filesToWatch.stream() + .map(p -> { + try { + return Files.exists(p) ? p.toRealPath() : p.toAbsolutePath().normalize(); + } catch (IOException e) { + return p.toAbsolutePath().normalize(); + } + }) + .toList()); + + if (watchedFiles.isEmpty()) { + log.warn("No files to watch resolved, aborting"); + return; + } + + log.debug("Going to watch {} files", watchedFiles.size()); + log.trace("Watching files: {}", watchedFiles.stream().map(Path::toString).toList()); + + watchedFiles.stream() + .map(p -> Optional.ofNullable(p.getParent()).orElse(Path.of("."))) + .distinct() + .forEach(dir -> { + try { + var key = dir.register(watchService, + ENTRY_MODIFY, + ENTRY_CREATE, ENTRY_DELETE // watch these for atomic replacements + ); + watchDirsByKey.put(key, dir); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(Path::toString).toList()); + } + + public void watchLoop() { + while (true) { + try { + var key = watchService.take(); + var dir = watchDirsByKey.get(key); + if (dir == null) { + continue; + } + + var hit = key.pollEvents() + .stream() + .filter(ev -> ev.kind() != OVERFLOW) + .map(ev -> dir.resolve((Path) ev.context()).normalize().toAbsolutePath()) + .anyMatch(this::matchesTarget); + + if (hit && shouldTrigger()) { + reloader.run(); + } + + if (!key.reset()) { + watchDirsByKey.remove(key); + } + if (watchDirsByKey.isEmpty()) { + break; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (ClosedWatchServiceException e) { + log.trace("Watch service closed, exiting watcher thread"); + break; + } catch (Exception e) { + log.error("Error while calling the reloader", e); + break; + } + } + } + + private boolean matchesTarget(Path changed) { + if (watchedFiles.contains(changed)) { + return true; + } + try { + return watchedFiles.contains(changed.toRealPath()); + } catch (IOException ignored) { + return false; + } + } + + private boolean shouldTrigger() { + var now = System.currentTimeMillis(); + + if (now - lastTriggerAt < DEBOUNCE_MS) { + return false; + } + + lastTriggerAt = now; + return true; + } + + @Override + public void close() throws IOException { + watchService.close(); + } +} + From 146937cdeed2c2093d4f988d93ffd944da0c392c Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Fri, 22 Aug 2025 12:06:20 +0300 Subject: [PATCH 6/6] wip --- .../RoleBasedAccessControlProperties.java | 2 + .../ui/service/app/ConfigReloadService.java | 53 ++++++++++++++++++- .../io/kafbat/ui/util/MultiFileWatcher.java | 28 ++++++---- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java b/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java index 8ecf12b99..ee8f65c58 100644 --- a/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java @@ -10,6 +10,8 @@ public class RoleBasedAccessControlProperties { private final List roles = new ArrayList<>(); +// private String haha; + @PostConstruct public void init() { diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java index f7712d489..e75117720 100644 --- a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -1,22 +1,35 @@ package io.kafbat.ui.service.app; +import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties; +import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.MultiFileWatcher; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Paths; import java.util.LinkedHashSet; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import lombok.Cleanup; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; +import org.springframework.boot.context.properties.bind.Binder; import org.springframework.boot.env.OriginTrackedMapPropertySource; import org.springframework.boot.origin.Origin; import org.springframework.boot.origin.OriginTrackedValue; import org.springframework.boot.origin.TextResourceOrigin; +import org.springframework.context.ApplicationContext; import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.PropertiesPropertySource; +import org.springframework.core.env.PropertySource; import org.springframework.core.io.Resource; import org.springframework.stereotype.Service; @@ -29,10 +42,14 @@ public class ConfigReloadService { private static final String THREAD_NAME = "config-watcher-thread"; private final ConfigurableEnvironment environment; + private final ApplicationContext appContext; private Thread watcherThread; private MultiFileWatcher multiFileWatcher; + private final ObjectProvider accessControlService; + private final ObjectProvider roleBasedAccessControlProperties; + @PostConstruct public void init() { var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false) @@ -69,7 +86,28 @@ public void init() { log.debug("Auto reload is enabled, will watch for config changes"); try { - this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, this::reload); + this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, path -> { + System.out.println(path); + var propertySources = environment.getPropertySources(); + + + + Properties properties = new Properties(); + try { + @Cleanup InputStream inputStream = Files.newInputStream(Paths.get("/tmp/kek.yaml")); + properties.load(inputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + PropertySource origin = + propertySources.stream().filter(ps -> ps.getName().contains("tmp/kek")).findFirst().get(); + environment.getPropertySources().replace(origin.getName(), new PropertiesPropertySource(origin.getName(), properties)); + + System.out.println(); + var kekw = appContext.getBean(AccessControlService.class); + return null; + }); this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME); this.watcherThread.start(); } catch (IOException e) { @@ -91,6 +129,19 @@ public void shutdown() { } private void reload() { + var registry = (DefaultSingletonBeanRegistry) appContext.getAutowireCapableBeanFactory(); + + registry.destroySingleton("AccessControlService"); + + Binder.get(environment) + .bind("rbac", RoleBasedAccessControlProperties.class) + .orElseThrow(() -> new IllegalStateException("no rbac config")); + + var newProps = appContext.getBean(AccessControlService.class); + newProps.init(); +// accessControlService.init(); + System.out.println(); + } diff --git a/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java index 246732ba5..7d31023e9 100644 --- a/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java +++ b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java @@ -11,6 +11,7 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.time.Duration; @@ -19,8 +20,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.util.Assert; @Slf4j @@ -31,11 +35,11 @@ public final class MultiFileWatcher implements AutoCloseable { private final WatchService watchService = FileSystems.getDefault().newWatchService(); private final Set watchedFiles = ConcurrentHashMap.newKeySet(); private final Map watchDirsByKey = new HashMap<>(); - private final Runnable reloader; + private final Function reloader; private long lastTriggerAt = 0; - public MultiFileWatcher(Collection filesToWatch, Runnable reloader) throws IOException { + public MultiFileWatcher(Collection filesToWatch, Function reloader) throws IOException { Assert.notNull(reloader, "reloader must not be null"); this.reloader = reloader; @@ -62,28 +66,28 @@ public MultiFileWatcher(Collection filesToWatch, Runnable reloader) throws log.trace("Watching files: {}", watchedFiles.stream().map(Path::toString).toList()); watchedFiles.stream() - .map(p -> Optional.ofNullable(p.getParent()).orElse(Path.of("."))) +// .map(getParentPath()) .distinct() - .forEach(dir -> { + .forEach(file -> { try { - var key = dir.register(watchService, + var key = getParentPath().apply(file).register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE // watch these for atomic replacements ); - watchDirsByKey.put(key, dir); + watchDirsByKey.put(key, file); } catch (IOException e) { throw new UncheckedIOException(e); } }); - log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(Path::toString).toList()); + log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(a -> getParentPath().apply(a)).map(Path::toString).toList()); } public void watchLoop() { while (true) { try { var key = watchService.take(); - var dir = watchDirsByKey.get(key); + var dir = getParentPath().apply(watchDirsByKey.get(key)); if (dir == null) { continue; } @@ -95,7 +99,8 @@ public void watchLoop() { .anyMatch(this::matchesTarget); if (hit && shouldTrigger()) { - reloader.run(); + var filePath = watchDirsByKey.get(key); // TODO + reloader.apply(filePath); } if (!key.reset()) { @@ -144,5 +149,10 @@ private boolean shouldTrigger() { public void close() throws IOException { watchService.close(); } + + @NotNull + private static Function getParentPath() { + return p -> Optional.ofNullable(p.getParent()).orElse(Path.of(".")); + } }