Skip to content

Commit 8fd6286

Browse files
committed
[FLINK-37504] Refactor + Unit test LocalFSWatchSingleton
1 parent 71313d4 commit 8fd6286

File tree

12 files changed

+762
-91
lines changed

12 files changed

+762
-91
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.core.security.watch;
20+
21+
import java.io.IOException;
22+
import java.nio.file.Path;
23+
24+
public interface LocalFSDirectoryWatcher {
25+
26+
void registerDirectory(Path[] dirsToWatch, LocalFSWatchServiceListener listener)
27+
throws IOException;
28+
}

flink-core/src/main/java/org/apache/flink/core/security/watch/LocalFSWatchService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.core.security.watch;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
22+
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
2325

@@ -26,6 +28,7 @@
2628
import java.nio.file.WatchKey;
2729
import java.nio.file.WatchService;
2830
import java.util.Map;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932

3033
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
3134
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
@@ -35,8 +38,11 @@
3538
public class LocalFSWatchService extends Thread {
3639
private static final Logger LOG = LoggerFactory.getLogger(LocalFSWatchService.class);
3740

41+
@VisibleForTesting AtomicBoolean running = new AtomicBoolean(false);
42+
3843
public void run() {
3944
try {
45+
running.set(true);
4046
while (true) {
4147
for (Map.Entry<WatchService, LocalFSWatchServiceListener> entry :
4248
LocalFSWatchSingleton.getInstance().watchers.entrySet()) {
@@ -47,8 +53,6 @@ public void run() {
4753
}
4854
LOG.debug("Watch key arrived");
4955
for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
50-
System.out.println(watchEvent.kind());
51-
System.out.println(watchEvent.context());
5256
if (watchEvent.kind() == OVERFLOW) {
5357
LOG.error("Filesystem events may have been lost or discarded");
5458
Thread.yield();
@@ -67,6 +71,7 @@ public void run() {
6771
}
6872
} catch (Exception e) {
6973
LOG.error("Filesystem watcher received exception and stopped: ", e);
74+
running.set(false);
7075
throw new RuntimeException(e);
7176
}
7277
}
Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1-
package org.apache.flink.core.security.watch;
2-
31
/*
4-
* Licensed to the Apache Software Foundation (ASF) under one or more
5-
* contributor license agreements. See the NOTICE file distributed with
6-
* this work for additional information regarding copyright ownership.
7-
* The ASF licenses this file to You under the Apache License, Version 2.0
8-
* (the "License"); you may not use this file except in compliance with
9-
* the License. You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
109
*
11-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1211
*
1312
* Unless required by applicable law or agreed to in writing, software
1413
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +16,79 @@
1716
* limitations under the License.
1817
*/
1918

19+
package org.apache.flink.core.security.watch;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
2024
import java.nio.file.Path;
25+
import java.util.concurrent.atomic.AtomicReference;
2126

2227
public interface LocalFSWatchServiceListener {
28+
29+
enum ReloadState {
30+
CLEAN, // Context is up to date
31+
DIRTY, // Context needs reloading
32+
RELOADING // Context is currently being reloaded
33+
}
34+
35+
@FunctionalInterface
36+
interface ContextLoader {
37+
38+
void loadContext() throws Exception;
39+
}
40+
41+
Logger LOG = LoggerFactory.getLogger(LocalFSWatchServiceListener.class);
42+
43+
/**
44+
* Get the current reload state. Implementations should provide their own state management.
45+
*
46+
* @return the current reload state
47+
*/
48+
AtomicReference<ReloadState> getReloadStateReference();
49+
2350
default void onWatchStarted(Path realDirectoryPath) {}
2451

2552
default void onFileOrDirectoryCreated(Path relativePath) {}
2653

2754
default void onFileOrDirectoryDeleted(Path relativePath) {}
2855

29-
default void onFileOrDirectoryModified(Path relativePath) {}
56+
default void onFileOrDirectoryModified(Path relativePath) {
57+
getReloadStateReference().compareAndSet(ReloadState.CLEAN, ReloadState.DIRTY);
58+
}
59+
60+
default boolean reloadContextIfNeeded(ContextLoader loader) {
61+
AtomicReference<ReloadState> reloadState = getReloadStateReference();
62+
// Only one thread can transition from DIRTY to RELOADING
63+
if (reloadState.compareAndSet(ReloadState.DIRTY, ReloadState.RELOADING)) {
64+
try {
65+
loader.loadContext();
66+
// Successfully loaded, mark as clean
67+
reloadState.set(ReloadState.CLEAN);
68+
return true;
69+
} catch (Exception e) {
70+
LOG.warn("Failed to reload context", e);
71+
// Failed to load, mark as dirty for retry
72+
reloadState.set(ReloadState.DIRTY);
73+
}
74+
}
75+
return false;
76+
// If state is CLEAN, do nothing
77+
// If state is RELOADING, another thread is handling it, so we can proceed with current
78+
// context
79+
}
80+
81+
/**
82+
* Abstract base class that provides a default implementation of LocalFSWatchServiceListener
83+
* with instance-level reload state management.
84+
*/
85+
abstract class AbstractLocalFSWatchServiceListener implements LocalFSWatchServiceListener {
86+
private final AtomicReference<ReloadState> reloadState =
87+
new AtomicReference<>(ReloadState.CLEAN);
88+
89+
@Override
90+
public final AtomicReference<ReloadState> getReloadStateReference() {
91+
return reloadState;
92+
}
93+
}
3094
}

flink-core/src/main/java/org/apache/flink/core/security/watch/LocalFSWatchSingleton.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,18 @@
2828
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
2929
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
3030

31-
public final class LocalFSWatchSingleton {
31+
public final class LocalFSWatchSingleton implements LocalFSDirectoryWatcher {
3232
// The field must be declared volatile so that double check lock would work
3333
// correctly.
34-
private static volatile LocalFSWatchSingleton instance;
34+
private static volatile LocalFSDirectoryWatcher instance;
3535

3636
ConcurrentHashMap<WatchService, LocalFSWatchServiceListener> watchers =
3737
new ConcurrentHashMap<>();
3838

3939
private LocalFSWatchSingleton() {}
4040

4141
public static LocalFSWatchSingleton getInstance() {
42-
LocalFSWatchSingleton result = instance;
42+
LocalFSDirectoryWatcher result = instance;
4343
if (result != null) {
4444
return result;
4545
}
@@ -51,15 +51,16 @@ public static LocalFSWatchSingleton getInstance() {
5151
}
5252
}
5353

54-
public void registerPath(Path[] pathsToWatch, LocalFSWatchServiceListener callback)
54+
@Override
55+
public void registerDirectory(Path[] dirsToWatch, LocalFSWatchServiceListener listener)
5556
throws IOException {
5657

5758
WatchService watcher = FileSystems.getDefault().newWatchService();
58-
for (Path pathToWatch : pathsToWatch) {
59+
for (Path pathToWatch : dirsToWatch) {
5960
Path realDirectoryPath = pathToWatch.toRealPath();
6061
realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
6162
}
62-
callback.onWatchStarted(pathsToWatch[0]);
63-
watchers.put(watcher, callback);
63+
listener.onWatchStarted(dirsToWatch[0]);
64+
watchers.put(watcher, listener);
6465
}
6566
}

0 commit comments

Comments
 (0)