Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,15 @@ public static Configuration forProvider(Configuration configuration, String prov
+ "forcibly. (-1 = use system default)")
.withDeprecatedKeys("security.ssl.close-notify-flush-timeout");

/** Indicate if changes on keystore/truststore should leads to reload of the certificate. */
@Documentation.Section(Documentation.Sections.SECURITY_SSL)
public static final ConfigOption<Boolean> SSL_RELOAD =
key("security.ssl.reload")
.booleanType()
.defaultValue(false)
.withDescription(
"If enabled, the application will monitor the keystore and truststore files for any changes. When a change is detected, internal network components (like Netty, Pekko, or BlobServer) will automatically reload the keystore/truststore certificates.");

/**
* Checks whether SSL for internal communication (rpc, data transport, blob server) is enabled.
*/
Expand All @@ -635,4 +644,9 @@ public static boolean isRestSSLAuthenticationEnabled(Configuration sslConfig) {
checkNotNull(sslConfig, "sslConfig");
return isRestSSLEnabled(sslConfig) && sslConfig.get(SSL_REST_AUTHENTICATION_ENABLED);
}

/** Checks whether certificates must be reloaded in case of keytstore or trusttore changes. */
public static boolean isReloadCertificate(Configuration sslConfig) {
return sslConfig.get(SSL_RELOAD);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.security.watch;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Map;
import java.util.Set;

public interface LocalFSDirectoryWatcher {

Set<Map.Entry<WatchService, LocalFSWatchServiceListener>> getWatchers();

void registerDirectory(Path[] dirsToWatch, LocalFSWatchServiceListener listener)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.security.watch;

import org.apache.flink.annotation.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

public class LocalFSWatchService extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(LocalFSWatchService.class);

@VisibleForTesting AtomicBoolean running = new AtomicBoolean(false);

public void run() {
try {
running.set(true);
while (true) {
for (Map.Entry<WatchService, LocalFSWatchServiceListener> entry :
LocalFSWatchSingleton.getInstance().getWatchers()) {
LOG.debug("Taking watch key");
WatchKey watchKey = entry.getKey().poll();
if (watchKey == null) {
continue;
}
LOG.debug("Watch key arrived");
for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
if (watchEvent.kind() == OVERFLOW) {
LOG.error("Filesystem events may have been lost or discarded");
Thread.yield();
} else if (watchEvent.kind() == ENTRY_CREATE) {
entry.getValue().onFileOrDirectoryCreated((Path) watchEvent.context());
} else if (watchEvent.kind() == ENTRY_DELETE) {
entry.getValue().onFileOrDirectoryDeleted((Path) watchEvent.context());
} else if (watchEvent.kind() == ENTRY_MODIFY) {
entry.getValue().onFileOrDirectoryModified((Path) watchEvent.context());
} else {
LOG.warn("Unhandled watch event {}", watchEvent.kind());
}
}
watchKey.reset();
}
}
} catch (Exception e) {
LOG.error("Filesystem watcher received exception and stopped: ", e);
running.set(false);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.security.watch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

public interface LocalFSWatchServiceListener {

enum ReloadState {
CLEAN, // Context is up to date
DIRTY, // Context needs reloading
RELOADING // Context is currently being reloaded
}

@FunctionalInterface
interface ContextLoader {

void loadContext() throws Exception;
}

Logger LOG = LoggerFactory.getLogger(LocalFSWatchServiceListener.class);

/**
* Get the current reload state. Implementations should provide their own state management.
*
* @return the current reload state
*/
AtomicReference<ReloadState> getReloadStateReference();

default void onWatchStarted(Path realDirectoryPath) {}

default void onFileOrDirectoryCreated(Path relativePath) {}

default void onFileOrDirectoryDeleted(Path relativePath) {}

default void onFileOrDirectoryModified(Path relativePath) {
getReloadStateReference().compareAndSet(ReloadState.CLEAN, ReloadState.DIRTY);
}

default boolean reloadContextIfNeeded(ContextLoader loader) {
AtomicReference<ReloadState> reloadState = getReloadStateReference();
// Only one thread can transition from DIRTY to RELOADING
if (reloadState.compareAndSet(ReloadState.DIRTY, ReloadState.RELOADING)) {
try {
loader.loadContext();
// Successfully loaded, mark as clean
reloadState.set(ReloadState.CLEAN);
return true;
} catch (Exception e) {
LOG.warn("Failed to reload context", e);
// Failed to load, mark as dirty for retry
reloadState.set(ReloadState.DIRTY);
}
}
return false;
// If state is CLEAN, do nothing
// If state is RELOADING, another thread is handling it, so we can proceed with current
// context
}

/**
* Abstract base class that provides a default implementation of LocalFSWatchServiceListener
* with instance-level reload state management.
*/
abstract class AbstractLocalFSWatchServiceListener implements LocalFSWatchServiceListener {
private final AtomicReference<ReloadState> reloadState =
new AtomicReference<>(ReloadState.CLEAN);

@Override
public final AtomicReference<ReloadState> getReloadStateReference() {
return reloadState;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.security.watch;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

public final class LocalFSWatchSingleton implements LocalFSDirectoryWatcher {
// The field must be declared volatile so that double check lock would work
// correctly.
private static volatile LocalFSDirectoryWatcher instance;

ConcurrentHashMap<WatchService, LocalFSWatchServiceListener> watchers =
new ConcurrentHashMap<>();

private LocalFSWatchSingleton() {}

public static LocalFSDirectoryWatcher getInstance() {
LocalFSDirectoryWatcher result = instance;
if (result != null) {
return result;
}
synchronized (LocalFSWatchSingleton.class) {
if (instance == null) {
instance = new LocalFSWatchSingleton();
}
return instance;
}
}

public Set<Map.Entry<WatchService, LocalFSWatchServiceListener>> getWatchers() {
return watchers.entrySet();
}

@Override
public void registerDirectory(Path[] dirsToWatch, LocalFSWatchServiceListener listener)
throws IOException {

WatchService watcher = FileSystems.getDefault().newWatchService();
for (Path pathToWatch : dirsToWatch) {
Path realDirectoryPath = pathToWatch.toRealPath();
realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}
listener.onWatchStarted(dirsToWatch[0]);
watchers.put(watcher, listener);
}
}
Loading