From c7027942767e0ca8768a1ac873ffbb5f1a04146d Mon Sep 17 00:00:00 2001
From: Devesh Kumar Singh
Date: Fri, 20 Mar 2026 18:30:04 +0530
Subject: [PATCH 1/5] HDDS-14871. DataNode: tolerate per-volume health-check latch timeouts before marking volumes failed.
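
A volume whose async health check does not report back before the
checkAllVolumes() latch expires was previously failed on the spot, even
though a single missed deadline is often transient (kernel I/O scheduler
saturation, a JVM GC burst). This patch counts consecutive latch timeouts
per volume and only fails the volume once the configured tolerance is
exceeded. As a minimal standalone sketch of the decision rule (invented
class and method names, assuming the new default tolerance of 1; not the
production code):

    import java.util.concurrent.atomic.AtomicInteger;

    class ToleranceSketch {
      private final AtomicInteger consecutiveTimeouts = new AtomicInteger();
      private final int tolerance = 1; // hdds.datanode.disk.check.timeout.tolerated

      /** Returns true when the volume should be marked FAILED. */
      boolean onTimeout() {
        return consecutiveTimeouts.incrementAndGet() > tolerance;
      }

      /** A healthy check breaks the streak. */
      void onHealthyCheck() {
        consecutiveTimeouts.set(0);
      }

      public static void main(String[] args) {
        ToleranceSketch v = new ToleranceSketch();
        System.out.println(v.onTimeout());  // false: first timeout tolerated
        v.onHealthyCheck();                 // healthy round resets the streak
        System.out.println(v.onTimeout());  // false: tolerated again
        System.out.println(v.onTimeout());  // true: second consecutive timeout
      }
    }
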
---
 .../src/main/resources/ozone-default.xml      | 20 +++++
 .../statemachine/DatanodeConfiguration.java   | 35 +++++++++
 .../common/volume/StorageVolume.java          | 60 ++++++++++++++
 .../common/volume/StorageVolumeChecker.java   | 44 ++++++++---
 .../volume/TestStorageVolumeHealthChecks.java | 78 +++++++++++++++++++
 5 files changed, 227 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index dc52800642ae..aea424404d93 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4972,4 +4972,24 @@
     <value>5m</value>
     <description>Interval for cleaning up orphan snapshot local data versions corresponding to snapshots</description>
   </property>
+  <property>
+    <name>hdds.datanode.disk.check.timeout.tolerated</name>
+    <value>1</value>
+    <description>
+      Number of consecutive checkAllVolumes latch timeouts that a single
+      DataNode volume may accumulate before it is marked as FAILED.
+
+      A latch timeout occurs when the volume's async health check does not
+      complete within hdds.datanode.disk.check.timeout (default 10 minutes).
+      Common transient causes include kernel I/O scheduler saturation or JVM
+      GC pressure, neither of which indicates real disk failure.
+
+      With the default value of 1: the first timeout per volume is tolerated
+      (a WARN is logged); the volume is only marked FAILED if a second
+      consecutive timeout occurs without an intervening successful check.
+
+      Minimum valid value is 1.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 41f6d36971ff..e166fb5c9ec6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -61,6 +61,8 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
   public static final String FAILED_DB_VOLUMES_TOLERATED_KEY = "hdds.datanode.failed.db.volumes.tolerated";
   public static final String DISK_CHECK_MIN_GAP_KEY = "hdds.datanode.disk.check.min.gap";
   public static final String DISK_CHECK_TIMEOUT_KEY = "hdds.datanode.disk.check.timeout";
+  public static final String DISK_CHECK_TIMEOUT_TOLERATED_KEY =
+      "hdds.datanode.disk.check.timeout.tolerated";
 
   // Minimum space should be left on volume.
   // Ex: If volume has 1000GB and minFreeSpace is configured as 10GB,
@@ -99,6 +101,12 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
 
   static final Duration DISK_CHECK_TIMEOUT_DEFAULT =
       Duration.ofMinutes(10);
 
+  /**
+   * Default number of consecutive latch timeouts tolerated per volume before
+   * it is marked as failed. The minimum accepted value is 1, so the first
+   * timeout per volume is always tolerated.
+   */
+  public static final int DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT = 1;
+
   static final boolean CONTAINER_SCHEMA_V3_ENABLED_DEFAULT = true;
   static final long ROCKSDB_LOG_MAX_FILE_SIZE_BYTES_DEFAULT = 32 * 1024 * 1024;
   static final int ROCKSDB_LOG_MAX_FILE_NUM_DEFAULT = 64;
@@ -404,6 +412,18 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
   )
   private Duration diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
 
+  @Config(key = "hdds.datanode.disk.check.timeout.tolerated",
+      defaultValue = "1",
+      type = ConfigType.INT,
+      tags = { DATANODE },
+      description = "Number of consecutive checkAllVolumes latch timeouts"
+          + " that a single volume may accumulate before it is marked as"
+          + " FAILED. A timeout occurs when the volume's async health check"
+          + " does not complete within hdds.datanode.disk.check.timeout."
+          + " Minimum value is 1."
+  )
+  private int diskCheckTimeoutTolerated = DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT;
+
   @Config(key = "hdds.datanode.chunk.data.validation.check",
       defaultValue = "false",
       type = ConfigType.BOOLEAN,
@@ -688,6 +708,13 @@ public void validate() {
       diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
     }
 
+    if (diskCheckTimeoutTolerated < 1) {
+      LOG.warn("{} must be >= 1 but was set to {}. Defaulting to {}",
+          DISK_CHECK_TIMEOUT_TOLERATED_KEY, diskCheckTimeoutTolerated,
+          DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT);
+      diskCheckTimeoutTolerated = DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT;
+    }
+
     if (blockDeleteCommandWorkerInterval.isNegative()) {
       LOG.warn(BLOCK_DELETE_COMMAND_WORKER_INTERVAL +
           " must be greater than zero and was set to {}. Defaulting to {}",
@@ -907,6 +934,14 @@ public void setDiskCheckTimeout(Duration duration) {
     diskCheckTimeout = duration;
   }
 
+  public int getDiskCheckTimeoutTolerated() {
+    return diskCheckTimeoutTolerated;
+  }
+
+  public void setDiskCheckTimeoutTolerated(int tolerance) {
+    this.diskCheckTimeoutTolerated = tolerance;
+  }
+
   public int getBlockDeleteThreads() {
     return blockDeleteThreads;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index b39468318311..efc20b987162 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -113,6 +113,16 @@ public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckResult> {
   private final Queue<Boolean> ioTestSlidingWindow;
   private int healthCheckFileSize;
 
+  /*
+  Fields used to implement latch-timeout tolerance.
+  When checkAllVolumes() times out and this volume has not yet reported a
+  result, consecutiveTimeoutCount is incremented. The volume is only marked
+  FAILED by a timeout when consecutiveTimeoutCount > timeoutTolerance.
+  The counter is reset to 0 each time the volume completes a healthy check.
+  */
+  private final int timeoutTolerance;
+  private final AtomicInteger consecutiveTimeoutCount;
+
   /**
    * Type for StorageVolume.
   */
@@ -164,6 +174,8 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioTestSlidingWindow = new LinkedList<>();
       this.currentIOFailureCount = new AtomicInteger(0);
       this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize();
+      this.timeoutTolerance = dnConf.getDiskCheckTimeoutTolerated();
+      this.consecutiveTimeoutCount = new AtomicInteger(0);
     } else {
       storageDir = new File(b.volumeRootStr);
       volumeUsage = null;
@@ -174,6 +186,8 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioFailureTolerance = 0;
       this.conf = null;
       this.dnConf = null;
+      this.timeoutTolerance = 0;
+      this.consecutiveTimeoutCount = new AtomicInteger(0);
     }
     this.storageDirStr = storageDir.getAbsolutePath();
   }
@@ -740,6 +754,52 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused)
     return VolumeCheckResult.HEALTHY;
   }
 
+  /**
+   * Called by {@link StorageVolumeChecker} when {@code checkAllVolumes()}
+   * latch times out and this volume has not yet reported a result.
+   *
+   * <p>Increments the consecutive latch-timeout counter. Returns {@code true}
+   * if the counter now exceeds the configured tolerance, meaning the volume
+   * should be added to the failed set. Returns {@code false} if the timeout
+   * is still within tolerance and the volume should be spared this round.
+   *
+   * @return true if the volume should be considered FAILED; false otherwise.
+   */
+  public synchronized boolean recordCheckTimeout() {
+    int count = consecutiveTimeoutCount.incrementAndGet();
+    if (count > timeoutTolerance) {
+      LOG.error("Volume {} has exceeded consecutive latch-timeout tolerance"
+          + " (count: {} > tolerance: {}). Marking FAILED.",
+          this, count, timeoutTolerance);
+      return true;
+    }
+    LOG.warn("Volume {} health check timed out (consecutive latch timeouts:"
+        + " {} / tolerance: {}). Disk I/O may be transiently saturated"
+        + " or a JVM GC burst may have contributed."
+        + " Volume will be failed if the next check also times out.",
+        this, count, timeoutTolerance);
+    return false;
+  }
+
+  /**
+   * Resets the consecutive latch-timeout counter to 0.
+   *
+   * <p>Called by {@link StorageVolumeChecker} when a volume completes a
+   * healthy check round, indicating the transient condition has resolved.
+   */
+  public synchronized void resetTimeoutCount() {
+    int prev = consecutiveTimeoutCount.getAndSet(0);
+    if (prev > 0 && LOG.isDebugEnabled()) {
+      LOG.debug("Volume {} completed healthy check. Consecutive"
+          + " latch-timeout counter reset from {} to 0.", this, prev);
+    }
+  }
+
+  @VisibleForTesting
+  public int getConsecutiveTimeoutCount() {
+    return consecutiveTimeoutCount.get();
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(storageDir);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
index b48b0dac1180..f83277a64d8a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
@@ -246,18 +246,42 @@ public Set<StorageVolume> checkAllVolumes(
 
     // Wait until our timeout elapses, after which we give up on
     // the remaining volumes.
-    if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
-      LOG.warn("checkAllVolumes timed out after {} ms",
-          maxAllowedTimeForCheckMs);
-    }
+    boolean completedOnTime =
+        latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS);
 
     synchronized (this) {
-      // All volumes that have not been detected as healthy should be
-      // considered failed. This is a superset of 'failedVolumes'.
-      //
-      // Make a copy under the mutex as Sets.difference() returns a view
-      // of a potentially changing set.
-      return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
+      if (!completedOnTime) {
+        LOG.warn("checkAllVolumes timed out after {} ms."
+            + " Evaluating per-volume latch-timeout tolerance.",
+            maxAllowedTimeForCheckMs);
+      }
+
+      // Volumes that explicitly reported FAILED via check() are always
+      // returned — the IO-failure sliding window in StorageVolume.check()
+      // already applied its own tolerance.
+      final Set<StorageVolume> result = new HashSet<>(failedVolumes);
+
+      // Volumes that completed as healthy: reset their timeout counter so a
+      // single transient timeout does not combine with an unrelated future one.
+      for (StorageVolume v : healthyVolumes) {
+        v.resetTimeoutCount();
+      }
+
+      // Volumes still pending (neither healthy nor explicitly failed):
+      // these timed out. Apply per-volume consecutive-timeout tolerance.
+      final Set<StorageVolume> pendingVolumes =
+          new HashSet<>(Sets.difference(allVolumes,
+              Sets.union(healthyVolumes, failedVolumes)));
+
+      for (StorageVolume v : pendingVolumes) {
+        if (v.recordCheckTimeout()) {
+          // Tolerance exceeded — mark as failed.
+          result.add(v);
+        }
+        // else: within tolerance this round — omit from failed set.
+ } + + return result; } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java index 1c9b8bec8c8f..bc5683dbd418 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; import java.nio.file.Path; @@ -341,6 +343,82 @@ public void testCorrectDirectoryChecked(StorageVolume.Builder builder) volume.check(false); } + /** + * With the default tolerance of 1, the first simulated latch timeout must + * NOT increment the volume's counter beyond tolerance (counter = 1 which + * is not > 1), so {@code recordCheckTimeout()} returns false. + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testFirstTimeoutIsTolerated(StorageVolume.Builder builder) + throws Exception { + DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class); + dnConf.setDiskCheckTimeoutTolerated(1); + CONF.setFromObject(dnConf); + builder.conf(CONF); + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + assertEquals(0, volume.getConsecutiveTimeoutCount()); + // First simulated latch timeout: tolerance not exceeded. + assertFalse(volume.recordCheckTimeout(), + "First timeout should be tolerated"); + assertEquals(1, volume.getConsecutiveTimeoutCount()); + } + + /** + * With tolerance = 1, the second consecutive latch timeout must cause + * {@code recordCheckTimeout()} to return true (counter = 2 > 1). + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testSecondConsecutiveTimeoutFails(StorageVolume.Builder builder) + throws Exception { + DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class); + dnConf.setDiskCheckTimeoutTolerated(1); + CONF.setFromObject(dnConf); + builder.conf(CONF); + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + assertFalse(volume.recordCheckTimeout(), "First timeout should be tolerated"); + assertTrue(volume.recordCheckTimeout(), + "Second consecutive timeout should exceed tolerance and return true"); + assertEquals(2, volume.getConsecutiveTimeoutCount()); + } + + /** + * A successful healthy check resets the timeout counter so a subsequent + * single timeout is tolerated again (not combined with the earlier one). + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testSuccessfulCheckResetsTimeoutCounter(StorageVolume.Builder builder) + throws Exception { + DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class); + dnConf.setDiskCheckTimeoutTolerated(1); + CONF.setFromObject(dnConf); + builder.conf(CONF); + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + // Simulate one tolerated timeout. 
+    assertFalse(volume.recordCheckTimeout(), "First timeout should be tolerated");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
+
+    // Simulate a successful check round — counter must reset.
+    volume.resetTimeoutCount();
+    assertEquals(0, volume.getConsecutiveTimeoutCount());
+
+    // A new single timeout after reset is tolerated again.
+    assertFalse(volume.recordCheckTimeout(),
+        "Timeout after successful reset should be tolerated again");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
+  }
+
   /**
    * Asserts that the disk checks are being done on the correct directory for
    * each volume type.

From d91cf3df4f251c7e1d7508964da2b02684c9c0ba Mon Sep 17 00:00:00 2001
From: Devesh Kumar Singh
Date: Mon, 23 Mar 2026 10:05:56 +0530
Subject: [PATCH 2/5] HDDS-14871. Fixed test case failure.

---
 .../org/apache/hadoop/ozone/TestOzoneConfigurationFields.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 98ccd8fac8be..c82a825f28da 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -59,6 +59,9 @@ public void initializeMemberVariables() {
     xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
     xmlPropsToSkipCompare.add("ozone.om.leader.election.minimum.timeout" +
         ".duration"); // Deprecated config
+    // DatanodeConfiguration is not in configurationClasses; skip its XML entries
+    xmlPropsToSkipCompare.add(
+        DatanodeConfiguration.DISK_CHECK_TIMEOUT_TOLERATED_KEY);
     // Currently replication and type configs moved to server side.
     configurationPropsToSkipCompare
         .add(OzoneConfigKeys.OZONE_REPLICATION);

From de0a0279406525d21a903c2560e19717b79e0e9d Mon Sep 17 00:00:00 2001
From: Devesh Kumar Singh
Date: Tue, 24 Mar 2026 18:20:47 +0530
Subject: [PATCH 3/5] HDDS-14871. Fixed review comments.
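
Review feedback: rather than adding a new configuration key for latch
timeouts, treat a timed-out check as one more entry in the IO-test sliding
window that StorageVolume already keeps. Genuine read/write failures and
timeouts then share the single hdds.datanode.disk.check.io.failures.tolerated
threshold, and recovery happens by eviction instead of an explicit reset.
A minimal standalone sketch of that window bookkeeping (invented names,
assuming the existing defaults ioTestCount=3 and ioFailureTolerance=1; not
the production implementation):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class WindowSketch {
      private final Deque<Boolean> window = new ArrayDeque<>();
      private static final int WINDOW_SIZE = 3;       // ioTestCount
      private static final int FAILURE_TOLERANCE = 1; // ioFailureTolerance
      private int failures;

      /** Records one result; true means the volume should be failed. */
      boolean advance(boolean passed) {
        window.addLast(passed);
        if (!passed) {
          failures++;
        }
        if (window.size() > WINDOW_SIZE && !window.pollFirst()) {
          failures--; // an old failure just slid out of the window
        }
        return failures > FAILURE_TOLERANCE;
      }

      public static void main(String[] args) {
        WindowSketch v = new WindowSketch();
        System.out.println(v.advance(false)); // timeout: [F], tolerated
        v.advance(true);                      // [F,T]
        v.advance(true);                      // [F,T,T]
        v.advance(true);                      // [T,T,T], the F is evicted
        System.out.println(v.advance(false)); // [T,T,F]: tolerated again
        System.out.println(v.advance(false)); // [T,F,F]: 2 > 1, fail
      }
    }
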
---
 .../src/main/resources/ozone-default.xml      |  20 ----
 .../statemachine/DatanodeConfiguration.java   |  36 ------
 .../common/volume/StorageVolume.java          | 111 ++++++++----------
 .../common/volume/StorageVolumeChecker.java   |  33 ++++--
 .../volume/TestStorageVolumeHealthChecks.java |  85 ++++++++------
 .../ozone/TestOzoneConfigurationFields.java   |   3 -
 6 files changed, 118 insertions(+), 170 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index aea424404d93..dc52800642ae 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4972,24 +4972,4 @@
     <value>5m</value>
     <description>Interval for cleaning up orphan snapshot local data versions corresponding to snapshots</description>
   </property>
-  <property>
-    <name>hdds.datanode.disk.check.timeout.tolerated</name>
-    <value>1</value>
-    <description>
-      Number of consecutive checkAllVolumes latch timeouts that a single
-      DataNode volume may accumulate before it is marked as FAILED.
-
-      A latch timeout occurs when the volume's async health check does not
-      complete within hdds.datanode.disk.check.timeout (default 10 minutes).
-      Common transient causes include kernel I/O scheduler saturation or JVM
-      GC pressure, neither of which indicates real disk failure.
-
-      With the default value of 1: the first timeout per volume is tolerated
-      (a WARN is logged); the volume is only marked FAILED if a second
-      consecutive timeout occurs without an intervening successful check.
-
-      Minimum valid value is 1.
-    </description>
-  </property>
 </configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index e166fb5c9ec6..482dcb6bff5d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -61,9 +61,6 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
   public static final String FAILED_DB_VOLUMES_TOLERATED_KEY = "hdds.datanode.failed.db.volumes.tolerated";
   public static final String DISK_CHECK_MIN_GAP_KEY = "hdds.datanode.disk.check.min.gap";
   public static final String DISK_CHECK_TIMEOUT_KEY = "hdds.datanode.disk.check.timeout";
-  public static final String DISK_CHECK_TIMEOUT_TOLERATED_KEY =
-      "hdds.datanode.disk.check.timeout.tolerated";
-
   // Minimum space should be left on volume.
   // Ex: If volume has 1000GB and minFreeSpace is configured as 10GB,
   // In this case when availableSpace is 10GB or below, volume is assumed as full
@@ -101,12 +98,6 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
 
   static final Duration DISK_CHECK_TIMEOUT_DEFAULT =
       Duration.ofMinutes(10);
 
-  /**
-   * Default number of consecutive latch timeouts tolerated per volume before
-   * it is marked as failed. The minimum accepted value is 1, so the first
-   * timeout per volume is always tolerated.
-   */
-  public static final int DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT = 1;
-
   static final boolean CONTAINER_SCHEMA_V3_ENABLED_DEFAULT = true;
   static final long ROCKSDB_LOG_MAX_FILE_SIZE_BYTES_DEFAULT = 32 * 1024 * 1024;
   static final int ROCKSDB_LOG_MAX_FILE_NUM_DEFAULT = 64;
@@ -412,18 +403,6 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
   )
   private Duration diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
 
-  @Config(key = "hdds.datanode.disk.check.timeout.tolerated",
-      defaultValue = "1",
-      type = ConfigType.INT,
-      tags = { DATANODE },
-      description = "Number of consecutive checkAllVolumes latch timeouts"
-          + " that a single volume may accumulate before it is marked as"
-          + " FAILED. A timeout occurs when the volume's async health check"
-          + " does not complete within hdds.datanode.disk.check.timeout."
-          + " Minimum value is 1."
-  )
-  private int diskCheckTimeoutTolerated = DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT;
-
   @Config(key = "hdds.datanode.chunk.data.validation.check",
       defaultValue = "false",
       type = ConfigType.BOOLEAN,
@@ -708,13 +687,6 @@ public void validate() {
       diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT;
     }
 
-    if (diskCheckTimeoutTolerated < 1) {
-      LOG.warn("{} must be >= 1 but was set to {}. Defaulting to {}",
-          DISK_CHECK_TIMEOUT_TOLERATED_KEY, diskCheckTimeoutTolerated,
-          DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT);
-      diskCheckTimeoutTolerated = DISK_CHECK_TIMEOUT_TOLERATED_DEFAULT;
-    }
-
     if (blockDeleteCommandWorkerInterval.isNegative()) {
       LOG.warn(BLOCK_DELETE_COMMAND_WORKER_INTERVAL +
           " must be greater than zero and was set to {}. Defaulting to {}",
@@ -934,14 +906,6 @@ public void setDiskCheckTimeout(Duration duration) {
     diskCheckTimeout = duration;
   }
 
-  public int getDiskCheckTimeoutTolerated() {
-    return diskCheckTimeoutTolerated;
-  }
-
-  public void setDiskCheckTimeoutTolerated(int tolerance) {
-    this.diskCheckTimeoutTolerated = tolerance;
-  }
-
   public int getBlockDeleteThreads() {
     return blockDeleteThreads;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index efc20b987162..728f5b64ccfd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -113,16 +113,6 @@ public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckResult> {
   private final Queue<Boolean> ioTestSlidingWindow;
   private int healthCheckFileSize;
 
-  /*
-  Fields used to implement latch-timeout tolerance.
-  When checkAllVolumes() times out and this volume has not yet reported a
-  result, consecutiveTimeoutCount is incremented. The volume is only marked
-  FAILED by a timeout when consecutiveTimeoutCount > timeoutTolerance.
-  The counter is reset to 0 each time the volume completes a healthy check.
-  */
-  private final int timeoutTolerance;
-  private final AtomicInteger consecutiveTimeoutCount;
-
   /**
    * Type for StorageVolume.
    */
@@ -174,8 +164,6 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioTestSlidingWindow = new LinkedList<>();
       this.currentIOFailureCount = new AtomicInteger(0);
       this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize();
-      this.timeoutTolerance = dnConf.getDiskCheckTimeoutTolerated();
-      this.consecutiveTimeoutCount = new AtomicInteger(0);
     } else {
       storageDir = new File(b.volumeRootStr);
       volumeUsage = null;
@@ -186,8 +174,6 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioFailureTolerance = 0;
       this.conf = null;
       this.dnConf = null;
-      this.timeoutTolerance = 0;
-      this.consecutiveTimeoutCount = new AtomicInteger(0);
     }
     this.storageDirStr = storageDir.getAbsolutePath();
   }
@@ -722,23 +708,11 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused)
       return VolumeCheckResult.HEALTHY;
     }
 
-    // Move the sliding window of IO test results forward 1 by adding the
-    // latest entry and removing the oldest entry from the window.
-    // Update the failure counter for the new window.
-    ioTestSlidingWindow.add(diskChecksPassed);
-    if (!diskChecksPassed) {
-      currentIOFailureCount.incrementAndGet();
-    }
-    if (ioTestSlidingWindow.size() > ioTestCount &&
-        Objects.equals(ioTestSlidingWindow.poll(), Boolean.FALSE)) {
-      currentIOFailureCount.decrementAndGet();
-    }
-
-    // If the failure threshold has been crossed, fail the volume without
-    // further scans.
-    // Once the volume is failed, it will not be checked anymore.
-    // The failure counts can be left as is.
-    if (currentIOFailureCount.get() > ioFailureTolerance) {
+    // Move the sliding window of IO test results forward 1 and check threshold.
+    if (advanceIOWindow(diskChecksPassed)) {
+      // If the failure threshold has been crossed, fail the volume without
+      // further scans. Once the volume is failed, it will not be checked
+      // anymore. The failure counts can be left as is.
       LOG.error("Failed IO test for volume {}: the last {} runs "
               + "encountered {} out of {} tolerated failures.",
           this, ioTestSlidingWindow.size(), currentIOFailureCount,
           ioFailureTolerance);
@@ -755,49 +729,62 @@
   }
 
   /**
-   * Called by {@link StorageVolumeChecker} when {@code checkAllVolumes()}
-   * latch times out and this volume has not yet reported a result.
+   * Called by {@link StorageVolumeChecker} when a volume check times out —
+   * either because the global {@code checkAllVolumes()} latch expired before
+   * this volume's async check completed, or because the per-check timeout
+   * inside {@link ThrottledAsyncChecker} fired.
    *
-   * <p>Increments the consecutive latch-timeout counter. Returns {@code true}
-   * if the counter now exceeds the configured tolerance, meaning the volume
-   * should be added to the failed set. Returns {@code false} if the timeout
-   * is still within tolerance and the volume should be spared this round.
+   * <p>Records a synthetic IO-test failure in the existing sliding window,
+   * making latch timeouts subject to the same {@code ioFailureTolerance}
+   * threshold as genuine read/write failures. No separate configuration key
+   * is required: the existing
+   * {@code hdds.datanode.disk.check.io.failures.tolerated} governs both.
    *
-   * @return true if the volume should be considered FAILED; false otherwise.
+   * <p>Recovery is automatic: each successful {@link #check} call records a
+   * {@code true} entry in the window, gradually evicting the synthetic
+   * failure once {@code ioTestCount} healthy results have accumulated.
+   *
+   * @return {@code true} if {@code currentIOFailureCount > ioFailureTolerance},
+   *         meaning the volume should now be marked FAILED; {@code false} if
+   *         the failure is still within tolerance this round.
    */
-  public synchronized boolean recordCheckTimeout() {
-    int count = consecutiveTimeoutCount.incrementAndGet();
-    if (count > timeoutTolerance) {
-      LOG.error("Volume {} has exceeded consecutive latch-timeout tolerance"
-          + " (count: {} > tolerance: {}). Marking FAILED.",
-          this, count, timeoutTolerance);
+  public synchronized boolean recordTimeoutAsIOFailure() {
+    if (advanceIOWindow(false)) {
+      LOG.error("Volume {} check timed out: IO-failure count ({}) exceeds"
+          + " tolerance ({}). Marking FAILED.",
+          this, currentIOFailureCount, ioFailureTolerance);
       return true;
     }
-    LOG.warn("Volume {} health check timed out (consecutive latch timeouts:"
-        + " {} / tolerance: {}). Disk I/O may be transiently saturated"
-        + " or a JVM GC burst may have contributed."
-        + " Volume will be failed if the next check also times out.",
-        this, count, timeoutTolerance);
+    LOG.warn("Volume {} check timed out. IO-failure count: {} / tolerance: {}."
+        + " Volume will not be failed until tolerance is exceeded."
+        + " Common transient causes: kernel I/O scheduler saturation"
+        + " or JVM GC pressure.",
+        this, currentIOFailureCount, ioFailureTolerance);
     return false;
   }
 
   /**
-   * Resets the consecutive latch-timeout counter to 0.
+   * Advances the IO-test sliding window by one entry and updates the rolling
+   * failure counter.
+   *
+   * <p>Called by both {@link #check} (genuine IO test result) and
+   * {@link #recordTimeoutAsIOFailure} (synthetic failure for a check timeout),
+   * keeping the window-update logic in a single place.
    *
-   * <p>Called by {@link StorageVolumeChecker} when a volume completes a
-   * healthy check round, indicating the transient condition has resolved.
+   * @param passed {@code true} if the IO test passed; {@code false} otherwise.
+   * @return {@code true} if {@code currentIOFailureCount} now exceeds
+   *         {@code ioFailureTolerance}; {@code false} if still within bounds.
    */
-  public synchronized void resetTimeoutCount() {
-    int prev = consecutiveTimeoutCount.getAndSet(0);
-    if (prev > 0 && LOG.isDebugEnabled()) {
-      LOG.debug("Volume {} completed healthy check. Consecutive"
-          + " latch-timeout counter reset from {} to 0.", this, prev);
+  private boolean advanceIOWindow(boolean passed) {
+    ioTestSlidingWindow.add(passed);
+    if (!passed) {
+      currentIOFailureCount.incrementAndGet();
     }
-  }
-
-  @VisibleForTesting
-  public int getConsecutiveTimeoutCount() {
-    return consecutiveTimeoutCount.get();
+    if (ioTestSlidingWindow.size() > ioTestCount &&
+        Objects.equals(ioTestSlidingWindow.poll(), Boolean.FALSE)) {
+      currentIOFailureCount.decrementAndGet();
+    }
+    return currentIOFailureCount.get() > ioFailureTolerance;
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
index f83277a64d8a..b3384514d253 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -261,20 +262,18 @@ public Set<StorageVolume> checkAllVolumes(
       // already applied its own tolerance.
       final Set<StorageVolume> result = new HashSet<>(failedVolumes);
 
-      // Volumes that completed as healthy: reset their timeout counter so a
-      // single transient timeout does not combine with an unrelated future one.
-      for (StorageVolume v : healthyVolumes) {
-        v.resetTimeoutCount();
-      }
-
       // Volumes still pending (neither healthy nor explicitly failed):
-      // these timed out. Apply per-volume consecutive-timeout tolerance.
+      // the latch expired before they reported a result. Record a synthetic
+      // IO failure in each volume's existing sliding window so latch timeouts
+      // share the same ioFailureTolerance threshold as genuine IO failures.
+      // Healthy volumes need no special action: their successful check() call
+      // already recorded TRUE in the sliding window.
       final Set<StorageVolume> pendingVolumes =
           new HashSet<>(Sets.difference(allVolumes,
               Sets.union(healthyVolumes, failedVolumes)));
 
       for (StorageVolume v : pendingVolumes) {
-        if (v.recordCheckTimeout()) {
+        if (v.recordTimeoutAsIOFailure()) {
           // Tolerance exceeded — mark as failed.
           result.add(v);
         }
@@ -400,10 +399,22 @@ public void onFailure(@Nonnull Throwable t) {
           volume, exception);
       // If the scan was interrupted, do not count it as a volume failure.
       // This should only happen if the volume checker is being shut down.
- if (!(t instanceof InterruptedException)) { - markFailed(); - cleanup(); + if (t instanceof InterruptedException) { + return; } + if (exception instanceof TimeoutException) { + // Per-check timeout from ThrottledAsyncChecker: apply the same + // IO-failure tolerance as a failed read/write test, rather than + // failing the volume immediately on the first timeout. + if (!volume.recordTimeoutAsIOFailure()) { + // Within tolerance this round. Still call cleanup() so numVolumes + // decrements correctly and the latch/callback fires on time. + cleanup(); + return; + } + } + markFailed(); + cleanup(); } private void markHealthy() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java index bc5683dbd418..ea5a54799afc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java @@ -344,79 +344,88 @@ public void testCorrectDirectoryChecked(StorageVolume.Builder builder) } /** - * With the default tolerance of 1, the first simulated latch timeout must - * NOT increment the volume's counter beyond tolerance (counter = 1 which - * is not > 1), so {@code recordCheckTimeout()} returns false. + * With the default settings (ioTestCount=3, ioFailureTolerance=1), the + * first simulated check timeout must be tolerated: it records one synthetic + * IO failure in the sliding window (count=1, which is NOT > tolerance=1), + * so {@code recordTimeoutAsIOFailure()} returns false. */ @ParameterizedTest @MethodSource("volumeBuilders") public void testFirstTimeoutIsTolerated(StorageVolume.Builder builder) throws Exception { - DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class); - dnConf.setDiskCheckTimeoutTolerated(1); - CONF.setFromObject(dnConf); - builder.conf(CONF); StorageVolume volume = builder.build(); volume.format(CLUSTER_ID); volume.createTmpDirs(CLUSTER_ID); - assertEquals(0, volume.getConsecutiveTimeoutCount()); - // First simulated latch timeout: tolerance not exceeded. - assertFalse(volume.recordCheckTimeout(), - "First timeout should be tolerated"); - assertEquals(1, volume.getConsecutiveTimeoutCount()); + // First simulated check timeout: tolerance not exceeded. + assertFalse(volume.recordTimeoutAsIOFailure(), + "First timeout should be tolerated (IO failure count 1 is not > tolerance 1)"); } /** - * With tolerance = 1, the second consecutive latch timeout must cause - * {@code recordCheckTimeout()} to return true (counter = 2 > 1). + * With the default settings (ioTestCount=3, ioFailureTolerance=1), the + * second consecutive check timeout must cause + * {@code recordTimeoutAsIOFailure()} to return true: count=2 which IS + * > tolerance=1. 
   */
  @ParameterizedTest
  @MethodSource("volumeBuilders")
  public void testSecondConsecutiveTimeoutFails(StorageVolume.Builder builder)
      throws Exception {
-    DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class);
-    dnConf.setDiskCheckTimeoutTolerated(1);
-    CONF.setFromObject(dnConf);
-    builder.conf(CONF);
     StorageVolume volume = builder.build();
     volume.format(CLUSTER_ID);
     volume.createTmpDirs(CLUSTER_ID);
 
-    assertFalse(volume.recordCheckTimeout(), "First timeout should be tolerated");
-    assertTrue(volume.recordCheckTimeout(),
+    assertFalse(volume.recordTimeoutAsIOFailure(),
+        "First timeout should be tolerated");
+    assertTrue(volume.recordTimeoutAsIOFailure(),
         "Second consecutive timeout should exceed tolerance and return true");
-    assertEquals(2, volume.getConsecutiveTimeoutCount());
   }
 
   /**
-   * A successful healthy check resets the timeout counter so a subsequent
-   * single timeout is tolerated again (not combined with the earlier one).
+   * After a simulated timeout, {@code ioTestCount} healthy check() calls
+   * gradually evict the synthetic failure from the sliding window. Once
+   * evicted, the IO failure count drops back to 0 and a new single timeout
+   * is tolerated again — no separate reset API is required.
+   *
+   * <p>With the defaults (ioTestCount=3, ioFailureTolerance=1):
+   * <ol>
+   *   <li>1 timeout: window=[F], failures=1</li>
+   *   <li>3 healthy checks push T, T, T — the 4th entry evicts F:
+   *       window=[T,T,T], failures=0</li>
+   *   <li>New timeout: window=[T,T,F] (evicts oldest T), failures=1
+   *       → 1 is not > 1 → tolerated again</li>
+   * </ol>
    */
  @ParameterizedTest
  @MethodSource("volumeBuilders")
-  public void testSuccessfulCheckResetsTimeoutCounter(StorageVolume.Builder builder)
-      throws Exception {
-    DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class);
-    dnConf.setDiskCheckTimeoutTolerated(1);
-    CONF.setFromObject(dnConf);
-    builder.conf(CONF);
+  public void testHealthyChecksEvictTimeoutFromSlidingWindow(
+      StorageVolume.Builder builder) throws Exception {
     StorageVolume volume = builder.build();
     volume.format(CLUSTER_ID);
     volume.createTmpDirs(CLUSTER_ID);
 
     // Simulate one tolerated timeout.
-    assertFalse(volume.recordCheckTimeout(), "First timeout should be tolerated");
-    assertEquals(1, volume.getConsecutiveTimeoutCount());
+    assertFalse(volume.recordTimeoutAsIOFailure(),
+        "First timeout should be tolerated");
 
-    // Simulate a successful check round — counter must reset.
-    volume.resetTimeoutCount();
-    assertEquals(0, volume.getConsecutiveTimeoutCount());
+    // Three healthy checks push TRUE entries into the sliding window,
+    // eventually evicting the synthetic FALSE.
+    DiskCheckUtil.DiskChecks alwaysPass = new DiskCheckUtil.DiskChecks() {
+      @Override
+      public boolean checkReadWrite(File storageDir, File testFileDir,
+          int numBytesToWrite) {
+        return true;
+      }
+    };
+    DiskCheckUtil.setTestImpl(alwaysPass);
+    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
+    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
+    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
 
-    // A new single timeout after reset is tolerated again.
-    assertFalse(volume.recordCheckTimeout(),
-        "Timeout after successful reset should be tolerated again");
-    assertEquals(1, volume.getConsecutiveTimeoutCount());
+    // After recovery a new single timeout is tolerated again.
+    assertFalse(volume.recordTimeoutAsIOFailure(),
+        "Timeout after recovery should be tolerated again");
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index c82a825f28da..98ccd8fac8be 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -59,9 +59,6 @@ public void initializeMemberVariables() {
     xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
     xmlPropsToSkipCompare.add("ozone.om.leader.election.minimum.timeout" +
         ".duration"); // Deprecated config
-    // DatanodeConfiguration is not in configurationClasses; skip its XML entries
-    xmlPropsToSkipCompare.add(
-        DatanodeConfiguration.DISK_CHECK_TIMEOUT_TOLERATED_KEY);
     // Currently replication and type configs moved to server side.
     configurationPropsToSkipCompare
         .add(OzoneConfigKeys.OZONE_REPLICATION);

From 16bdb26b5e08fedcf7d1dab17ba98e5e492a60a0 Mon Sep 17 00:00:00 2001
From: Devesh Kumar Singh
Date: Fri, 27 Mar 2026 13:45:01 +0530
Subject: [PATCH 4/5] HDDS-14871. Fixed review comments.
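
The synchronized recordTimeoutAsIOFailure() from the previous revision
could block: a timeout fires precisely when check() is stuck in disk I/O
while holding the volume monitor, so the timeout handler must not take
that lock. The counter moves to a lock-free AtomicInteger, and a shared
concurrent set guarantees the racing per-check and latch timeout paths
make the tolerance decision exactly once. A minimal sketch of the hazard
and the fix, with invented names and timings, for illustration only:

    import java.util.concurrent.atomic.AtomicInteger;

    class LockStallSketch {
      private final AtomicInteger timeouts = new AtomicInteger();

      synchronized void check() throws InterruptedException {
        Thread.sleep(60_000); // stands in for a read/fsync stuck on a bad disk
      }

      // BAD: waits for the monitor held by the stuck check() above, so the
      // checker thread stalls for as long as the disk does.
      synchronized boolean recordTimeoutLocked(int tolerance) {
        return timeouts.incrementAndGet() > tolerance;
      }

      // GOOD: lock-free, safe to call while check() is still blocked.
      boolean recordTimeoutLockFree(int tolerance) {
        return timeouts.incrementAndGet() > tolerance;
      }
    }

The exactly-once guard relies on Set.add() returning true only for the
first caller: ConcurrentHashMap.newKeySet().add(volume) decides which of
the two timeout handlers owns the volume's tolerance decision.
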
---
 .../statemachine/DatanodeConfiguration.java   |   1 +
 .../common/volume/StorageVolume.java          |  76 ++++++--
 .../common/volume/StorageVolumeChecker.java   |  98 +++++++---
 .../volume/TestStorageVolumeChecker.java      | 168 ++++++++++++++++++
 .../volume/TestStorageVolumeHealthChecks.java |  70 ++++----
 5 files changed, 330 insertions(+), 83 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 482dcb6bff5d..41f6d36971ff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -61,6 +61,7 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
   public static final String FAILED_DB_VOLUMES_TOLERATED_KEY = "hdds.datanode.failed.db.volumes.tolerated";
   public static final String DISK_CHECK_MIN_GAP_KEY = "hdds.datanode.disk.check.min.gap";
   public static final String DISK_CHECK_TIMEOUT_KEY = "hdds.datanode.disk.check.timeout";
+
   // Minimum space should be left on volume.
   // Ex: If volume has 1000GB and minFreeSpace is configured as 10GB,
   // In this case when availableSpace is 10GB or below, volume is assumed as full
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 728f5b64ccfd..2f29c6985322 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -113,6 +113,15 @@ public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckResult> {
   private final Queue<Boolean> ioTestSlidingWindow;
   private int healthCheckFileSize;
 
+  /*
+  Counter for consecutive latch or per-check timeouts. Incremented by
+  recordTimeoutAsIOFailure() which must NOT be synchronized (check() may be
+  holding the lock — that is exactly why the timeout fired). AtomicInteger
+  provides the necessary thread safety without locking. Reset to 0 by
+  resetTimeoutCount() whenever a check completes successfully.
+  */
+  private final AtomicInteger consecutiveTimeoutCount;
+
   /**
    * Type for StorageVolume.
    */
@@ -164,6 +173,7 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioTestSlidingWindow = new LinkedList<>();
       this.currentIOFailureCount = new AtomicInteger(0);
       this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize();
+      this.consecutiveTimeoutCount = new AtomicInteger(0);
     } else {
       storageDir = new File(b.volumeRootStr);
       volumeUsage = null;
@@ -174,6 +184,7 @@ protected StorageVolume(Builder b) throws IOException {
       this.ioFailureTolerance = 0;
       this.conf = null;
       this.dnConf = null;
+      this.consecutiveTimeoutCount = new AtomicInteger(0);
     }
     this.storageDirStr = storageDir.getAbsolutePath();
   }
@@ -734,35 +745,62 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused)
    * this volume's async check completed, or because the per-check timeout
    * inside {@link ThrottledAsyncChecker} fired.
    *
-   * <p>Records a synthetic IO-test failure in the existing sliding window,
-   * making latch timeouts subject to the same {@code ioFailureTolerance}
-   * threshold as genuine read/write failures. No separate configuration key
-   * is required: the existing
-   * {@code hdds.datanode.disk.check.io.failures.tolerated} governs both.
+   * <p>Must not be {@code synchronized}. When a timeout fires,
+   * {@link #check} may still be executing and holding the object lock — that
+   * is precisely why the timeout occurred. Acquiring the same lock here would
+   * deadlock or stall {@link StorageVolumeChecker#checkAllVolumes} until the
+   * hung check finally returns.
+   *
+   * <p>Instead, a dedicated {@link AtomicInteger} ({@code
+   * consecutiveTimeoutCount}) tracks consecutive timeouts without any locking.
+   * The threshold reuses the existing {@code ioFailureTolerance} so no
+   * additional configuration key is required.
    *
-   * <p>Recovery is automatic: each successful {@link #check} call records a
-   * {@code true} entry in the window, gradually evicting the synthetic
-   * failure once {@code ioTestCount} healthy results have accumulated.
+   * <p>Recovery: call {@link #resetTimeoutCount()} when a check completes
+   * successfully to break the timeout streak.
    *
-   * @return {@code true} if {@code currentIOFailureCount > ioFailureTolerance},
+   * @return {@code true} if {@code consecutiveTimeoutCount > ioFailureTolerance},
    *         meaning the volume should now be marked FAILED; {@code false} if
-   *         the failure is still within tolerance this round.
+   *         the timeout is still within tolerance this round.
    */
-  public synchronized boolean recordTimeoutAsIOFailure() {
-    if (advanceIOWindow(false)) {
-      LOG.error("Volume {} check timed out: IO-failure count ({}) exceeds"
-          + " tolerance ({}). Marking FAILED.",
-          this, currentIOFailureCount, ioFailureTolerance);
+  public boolean recordTimeoutAsIOFailure() {
+    int count = consecutiveTimeoutCount.incrementAndGet();
+    if (count > ioFailureTolerance) {
+      LOG.error("Volume {} check timed out {} consecutive time(s),"
+          + " exceeding tolerance of {}. Marking FAILED.",
+          this, count, ioFailureTolerance);
       return true;
     }
-    LOG.warn("Volume {} check timed out. IO-failure count: {} / tolerance: {}."
-        + " Volume will not be failed until tolerance is exceeded."
+    LOG.warn("Volume {} check timed out ({}/{} consecutive timeouts tolerated)."
         + " Common transient causes: kernel I/O scheduler saturation"
-        + " or JVM GC pressure.",
-        this, currentIOFailureCount, ioFailureTolerance);
+        + " or JVM GC pressure. Volume will be failed if the next check"
+        + " also times out.",
+        this, count, ioFailureTolerance);
     return false;
   }
 
+  /**
+   * Resets the consecutive-timeout counter to 0.
+   *
+   * <p>Called by {@link StorageVolumeChecker} when this volume's check
+   * completes successfully, indicating that the transient stall has resolved
+   * and any accumulated timeout count should not carry over to the next cycle.
+   *
+   * <p>No synchronization needed — operates on an {@link AtomicInteger}.
+   */
+  public void resetTimeoutCount() {
+    int prev = consecutiveTimeoutCount.getAndSet(0);
+    if (prev > 0 && LOG.isDebugEnabled()) {
+      LOG.debug("Volume {} completed a healthy check. Consecutive timeout"
+          + " count reset from {} to 0.", this, prev);
+    }
+  }
+
+  @VisibleForTesting
+  public int getConsecutiveTimeoutCount() {
+    return consecutiveTimeoutCount.get();
+  }
+
   /**
    * Advances the IO-test sliding window by one entry and updates the rolling
    * failure counter.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
index b3384514d253..83ca783fc40c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
@@ -225,6 +225,13 @@ public Set<StorageVolume> checkAllVolumes(
     final AtomicLong numVolumes = new AtomicLong(volumes.size());
     final CountDownLatch latch = new CountDownLatch(1);
 
+    // Shared set used to guarantee exactly-one call to
+    // recordTimeoutAsIOFailure() per volume, regardless of whether the
+    // per-check timeout (ResultHandler.onFailure) or the global latch timeout
+    // (pending-volumes loop below) fires first. The first path to CAS-add the
+    // volume owns the tolerance decision; the other path skips it.
+    final Set<StorageVolume> timeoutHandledSet = ConcurrentHashMap.newKeySet();
+
     for (StorageVolume v : volumes) {
       Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(v, null);
@@ -233,7 +240,8 @@ public Set<StorageVolume> checkAllVolumes(
         allVolumes.add(v);
         Futures.addCallback(olf.get(),
             new ResultHandler(v, healthyVolumes, failedVolumes,
-                numVolumes, (ignored1, ignored2) -> latch.countDown()),
+                numVolumes, (ignored1, ignored2) -> latch.countDown(),
+                timeoutHandledSet),
             MoreExecutors.directExecutor());
       } else {
         if (v instanceof HddsVolume) {
@@ -262,17 +270,28 @@ public Set<StorageVolume> checkAllVolumes(
       // already applied its own tolerance.
       final Set<StorageVolume> result = new HashSet<>(failedVolumes);
 
-      // Volumes still pending (neither healthy nor explicitly failed):
-      // the latch expired before they reported a result. Record a synthetic
-      // IO failure in each volume's existing sliding window so latch timeouts
-      // share the same ioFailureTolerance threshold as genuine IO failures.
-      // Healthy volumes need no special action: their successful check() call
-      // already recorded TRUE in the sliding window.
+      // Volumes that completed healthy: reset their consecutive-timeout
+      // counter so a single transient timeout is not combined with an
+      // unrelated future one after a healthy gap.
+      for (StorageVolume v : healthyVolumes) {
+        v.resetTimeoutCount();
+      }
+
+      // Volumes still pending (neither healthy nor explicitly failed) at
+      // latch-timeout time. onFailure() may have already handled some of
+      // these via timeoutHandledSet; skip those to avoid double-counting.
       final Set<StorageVolume> pendingVolumes =
           new HashSet<>(Sets.difference(allVolumes,
               Sets.union(healthyVolumes, failedVolumes)));
 
       for (StorageVolume v : pendingVolumes) {
+        if (!timeoutHandledSet.add(v)) {
+          // onFailure() already handled this volume's timeout (per-check
+          // timeout fired before the latch). The tolerance decision was
+          // already made there; nothing left to do.
+          continue;
+        }
+        // Latch fired first — this is the first (and only) handler.
         if (v.recordTimeoutAsIOFailure()) {
           // Tolerance exceeded — mark as failed.
           result.add(v);
         }
@@ -321,7 +340,7 @@ public boolean checkVolume(final StorageVolume volume, Callback callback) {
       Futures.addCallback(olf.get(),
           new ResultHandler(volume, ConcurrentHashMap.newKeySet(),
               ConcurrentHashMap.newKeySet(),
-              new AtomicLong(1), callback),
+              new AtomicLong(1), callback, null),
           checkVolumeResultHandlerExecutorService
       );
       return true;
@@ -343,23 +362,39 @@ private static class ResultHandler
     private final Callback callback;
 
     /**
-     * @param healthyVolumes set of healthy volumes. If the disk check is
-     *                       successful, add the volume here.
-     * @param failedVolumes set of failed volumes. If the disk check fails,
-     *                      add the volume here.
-     * @param volumeCounter volumeCounter used to trigger callback invocation.
-     * @param callback invoked when the volumeCounter reaches 0.
+     * Shared set used to guarantee exactly-one call to
+     * {@link StorageVolume#recordTimeoutAsIOFailure()} per volume when both
+     * the per-check timeout ({@link #onFailure}) and the global latch timeout
+     * (pending-volumes loop in {@code checkAllVolumes}) can race for the same
+     * volume.
+     *
+     * <p>{@code null} for the {@code checkVolume()} path, where no latch exists
+     * and {@link #onFailure} is the sole timeout handler.
+     */
+    @Nullable
+    private final Set<StorageVolume> timeoutHandledSet;
+
+    /**
+     * @param healthyVolumes set of healthy volumes.
+     * @param failedVolumes set of failed volumes.
+     * @param volumeCounter triggers callback when it reaches 0.
+     * @param callback invoked when volumeCounter reaches 0.
+     * @param timeoutHandledSet shared CAS set for exactly-once timeout
+     *                          handling; {@code null} for
+     *                          {@code checkVolume()}.
      */
     ResultHandler(StorageVolume volume,
         Set<StorageVolume> healthyVolumes,
         Set<StorageVolume> failedVolumes,
         AtomicLong volumeCounter,
-        @Nullable Callback callback) {
+        @Nullable Callback callback,
+        @Nullable Set<StorageVolume> timeoutHandledSet) {
       this.volume = volume;
       this.healthyVolumes = healthyVolumes;
       this.failedVolumes = failedVolumes;
       this.volumeCounter = volumeCounter;
       this.callback = callback;
+      this.timeoutHandledSet = timeoutHandledSet;
     }
 
     @Override
@@ -402,16 +437,28 @@ public void onFailure(@Nonnull Throwable t) {
       if (t instanceof InterruptedException) {
         return;
       }
-      if (exception instanceof TimeoutException) {
-        // Per-check timeout from ThrottledAsyncChecker: apply the same
-        // IO-failure tolerance as a failed read/write test, rather than
-        // failing the volume immediately on the first timeout.
-        if (!volume.recordTimeoutAsIOFailure()) {
-          // Within tolerance this round. Still call cleanup() so numVolumes
-          // decrements correctly and the latch/callback fires on time.
-          cleanup();
-          return;
+      // Detect a per-check timeout from ThrottledAsyncChecker.
+      // Guava 28+ (including 33.5.0-jre used here) fails the TimeoutFuture
+      // with TimeoutException on timeout.
+      boolean isTimeout = exception instanceof TimeoutException;
+      if (isTimeout) {
+        // timeoutHandledSet is null for checkVolume() (sole timeout handler).
+        // For checkAllVolumes(), the set is shared with the pending-volumes
+        // loop; CAS-add determines which path owns the tolerance decision.
+        boolean firstToHandle =
+            (timeoutHandledSet == null) || timeoutHandledSet.add(volume);
+        if (firstToHandle) {
+          if (!volume.recordTimeoutAsIOFailure()) {
+            // Within tolerance: do NOT trigger the failure callback.
+            // The volume is not marked failed; the next check cycle will
+            // re-evaluate its health. cleanup() is intentionally not called
+            // to avoid firing handleVolumeFailures() with an empty failed set.
+            return;
+          }
+          // Tolerance exceeded — fall through to markFailed()/cleanup().
         }
+        // else: the pending-volumes loop already handled this timeout.
+        // Fall through to markFailed()/cleanup() for counter bookkeeping only.
       }
       markFailed();
       cleanup();
     }
 
     private void markHealthy() {
       healthyVolumes.add(volume);
+      // A successful completion resets any accumulated timeout count so that
+      // an earlier transient timeout does not carry over to future cycles.
+ volume.resetTimeoutCount(); } private void markFailed() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java index 71cb7af04b71..6a9b9b45ef66 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.FAILED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.isNull; @@ -28,8 +29,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import java.io.File; import java.nio.file.Path; import java.time.Duration; @@ -39,8 +43,13 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -301,6 +310,165 @@ public void testNumScansSkipped() throws Exception { checker.shutdownAndWait(0, TimeUnit.SECONDS); } + /** + * Explicitly captures the {@link Throwable} type that + * {@link Futures#withTimeout} delivers to {@code onFailure()} when the + * deadline fires. + * + *

{@link ThrottledAsyncChecker} uses {@code Futures.withTimeout()} + * internally; this test replicates that exact call to confirm that Guava + * 28+ (including the 33.5.0-jre version used by Ozone) delivers a + * {@link TimeoutException} — NOT a {@link java.util.concurrent.CancellationException}. + * {@code CancellationException} in new Guava means external cancellation + * (e.g. executor shutdown), not a disk-check timeout, so + * {@link StorageVolumeChecker} {@code ResultHandler.onFailure()} only + * treats {@link TimeoutException} as a timeout. + */ + @Test + public void testFuturesWithTimeoutExceptionType() throws Exception { + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(); + AtomicReference captured = new AtomicReference<>(); + CountDownLatch done = new CountDownLatch(1); + + try { + // A future that never completes on its own — same situation as a + // check() thread blocked inside fsync(). + SettableFuture neverCompletes = SettableFuture.create(); + + // Wrap with a real Futures.withTimeout(), identical to what + // ThrottledAsyncChecker does. + ListenableFuture timedFuture = + Futures.withTimeout(neverCompletes, 100, TimeUnit.MILLISECONDS, + scheduler); + + Futures.addCallback(timedFuture, new FutureCallback() { + @Override + public void onSuccess(VolumeCheckResult result) { + done.countDown(); + } + + @Override + public void onFailure(Throwable t) { + captured.set(t); + done.countDown(); + } + }, MoreExecutors.directExecutor()); + + assertTrue(done.await(2, TimeUnit.SECONDS), + "Timeout should have fired within 2 seconds"); + + Throwable thrown = captured.get(); + LOG.info("Futures.withTimeout() delivered to onFailure(): {}", + thrown.getClass().getName()); + + // Guava 28+ delivers TimeoutException for a timeout. + // CancellationException would mean external cancellation, not a timeout. + assertTrue(thrown instanceof TimeoutException, + "Expected TimeoutException from Futures.withTimeout() but got: " + + thrown.getClass().getName()); + } finally { + scheduler.shutdownNow(); + } + } + + /** + * Verifies that when the per-check timeout inside {@link ThrottledAsyncChecker} + * fires on {@link StorageVolumeChecker#checkVolume}, the first timeout is + * tolerated (callback not invoked, volume not failed) and the second + * consecutive timeout causes the volume to be failed. + * + *
+   * <p>This test uses the real {@link ThrottledAsyncChecker} (not
+   * {@link DummyChecker}) so that the actual {@code TimeoutException}
+   * path in {@code ResultHandler.onFailure()} fires.
+   */
+  @Test
+  public void testCheckVolumeTimeoutTolerance() throws Exception {
+    setup();
+    // Use a very short check timeout so the test completes quickly.
+    OzoneConfiguration timeoutConf = new OzoneConfiguration();
+    DatanodeConfiguration dnConf = timeoutConf.getObject(DatanodeConfiguration.class);
+    dnConf.setDiskCheckTimeout(Duration.ofMillis(200));
+    dnConf.setDiskCheckMinGap(Duration.ZERO);
+    timeoutConf.setFromObject(dnConf);
+
+    // A latch-controlled mock volume: check() blocks until released.
+    HddsVolume volume = mock(HddsVolume.class);
+    CountDownLatch blockLatch = new CountDownLatch(1);
+    when(volume.check(any())).thenAnswer(inv -> {
+      blockLatch.await(10, TimeUnit.SECONDS);
+      return VolumeCheckResult.HEALTHY;
+    });
+    // First timeout returns false (within tolerance), second returns true.
+    when(volume.recordTimeoutAsIOFailure()).thenReturn(false).thenReturn(true);
+
+    AtomicLong callbackCount = new AtomicLong(0);
+    StorageVolumeChecker checker =
+        new StorageVolumeChecker(timeoutConf, new FakeTimer(), "test-");
+
+    // First checkVolume: should time out and be tolerated (callback NOT fired).
+    checker.checkVolume(volume, (healthy, failed) -> callbackCount.incrementAndGet());
+
+    // Wait long enough for the timeout to fire.
+    Thread.sleep(600);
+    assertEquals(0, callbackCount.get(),
+        "Callback must NOT fire for a tolerated timeout");
+    verify(volume, times(1)).recordTimeoutAsIOFailure();
+
+    // Second checkVolume: should time out and exceed tolerance (callback fired).
+    checker.checkVolume(volume, (healthy, failed) -> callbackCount.incrementAndGet());
+    Thread.sleep(600);
+
+    assertEquals(1, callbackCount.get(),
+        "Callback must fire when tolerance is exceeded");
+
+    blockLatch.countDown();
+    checker.shutdownAndWait(5, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Verifies that when the {@code checkAllVolumes()} latch times out and
+   * pending volumes have not yet reported a result, their consecutive-timeout
+   * counter is incremented and the first timeout is tolerated.
+   *
+   * <p>This test confirms that {@code recordTimeoutAsIOFailure()} is called on
+   * pending volumes, and that the volume is NOT in the returned failed set on
+   * the first timeout.
+   */
+  @Test
+  public void testCheckAllVolumesLatchTimeoutTolerance() throws Exception {
+    setup();
+    OzoneConfiguration timeoutConf = new OzoneConfiguration();
+    DatanodeConfiguration dnConf = timeoutConf.getObject(DatanodeConfiguration.class);
+    dnConf.setDiskCheckTimeout(Duration.ofMillis(200));
+    dnConf.setDiskCheckMinGap(Duration.ZERO);
+    timeoutConf.setFromObject(dnConf);
+
+    HddsVolume volume = mock(HddsVolume.class);
+    CountDownLatch blockLatch = new CountDownLatch(1);
+    when(volume.check(any())).thenAnswer(inv -> {
+      blockLatch.await(10, TimeUnit.SECONDS);
+      return VolumeCheckResult.HEALTHY;
+    });
+    // Simulate: first timeout is within tolerance.
+    when(volume.recordTimeoutAsIOFailure()).thenReturn(false);
+    when(volume.getVolumeInfoStats()).thenReturn(
+        new VolumeInfoMetrics("test-vol", volume));
+
+    StorageVolumeChecker checker =
+        new StorageVolumeChecker(timeoutConf, new FakeTimer(), "test-");
+
+    Set<? extends StorageVolume> failed =
+        checker.checkAllVolumes(Arrays.asList(volume));
+
+    assertFalse(failed.contains(volume),
+        "Volume must NOT be in failed set on first tolerated timeout");
+    verify(volume, times(1)).recordTimeoutAsIOFailure();
+
+    blockLatch.countDown();
+    checker.shutdownAndWait(5, TimeUnit.SECONDS);
+  }
+
   /**
    * A checker to wraps the result of {@link HddsVolume#check} in
    * an ImmediateFuture.
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java
index ea5a54799afc..fa9d8907b58f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java
@@ -344,10 +344,10 @@ public void testCorrectDirectoryChecked(StorageVolume.Builder builder)
   }
 
   /**
-   * With the default settings (ioTestCount=3, ioFailureTolerance=1), the
-   * first simulated check timeout must be tolerated: it records one synthetic
-   * IO failure in the sliding window (count=1, which is NOT > tolerance=1),
-   * so {@code recordTimeoutAsIOFailure()} returns false.
+   * With the default settings (ioFailureTolerance=1), the first simulated
+   * check timeout must be tolerated: {@code consecutiveTimeoutCount} becomes 1
+   * which is NOT {@code > 1}, so {@code recordTimeoutAsIOFailure()} returns
+   * {@code false}.
    */
   @ParameterizedTest
   @MethodSource("volumeBuilders")
@@ -357,16 +357,16 @@ public void testFirstTimeoutIsTolerated(StorageVolume.Builder builder)
     volume.format(CLUSTER_ID);
     volume.createTmpDirs(CLUSTER_ID);
 
-    // First simulated check timeout: tolerance not exceeded.
+    assertEquals(0, volume.getConsecutiveTimeoutCount());
     assertFalse(volume.recordTimeoutAsIOFailure(),
-        "First timeout should be tolerated (IO failure count 1 is not > tolerance 1)");
+        "First timeout should be tolerated (count 1 is not > tolerance 1)");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
   }
 
   /**
-   * With the default settings (ioTestCount=3, ioFailureTolerance=1), the
-   * second consecutive check timeout must cause
-   * {@code recordTimeoutAsIOFailure()} to return true: count=2 which IS
-   * > tolerance=1.
+   * With the default settings (ioFailureTolerance=1), the second consecutive
+   * timeout must cause {@code recordTimeoutAsIOFailure()} to return
+   * {@code true}: count becomes 2, which IS {@code > 1}.
    */
   @ParameterizedTest
   @MethodSource("volumeBuilders")
@@ -376,30 +376,26 @@ public void testSecondConsecutiveTimeoutFails(StorageVolume.Builder builder)
     volume.format(CLUSTER_ID);
     volume.createTmpDirs(CLUSTER_ID);
 
-    assertFalse(volume.recordTimeoutAsIOFailure(),
-        "First timeout should be tolerated");
+    assertFalse(volume.recordTimeoutAsIOFailure(), "First timeout should be tolerated");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
+
     assertTrue(volume.recordTimeoutAsIOFailure(),
         "Second consecutive timeout should exceed tolerance and return true");
+    assertEquals(2, volume.getConsecutiveTimeoutCount());
   }
 
   /**
-   * After a simulated timeout, {@code ioTestCount} healthy check() calls
-   * gradually evict the synthetic failure from the sliding window. Once
-   * evicted, the IO failure count drops back to 0 and a new single timeout
-   * is tolerated again — no separate reset API is required.
+   * {@code resetTimeoutCount()} resets the consecutive-timeout counter to 0,
+   * so a subsequent single timeout is tolerated again; the streak does not
+   * carry over past a successful check cycle.
    *
-   * <p>With the defaults (ioTestCount=3, ioFailureTolerance=1):
-   * <ol>
-   *   <li>1 timeout: window=[F], failures=1</li>
-   *   <li>3 healthy checks push T, T, T — the 4th entry evicts F:
-   *       window=[T,T,T], failures=0</li>
-   *   <li>New timeout: window=[T,T,F] (evicts oldest T), failures=1
-   *       → 1 is not > 1 → tolerated again</li>
-   * </ol>
+   * <p>{@code resetTimeoutCount()} is called by {@link StorageVolumeChecker}
+   * whenever a volume completes a healthy check (either via
+   * {@code checkAllVolumes()} or via {@code checkVolume()}).
    */
   @ParameterizedTest
   @MethodSource("volumeBuilders")
-  public void testHealthyChecksEvictTimeoutFromSlidingWindow(
+  public void testResetTimeoutCountResetsConsecutiveCounter(
       StorageVolume.Builder builder) throws Exception {
     StorageVolume volume = builder.build();
     volume.format(CLUSTER_ID);
@@ -408,24 +404,18 @@
 
     // Simulate one tolerated timeout.
     assertFalse(volume.recordTimeoutAsIOFailure(),
         "First timeout should be tolerated");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
 
-    // Three healthy checks push TRUE entries into the sliding window,
-    // eventually evicting the synthetic FALSE.
-    DiskCheckUtil.DiskChecks alwaysPass = new DiskCheckUtil.DiskChecks() {
-      @Override
-      public boolean checkReadWrite(File storageDir, File testFileDir,
-          int numBytesToWrite) {
-        return true;
-      }
-    };
-    DiskCheckUtil.setTestImpl(alwaysPass);
-    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
-    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
-    assertEquals(VolumeCheckResult.HEALTHY, volume.check(false));
+    // StorageVolumeChecker calls resetTimeoutCount() when the volume's check
+    // eventually completes successfully. Simulate that here.
+    volume.resetTimeoutCount();
+    assertEquals(0, volume.getConsecutiveTimeoutCount(),
+        "Counter must be reset to 0 after a successful check");
 
-    // After recovery a new single timeout is tolerated again.
+    // A new single timeout after reset is tolerated again.
     assertFalse(volume.recordTimeoutAsIOFailure(),
-        "Timeout after recovery should be tolerated again");
+        "Timeout after reset should be tolerated again");
+    assertEquals(1, volume.getConsecutiveTimeoutCount());
   }
 
   /**

From 452cd11cb5c4cddef392a8e235000f0b988ec966 Mon Sep 17 00:00:00 2001
From: Devesh Kumar Singh
Date: Fri, 27 Mar 2026 16:11:45 +0530
Subject: [PATCH 5/5] HDDS-14871. Fixed findbugs.

---
 .../container/common/volume/TestStorageVolumeChecker.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
index 6a9b9b45ef66..2554147b952b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
@@ -396,7 +396,7 @@ public void testCheckVolumeTimeoutTolerance() throws Exception {
     HddsVolume volume = mock(HddsVolume.class);
     CountDownLatch blockLatch = new CountDownLatch(1);
     when(volume.check(any())).thenAnswer(inv -> {
-      blockLatch.await(10, TimeUnit.SECONDS);
+      blockLatch.await();
       return VolumeCheckResult.HEALTHY;
     });
     // First timeout returns false (within tolerance), second returns true.
@@ -447,7 +447,7 @@ public void testCheckAllVolumesLatchTimeoutTolerance() throws Exception {
     HddsVolume volume = mock(HddsVolume.class);
     CountDownLatch blockLatch = new CountDownLatch(1);
     when(volume.check(any())).thenAnswer(inv -> {
-      blockLatch.await(10, TimeUnit.SECONDS);
+      blockLatch.await();
       return VolumeCheckResult.HEALTHY;
     });
     // Simulate: first timeout is within tolerance.
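
The tolerance counter exercised by the tests in this series can be modeled
in isolation as the minimal sketch below. The method and field names
(recordTimeoutAsIOFailure, resetTimeoutCount, consecutiveTimeoutCount,
timeoutTolerance) follow this patch series; the standalone class wrapper
and the bare AtomicInteger handling are simplifying assumptions, not the
actual StorageVolume code.

    import java.util.concurrent.atomic.AtomicInteger;

    /** Minimal model of the per-volume consecutive-timeout tolerance. */
    final class TimeoutToleranceModel {
      // Tolerance is >= 1, e.g. hdds.datanode.disk.check.timeout.tolerated.
      private final int timeoutTolerance;
      private final AtomicInteger consecutiveTimeoutCount = new AtomicInteger();

      TimeoutToleranceModel(int timeoutTolerance) {
        this.timeoutTolerance = timeoutTolerance;
      }

      /**
       * Invoked when a health-check latch timeout hits this volume.
       * Returns true only once the streak exceeds the tolerance.
       */
      boolean recordTimeoutAsIOFailure() {
        return consecutiveTimeoutCount.incrementAndGet() > timeoutTolerance;
      }

      /** Invoked after a healthy check completes; the streak is cleared. */
      void resetTimeoutCount() {
        consecutiveTimeoutCount.set(0);
      }
    }

With a tolerance of 1, two back-to-back calls to recordTimeoutAsIOFailure()
return false then true, the exact false/true sequence the mocked volume in
testCheckVolumeTimeoutTolerance stubs, and a resetTimeoutCount() in between
restores the tolerated state, matching
testResetTimeoutCountResetsConsecutiveCounter.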