Skip to content

BE: Implement app restart on dynamic config change #1151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 5 additions & 25 deletions .dev/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,6 @@ name: "kafbat-ui-dev"

services:

kafbat-ui:
container_name: kafbat-ui
image: ghcr.io/kafbat/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka0
- schema-registry0
- kafka-connect0
- ksqldb0
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.8.0
hostname: kafka0
Expand All @@ -48,8 +26,8 @@ services:
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_JMX_PORT: 9997
# KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
Expand Down Expand Up @@ -91,7 +69,9 @@ services:
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/filestream-connectors,/tmp/connectors"
volumes:
- /tmp/connectors:/tmp/connectors

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.8.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
public class RoleBasedAccessControlProperties {

private final List<Role> roles = new ArrayList<>();
// private String haha;


@PostConstruct
public void init() {
Expand Down
149 changes: 149 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package io.kafbat.ui.service.app;

import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties;
import io.kafbat.ui.service.rbac.AccessControlService;
import io.kafbat.ui.util.MultiFileWatcher;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.env.OriginTrackedMapPropertySource;
import org.springframework.boot.origin.Origin;
import org.springframework.boot.origin.OriginTrackedValue;
import org.springframework.boot.origin.TextResourceOrigin;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;

@Service
//@ConditionalOnProperty(value = "dynamic.config.autoreload", havingValue = "true")
@RequiredArgsConstructor
@Slf4j
public class ConfigReloadService {

private static final String THREAD_NAME = "config-watcher-thread";

private final ConfigurableEnvironment environment;
private final ApplicationContext appContext;

private Thread watcherThread;
private MultiFileWatcher multiFileWatcher;

private final ObjectProvider<AccessControlService> accessControlService;
private final ObjectProvider<RoleBasedAccessControlProperties> roleBasedAccessControlProperties;

@PostConstruct
public void init() {
var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false)
.filter(OriginTrackedMapPropertySource.class::isInstance)
.map(OriginTrackedMapPropertySource.class::cast)
.flatMap(ps -> ps.getSource().values().stream())
.map(v -> (v instanceof OriginTrackedValue otv) ? otv.getOrigin() : null)
.filter(Objects::nonNull)
.flatMap(o -> Stream.iterate(o, Objects::nonNull, Origin::getParent))
.filter(TextResourceOrigin.class::isInstance)
.map(TextResourceOrigin.class::cast)
.map(TextResourceOrigin::getResource)
.filter(Objects::nonNull)
.filter(Resource::exists)
.filter(Resource::isReadable)
.filter(Resource::isFile)
.map(r -> {
try {
return r.getURI();
} catch (IOException e) {
log.error("can't retrieve resource URL", e);
return null;
}
})
.filter(Objects::nonNull)
.map(Paths::get)
.collect(Collectors.toCollection(LinkedHashSet::new));

if (propertySourcePaths.isEmpty()) {
log.debug("No config files found, auto reload is disabled");
return;
}

log.debug("Auto reload is enabled, will watch for config changes");

try {
this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, path -> {
System.out.println(path);
var propertySources = environment.getPropertySources();



Properties properties = new Properties();
try {
@Cleanup InputStream inputStream = Files.newInputStream(Paths.get("/tmp/kek.yaml"));
properties.load(inputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}

PropertySource<?> origin =
propertySources.stream().filter(ps -> ps.getName().contains("tmp/kek")).findFirst().get();
environment.getPropertySources().replace(origin.getName(), new PropertiesPropertySource(origin.getName(), properties));

System.out.println();
var kekw = appContext.getBean(AccessControlService.class);
return null;
});
this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME);
this.watcherThread.start();
} catch (IOException e) {
log.error("Error while registering watch service", e);
}
}

@PreDestroy
public void shutdown() {
try {
if (multiFileWatcher != null) {
multiFileWatcher.close();
}
} catch (IOException ignored) {
}
if (watcherThread != null) {
this.watcherThread.interrupt();
}
}

private void reload() {
var registry = (DefaultSingletonBeanRegistry) appContext.getAutowireCapableBeanFactory();

registry.destroySingleton("AccessControlService");

Binder.get(environment)
.bind("rbac", RoleBasedAccessControlProperties.class)
.orElseThrow(() -> new IllegalStateException("no rbac config"));

var newProps = appContext.getBean(AccessControlService.class);
newProps.init();
// accessControlService.init();
System.out.println();


}


}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import org.yaml.snakeyaml.introspector.BeanAccess;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.introspector.PropertyUtils;
Expand Down Expand Up @@ -67,7 +65,7 @@ public boolean dynamicConfigEnabled() {
return "true".equalsIgnoreCase(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_ENABLED_ENV_PROPERTY));
}

