diff --git a/.dev/dev.yaml b/.dev/dev.yaml index 47149ed92..555e303d6 100644 --- a/.dev/dev.yaml +++ b/.dev/dev.yaml @@ -3,28 +3,6 @@ name: "kafbat-ui-dev" services: - kafbat-ui: - container_name: kafbat-ui - image: ghcr.io/kafbat/kafka-ui:latest - ports: - - 8080:8080 - depends_on: - - kafka0 - - schema-registry0 - - kafka-connect0 - - ksqldb0 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 - KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 - KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088 - DYNAMIC_CONFIG_ENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true' - kafka0: image: confluentinc/cp-kafka:7.8.0 hostname: kafka0 @@ -48,8 +26,8 @@ services: KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_JMX_PORT: 9997 -# KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 + KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' schema-registry0: @@ -91,7 +69,9 @@ services: CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 - CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors" + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/filestream-connectors,/tmp/connectors" + volumes: + - /tmp/connectors:/tmp/connectors ksqldb0: image: confluentinc/cp-ksqldb-server:7.8.0 diff --git a/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java b/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java index 8ecf12b99..ee8f65c58 100644 --- a/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java @@ -10,6 +10,8 @@ public class RoleBasedAccessControlProperties { private final List roles = new ArrayList<>(); +// private String haha; + @PostConstruct public void init() { diff --git a/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java new file mode 100644 index 000000000..e75117720 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java @@ -0,0 +1,149 @@ +package io.kafbat.ui.service.app; + +import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties; +import io.kafbat.ui.service.rbac.AccessControlService; +import io.kafbat.ui.util.MultiFileWatcher; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import lombok.Cleanup; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.env.OriginTrackedMapPropertySource; +import org.springframework.boot.origin.Origin; +import org.springframework.boot.origin.OriginTrackedValue; +import org.springframework.boot.origin.TextResourceOrigin; +import org.springframework.context.ApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.PropertiesPropertySource; +import org.springframework.core.env.PropertySource; +import org.springframework.core.io.Resource; +import org.springframework.stereotype.Service; + +@Service +//@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true") +@RequiredArgsConstructor +@Slf4j +public class ConfigReloadService { + + private static final String THREAD_NAME = "config-watcher-thread"; + + private final ConfigurableEnvironment environment; + private final ApplicationContext appContext; + + private Thread watcherThread; + private MultiFileWatcher multiFileWatcher; + + private final ObjectProvider accessControlService; + private final ObjectProvider roleBasedAccessControlProperties; + + @PostConstruct + public void init() { + var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false) + .filter(OriginTrackedMapPropertySource.class::isInstance) + .map(OriginTrackedMapPropertySource.class::cast) + .flatMap(ps -> ps.getSource().values().stream()) + .map(v -> (v instanceof OriginTrackedValue otv) ? otv.getOrigin() : null) + .filter(Objects::nonNull) + .flatMap(o -> Stream.iterate(o, Objects::nonNull, Origin::getParent)) + .filter(TextResourceOrigin.class::isInstance) + .map(TextResourceOrigin.class::cast) + .map(TextResourceOrigin::getResource) + .filter(Objects::nonNull) + .filter(Resource::exists) + .filter(Resource::isReadable) + .filter(Resource::isFile) + .map(r -> { + try { + return r.getURI(); + } catch (IOException e) { + log.error("can't retrieve resource URL", e); + return null; + } + }) + .filter(Objects::nonNull) + .map(Paths::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + if (propertySourcePaths.isEmpty()) { + log.debug("No config files found, auto reload is disabled"); + return; + } + + log.debug("Auto reload is enabled, will watch for config changes"); + + try { + this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, path -> { + System.out.println(path); + var propertySources = environment.getPropertySources(); + + + + Properties properties = new Properties(); + try { + @Cleanup InputStream inputStream = Files.newInputStream(Paths.get("/tmp/kek.yaml")); + properties.load(inputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + PropertySource origin = + propertySources.stream().filter(ps -> ps.getName().contains("tmp/kek")).findFirst().get(); + environment.getPropertySources().replace(origin.getName(), new PropertiesPropertySource(origin.getName(), properties)); + + System.out.println(); + var kekw = appContext.getBean(AccessControlService.class); + return null; + }); + this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME); + this.watcherThread.start(); + } catch (IOException e) { + log.error("Error while registering watch service", e); + } + } + + @PreDestroy + public void shutdown() { + try { + if (multiFileWatcher != null) { + multiFileWatcher.close(); + } + } catch (IOException ignored) { + } + if (watcherThread != null) { + this.watcherThread.interrupt(); + } + } + + private void reload() { + var registry = (DefaultSingletonBeanRegistry) appContext.getAutowireCapableBeanFactory(); + + registry.destroySingleton("AccessControlService"); + + Binder.get(environment) + .bind("rbac", RoleBasedAccessControlProperties.class) + .orElseThrow(() -> new IllegalStateException("no rbac config")); + + var newProps = appContext.getBean(AccessControlService.class); + newProps.init(); +// accessControlService.init(); + System.out.println(); + + + } + + +} diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index 0686de2c4..dd24636a6 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -30,9 +30,7 @@ import org.springframework.http.codec.multipart.FilePart; import org.springframework.stereotype.Component; import org.yaml.snakeyaml.DumperOptions; -import org.yaml.snakeyaml.LoaderOptions; import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.introspector.BeanAccess; import org.yaml.snakeyaml.introspector.Property; import org.yaml.snakeyaml.introspector.PropertyUtils; @@ -67,7 +65,7 @@ public boolean dynamicConfigEnabled() { return "true".equalsIgnoreCase(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_ENABLED_ENV_PROPERTY)); } - private Path dynamicConfigFilePath() { + public Path dynamicConfigFilePath() { return Paths.get( Optional.ofNullable(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_PATH_ENV_PROPERTY)) .orElse(DYNAMIC_CONFIG_PATH_ENV_PROPERTY_DEFAULT) @@ -79,7 +77,7 @@ public Optional> loadDynamicPropertySource() { if (dynamicConfigEnabled()) { Path configPath = dynamicConfigFilePath(); if (!Files.exists(configPath) || !Files.isReadable(configPath)) { - log.warn("Dynamic config file {} doesnt exist or not readable", configPath); + log.warn("Dynamic config file {} doesnt exist or is not readable", configPath); return Optional.empty(); } var propertySource = new CompositePropertySource("dynamicProperties"); diff --git a/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java new file mode 100644 index 000000000..7d31023e9 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java @@ -0,0 +1,158 @@ +package io.kafbat.ui.util; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.util.Assert; + +@Slf4j +public final class MultiFileWatcher implements AutoCloseable { + + private static final long DEBOUNCE_MS = Duration.ofMillis(1000).toMillis(); + + private final WatchService watchService = FileSystems.getDefault().newWatchService(); + private final Set watchedFiles = ConcurrentHashMap.newKeySet(); + private final Map watchDirsByKey = new HashMap<>(); + private final Function reloader; + + private long lastTriggerAt = 0; + + public MultiFileWatcher(Collection filesToWatch, Function reloader) throws IOException { + Assert.notNull(reloader, "reloader must not be null"); + this.reloader = reloader; + + if (filesToWatch.isEmpty()) { + log.warn("No files to watch, aborting"); + } + + watchedFiles.addAll(filesToWatch.stream() + .map(p -> { + try { + return Files.exists(p) ? p.toRealPath() : p.toAbsolutePath().normalize(); + } catch (IOException e) { + return p.toAbsolutePath().normalize(); + } + }) + .toList()); + + if (watchedFiles.isEmpty()) { + log.warn("No files to watch resolved, aborting"); + return; + } + + log.debug("Going to watch {} files", watchedFiles.size()); + log.trace("Watching files: {}", watchedFiles.stream().map(Path::toString).toList()); + + watchedFiles.stream() +// .map(getParentPath()) + .distinct() + .forEach(file -> { + try { + var key = getParentPath().apply(file).register(watchService, + ENTRY_MODIFY, + ENTRY_CREATE, ENTRY_DELETE // watch these for atomic replacements + ); + watchDirsByKey.put(key, file); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(a -> getParentPath().apply(a)).map(Path::toString).toList()); + } + + public void watchLoop() { + while (true) { + try { + var key = watchService.take(); + var dir = getParentPath().apply(watchDirsByKey.get(key)); + if (dir == null) { + continue; + } + + var hit = key.pollEvents() + .stream() + .filter(ev -> ev.kind() != OVERFLOW) + .map(ev -> dir.resolve((Path) ev.context()).normalize().toAbsolutePath()) + .anyMatch(this::matchesTarget); + + if (hit && shouldTrigger()) { + var filePath = watchDirsByKey.get(key); // TODO + reloader.apply(filePath); + } + + if (!key.reset()) { + watchDirsByKey.remove(key); + } + if (watchDirsByKey.isEmpty()) { + break; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (ClosedWatchServiceException e) { + log.trace("Watch service closed, exiting watcher thread"); + break; + } catch (Exception e) { + log.error("Error while calling the reloader", e); + break; + } + } + } + + private boolean matchesTarget(Path changed) { + if (watchedFiles.contains(changed)) { + return true; + } + try { + return watchedFiles.contains(changed.toRealPath()); + } catch (IOException ignored) { + return false; + } + } + + private boolean shouldTrigger() { + var now = System.currentTimeMillis(); + + if (now - lastTriggerAt < DEBOUNCE_MS) { + return false; + } + + lastTriggerAt = now; + return true; + } + + @Override + public void close() throws IOException { + watchService.close(); + } + + @NotNull + private static Function getParentPath() { + return p -> Optional.ofNullable(p.getParent()).orElse(Path.of(".")); + } +} + diff --git a/api/src/main/resources/application-local.yml b/api/src/main/resources/application-local.yml index 0c40ff079..996291a80 100644 --- a/api/src/main/resources/application-local.yml +++ b/api/src/main/resources/application-local.yml @@ -22,7 +22,8 @@ spring: user-filter-search-filter: "(&(uid={0})(objectClass=inetOrgPerson))" group-filter-search-base: "ou=people,dc=planetexpress,dc=com" -kafka: + +kaf1ka: clusters: - name: local bootstrapServers: localhost:9092 @@ -81,7 +82,7 @@ auth: type: github rbac: - roles: + ro1les: - name: "memelords" clusters: - local diff --git a/frontend/build.gradle b/frontend/build.gradle index dc982ad73..3efb5331a 100644 --- a/frontend/build.gradle +++ b/frontend/build.gradle @@ -27,14 +27,3 @@ tasks.register('generateContract', PnpmTask) { outputs.dir(project.layout.projectDirectory.dir("src/generated-sources")) args = ['gen:sources'] } - -tasks.register('buildFrontend', PnpmTask) { - dependsOn generateContract - inputs.files(fileTree("src/")) - outputs.dir(project.layout.buildDirectory.dir("vite/static")) - args = ['build'] - environment = System.getenv() + [ - "VITE_TAG" : project.version, - "VITE_COMMIT": "git rev-parse --short HEAD".execute().text.trim() - ] -}