private Path dynamicConfigFilePath() {
public Path dynamicConfigFilePath() {
return Paths.get(
Optional.ofNullable(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_PATH_ENV_PROPERTY))
.orElse(DYNAMIC_CONFIG_PATH_ENV_PROPERTY_DEFAULT)
Expand All @@ -79,7 +77,7 @@ public Optional<PropertySource<?>> loadDynamicPropertySource() {
if (dynamicConfigEnabled()) {
Path configPath = dynamicConfigFilePath();
if (!Files.exists(configPath) || !Files.isReadable(configPath)) {
log.warn("Dynamic config file {} doesnt exist or not readable", configPath);
log.warn("Dynamic config file {} doesnt exist or is not readable", configPath);
return Optional.empty();
}
var propertySource = new CompositePropertySource("dynamicProperties");
Expand Down
158 changes: 158 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package io.kafbat.ui.util;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.util.Assert;

@Slf4j
public final class MultiFileWatcher implements AutoCloseable {

private static final long DEBOUNCE_MS = Duration.ofMillis(1000).toMillis();

private final WatchService watchService = FileSystems.getDefault().newWatchService();
private final Set<Path> watchedFiles = ConcurrentHashMap.newKeySet();
private final Map<WatchKey, Path> watchDirsByKey = new HashMap<>();
private final Function<Path, Void> reloader;

private long lastTriggerAt = 0;

public MultiFileWatcher(Collection<Path> filesToWatch, Function<Path, Void> reloader) throws IOException {
Assert.notNull(reloader, "reloader must not be null");
this.reloader = reloader;

if (filesToWatch.isEmpty()) {
log.warn("No files to watch, aborting");
}

watchedFiles.addAll(filesToWatch.stream()
.map(p -> {
try {
return Files.exists(p) ? p.toRealPath() : p.toAbsolutePath().normalize();
} catch (IOException e) {
return p.toAbsolutePath().normalize();
}
})
.toList());

if (watchedFiles.isEmpty()) {
log.warn("No files to watch resolved, aborting");
return;
}

log.debug("Going to watch {} files", watchedFiles.size());
log.trace("Watching files: {}", watchedFiles.stream().map(Path::toString).toList());

watchedFiles.stream()
// .map(getParentPath())
.distinct()
.forEach(file -> {
try {
var key = getParentPath().apply(file).register(watchService,
ENTRY_MODIFY,
ENTRY_CREATE, ENTRY_DELETE // watch these for atomic replacements
);
watchDirsByKey.put(key, file);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(a -> getParentPath().apply(a)).map(Path::toString).toList());
}

public void watchLoop() {
while (true) {
try {
var key = watchService.take();
var dir = getParentPath().apply(watchDirsByKey.get(key));
if (dir == null) {
continue;
}

var hit = key.pollEvents()
.stream()
.filter(ev -> ev.kind() != OVERFLOW)
.map(ev -> dir.resolve((Path) ev.context()).normalize().toAbsolutePath())
.anyMatch(this::matchesTarget);

if (hit && shouldTrigger()) {
var filePath = watchDirsByKey.get(key); // TODO
reloader.apply(filePath);
}

if (!key.reset()) {
watchDirsByKey.remove(key);
}
if (watchDirsByKey.isEmpty()) {
break;
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (ClosedWatchServiceException e) {
log.trace("Watch service closed, exiting watcher thread");
break;
} catch (Exception e) {
log.error("Error while calling the reloader", e);
break;
}
}
}

private boolean matchesTarget(Path changed) {
if (watchedFiles.contains(changed)) {
return true;
}
try {
return watchedFiles.contains(changed.toRealPath());
} catch (IOException ignored) {
return false;
}
}

private boolean shouldTrigger() {
var now = System.currentTimeMillis();

if (now - lastTriggerAt < DEBOUNCE_MS) {
return false;
}

lastTriggerAt = now;
return true;
}

@Override
public void close() throws IOException {
watchService.close();
}

@NotNull
private static Function<Path, Path> getParentPath() {
return p -> Optional.ofNullable(p.getParent()).orElse(Path.of("."));
}
}

Loading
Loading