diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 0876bd22ea4f..1e7aece7ed09 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig { ) private int transactionToDNsCommitMapLimit = 5000000; + @Config(key = "hdds.scm.container.pending.allocation.roll.interval", + defaultValue = "5m", + type = ConfigType.TIME, + tags = { ConfigTag.SCM, ConfigTag.CONTAINER }, + description = + "Time interval for rolling the pending container allocation window. " + + "Pending container allocations are tracked in a two-window tumbling bucket " + + "pattern. Each window has this duration. " + + "After 2x this interval, allocations that haven't been confirmed via " + + "container reports will automatically age out. Default is 5 minutes." + ) + private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(5); + public int getTransactionToDNsCommitMapLimit() { return transactionToDNsCommitMapLimit; } + public Duration getPendingContainerAllocationRollInterval() { + return pendingContainerAllocationRollInterval; + } + + public void setPendingContainerAllocationRollInterval(Duration duration) { + this.pendingContainerAllocationRollInterval = duration; + } + public Duration getBlockDeletionInterval() { return blockDeletionInterval; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 432c9890e98a..f9df3cd8262c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.locks.Lock; @@ -44,6 +45,8 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.utils.db.Table; @@ -80,6 +83,7 @@ public class ContainerManagerImpl implements ContainerManager { private final Random random = new Random(); private final long maxContainerSize; + private final PendingContainerTracker pendingContainerTracker; /** * @@ -90,7 +94,8 @@ public ContainerManagerImpl( final SequenceIdGenerator sequenceIdGen, final PipelineManager pipelineManager, final Table containerStore, - final ContainerReplicaPendingOps containerReplicaPendingOps) + final ContainerReplicaPendingOps containerReplicaPendingOps, + final NodeManager nodeManager) throws IOException { // Introduce builder for this class? 
this.lock = new ReentrantLock(); @@ -110,6 +115,8 @@ public ContainerManagerImpl( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); + this.pendingContainerTracker = Objects.requireNonNull( + nodeManager.getPendingContainerTracker(), "pendingContainerTracker"); } @Override @@ -278,6 +285,11 @@ private ContainerInfo allocateContainer(final Pipeline pipeline, containerStateManager.addContainer(containerInfoBuilder.build()); scmContainerManagerMetrics.incNumSuccessfulCreateContainers(); + // Record pending allocation - tracks containers scheduled but not yet written + pendingContainerTracker.recordPendingAllocation(pipeline, containerID); + LOG.debug("Allocated container {} on pipeline {}. Recorded as pending on {} DataNodes", + containerID, pipeline.getId(), pipeline.getNodes().size()); + return containerStateManager.getContainer(containerID); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0cebcb10ef2c..e34305491e41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; @@ -175,6 +176,17 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, if (!alreadyInDn) { // This is a new Container not in the nodeManager -> dn map yet getNodeManager().addContainer(datanodeDetails, cid); + + // Remove from pending tracker when container is added to DN + // This container was just confirmed for the first time on this DN + // No need to remove on subsequent reports (it's already been removed) + if (container != null) { + PendingContainerTracker tracker = + getNodeManager().getPendingContainerTracker(); + if (tracker != null) { + tracker.removePendingAllocation(datanodeDetails, cid); + } + } } if (container == null || ContainerReportValidator .validate(container, datanodeDetails, replica)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 247e3667d9ef..123189887820 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -103,6 +104,12 @@ protected void processICR(IncrementalContainerReportFromDatanode report, } if (ContainerReportValidator.validate(container, dd, replicaProto)) { processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging); + + PendingContainerTracker tracker = + getNodeManager().getPendingContainerTracker(); + if (tracker != null) { + tracker.removePendingAllocation(dd, id); + } } success = true; } catch (ContainerNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index da57666cb304..dfae649ebd29 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -97,6 +97,10 @@ public void onMessage(final DatanodeDetails datanodeDetails, * action. */ LOG.info("A dead datanode is detected. {}", datanodeDetails); + PendingContainerTracker pending = nodeManager.getPendingContainerTracker(); + if (pending != null) { + pending.clearPendingForDatanode(datanodeDetails); + } closeContainers(datanodeDetails, publisher); destroyPipelines(datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..29d97100dd0d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -422,4 +422,15 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce } int openContainerLimit(List datanodes); + + /** + * SCM-side tracker for container allocations not yet reported by datanodes. + */ + PendingContainerTracker getPendingContainerTracker(); + + /** + * True if the node can accept another container of the given size, accounting for + * {@link #getPendingContainerTracker()}. + */ + boolean hasSpaceForNewContainerAllocation(DatanodeDetails node, long containerSize); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java new file mode 100644 index 000000000000..253758110dc2 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracks per-datanode pending container allocations at SCM using a Two Window Tumbling Bucket + * pattern (similar to HDFS HADOOP-3707). + * + * Two Window Tumbling Bucket for automatic aging and cleanup. + * + * How It Works: + *
+ * <ul>
+ *   <li>Each DataNode has two sets: currentWindow and previousWindow</li>
+ *   <li>New allocations go into currentWindow</li>
+ *   <li>Every ROLL_INTERVAL (default 5 minutes):
+ *     <ul>
+ *       <li>previousWindow = currentWindow (shift)</li>
+ *       <li>currentWindow = new empty set (reset)</li>
+ *       <li>Old previousWindow is discarded (automatic aging)</li>
+ *     </ul>
+ *   </li>
+ *   <li>When checking pending: return union of currentWindow + previousWindow</li>
+ * </ul>
+ *
+ * Example Timeline:
+ * <pre>
+ * Time  | Action                    | CurrentWindow | PreviousWindow | Total Pending
+ * ------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
+ * 00:02 | Allocate Container-2      | {C1, C2}      | {}             | {C1, C2}
+ * 00:05 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, C2}
+ * 00:07 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, C2, C3}
+ * 00:08 | Report confirms C1        | {C3}          | {C2}           | {C2, C3}
+ * 00:10 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
+ *       | (C2 aged out if not reported)
+ * </pre>
+ *
+ * Thread-safety: all mutations and reads for a given datanode run under
+ * {@code synchronized (bucket)} on that datanode's {@link TwoWindowBucket}, including
+ * map insert/remove, so concurrent record/remove/report paths cannot interleave or orphan
+ * a bucket after it was dropped from the map.
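+ *
+ * Minimal usage sketch (given a {@code PendingContainerTracker} instance {@code tracker};
+ * the call sites named below match the SCM wiring introduced in this change):
+ * <pre>
+ *   // ContainerManagerImpl#allocateContainer: mark the new container pending on all pipeline DNs
+ *   tracker.recordPendingAllocation(pipeline, containerID);
+ *
+ *   // (Incremental)ContainerReportHandler: a report confirmed the replica, so stop counting it
+ *   tracker.removePendingAllocation(datanodeDetails, containerID);
+ *
+ *   // SCMNodeManager#getNodeStatInternal: pending allocations are added to committed bytes
+ *   long pendingBytes = tracker.getPendingAllocationSize(datanodeDetails);
+ * </pre>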
    + */ +public class PendingContainerTracker { + + private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class); + + /** + * Roll interval in milliseconds. + * Configurable via hdds.scm.container.pending.allocation.roll.interval. + * Default: 5 minutes. + * Containers automatically age out after 2 × rollIntervalMs. + */ + private final long rollIntervalMs; + + /** + * Map of DataNode UUID to TwoWindowBucket. + */ + private final ConcurrentHashMap datanodeBuckets; + + /** + * Maximum container size in bytes. + */ + private final long maxContainerSize; + + /** + * Metrics for tracking pending containers (same instance as {@link SCMNodeManager}'s node metrics). + */ + private final SCMNodeMetrics metrics; + + /** + * Two-window bucket for a single DataNode. + * Contains current and previous window sets, plus last roll timestamp. + */ + private static class TwoWindowBucket { + private Set currentWindow = new HashSet<>(); + private Set previousWindow = new HashSet<>(); + private long lastRollTime = Time.monotonicNow(); + private final long rollIntervalMs; + + TwoWindowBucket(long rollIntervalMs) { + this.rollIntervalMs = rollIntervalMs; + } + + /** + * Roll the windows: previous = current, current = empty. + * Called when current time exceeds lastRollTime + rollIntervalMs. + */ + synchronized void rollIfNeeded() { + long now = Time.monotonicNow(); + if (now - lastRollTime >= rollIntervalMs) { + // Shift: current becomes previous + previousWindow = currentWindow; + // Reset: new empty current window + currentWindow = new HashSet<>(); + lastRollTime = now; + LOG.debug("Rolled window. Previous window size: {}, Current window reset to empty", previousWindow.size()); + } + } + + /** + * Get union of both windows (all pending containers). + */ + synchronized Set getAllPending() { + Set all = new HashSet<>(); + all.addAll(currentWindow); + all.addAll(previousWindow); + return all; + } + + /** + * Add container to current window. + */ + synchronized boolean add(ContainerID containerID) { + return currentWindow.add(containerID); + } + + /** + * Remove container from both windows. + */ + synchronized boolean remove(ContainerID containerID) { + boolean removedFromCurrent = currentWindow.remove(containerID); + boolean removedFromPrevious = previousWindow.remove(containerID); + return removedFromCurrent || removedFromPrevious; + } + + /** + * Check if either window is non-empty. + */ + synchronized boolean isEmpty() { + return currentWindow.isEmpty() && previousWindow.isEmpty(); + } + + /** + * Count of pending containers in both windows. + */ + synchronized int getCount() { + return currentWindow.size() + previousWindow.size(); + } + } + + public PendingContainerTracker(long maxContainerSize) { + this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes + } + + public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, + SCMNodeMetrics metrics) { + this.datanodeBuckets = new ConcurrentHashMap<>(); + this.maxContainerSize = maxContainerSize; + this.rollIntervalMs = rollIntervalMs; + this.metrics = metrics; + LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", + maxContainerSize, rollIntervalMs); + } + + /** + * Whether the datanode can fit another container of {@code containerSize} after accounting for + * SCM pending allocations for {@code node} (this tracker) and usable space on {@code datanodeInfo}. + * Combines {@link #getPendingAllocationSize} with the per-disk slot check in one call. 
+ * + * @param node identity used to look up pending allocations (same DN as {@code datanodeInfo}) + * @param datanodeInfo storage reports for the datanode + * @param containerSize required container size in bytes (typically SCM max container size) + */ + /** + * Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed. + * Call on periodic paths (e.g. node report) so windows age even when there are no new + * allocations or container reports touching this tracker. + */ + public void rollWindowsIfNeeded(DatanodeDetails node) { + if (node == null) { + return; + } + UUID uuid = node.getUuid(); + TwoWindowBucket bucket = datanodeBuckets.get(uuid); + if (bucket == null) { + return; + } + synchronized (bucket) { + bucket.rollIfNeeded(); + if (bucket.isEmpty()) { + datanodeBuckets.remove(uuid, bucket); + } + } + } + + public boolean hasEffectiveAllocatableSpaceForNewContainer( + DatanodeDetails node, DatanodeInfo datanodeInfo, long containerSize) { + if (node == null || datanodeInfo == null || containerSize <= 0) { + return false; + } + long pendingBytes = getPendingAllocationSize(node); + return hasAllocatableSpaceAfterPending(datanodeInfo, containerSize, pendingBytes); + } + + private static boolean hasAllocatableSpaceAfterPending( + DatanodeInfo datanodeInfo, long containerSize, long pendingAllocationBytes) { + List storageReports = datanodeInfo.getStorageReports(); + if (storageReports == null || storageReports.isEmpty()) { + return false; + } + long effectiveAllocatableSpace = 0L; + for (StorageReportProto report : storageReports) { + long usableSpace = VolumeUsage.getUsableSpace(report); + long containersOnThisDisk = usableSpace / containerSize; + effectiveAllocatableSpace += containersOnThisDisk * containerSize; + if (effectiveAllocatableSpace - pendingAllocationBytes >= containerSize) { + return true; + } + } + return false; + } + + /** + * Drops all pending allocation state for a datanode (e.g. stale/dead cleanup). + */ + public void clearPendingForDatanode(DatanodeDetails node) { + if (node == null) { + return; + } + UUID uuid = node.getUuid(); + TwoWindowBucket bucket = datanodeBuckets.get(uuid); + if (bucket == null) { + return; + } + synchronized (bucket) { + datanodeBuckets.remove(uuid, bucket); + } + LOG.debug("Cleared pending container allocations for datanode {}", + node.getUuidString()); + } + + /** + * Record a pending container allocation for all DataNodes in the pipeline. + * Container is added to the current window. + * + * @param pipeline The pipeline where container is allocated + * @param containerID The container being allocated + */ + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + if (pipeline == null || containerID == null) { + LOG.warn("Ignoring null pipeline or containerID"); + return; + } + + for (DatanodeDetails node : pipeline.getNodes()) { + recordPendingAllocationForDatanode(node, containerID); + } + } + + /** + * Record a pending container allocation for a single DataNode. + * Container is added to the current window. 
+ * + * @param node The DataNode where container is being allocated/replicated + * @param containerID The container being allocated/replicated + */ + public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) { + if (node == null || containerID == null) { + LOG.warn("Ignoring null node or containerID"); + return; + } + + UUID uuid = node.getUuid(); + TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent( + uuid, + k -> new TwoWindowBucket(rollIntervalMs) + ); + + synchronized (bucket) { + bucket.rollIfNeeded(); + boolean added = bucket.add(containerID); + if (!bucket.isEmpty()) { + datanodeBuckets.put(uuid, bucket); + } + LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", + containerID, node.getUuidString(), added, bucket.getCount()); + + if (added && metrics != null) { + metrics.incNumPendingContainersAdded(); + } + } + } + + /** + * Remove a pending container allocation from a specific DataNode. + * Removes from both current and previous windows. + * Called when container is confirmed. + * + * @param node The DataNode + * @param containerID The container to remove from pending + */ + public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) { + if (node == null || containerID == null) { + return; + } + + UUID uuid = node.getUuid(); + TwoWindowBucket bucket = datanodeBuckets.get(uuid); + if (bucket == null) { + return; + } + + synchronized (bucket) { + bucket.rollIfNeeded(); + + boolean removed = bucket.remove(containerID); + LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", + containerID, node.getUuidString(), removed, bucket.getCount()); + + if (removed && metrics != null) { + metrics.incNumPendingContainersRemoved(); + } + + if (bucket.isEmpty()) { + LOG.debug("Cleanup pending bucket containerID {}", containerID); + datanodeBuckets.remove(uuid, bucket); + } + } + } + + /** + * Bytes of SCM-side pending container allocations for this datanode (count × configured max + * container size). For whether a new container can be placed, prefer + * {@link #hasEffectiveAllocatableSpaceForNewContainer}. + * + * @param node The DataNode + * @return Total bytes of pending container allocations + */ + public long getPendingAllocationSize(DatanodeDetails node) { + if (node == null) { + return 0; + } + + TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid()); + if (bucket == null) { + return 0; + } + + synchronized (bucket) { + bucket.rollIfNeeded(); + return (long) bucket.getCount() * maxContainerSize; + } + } + + /** + * Get the set of pending container IDs for a DataNode. + * Returns union of current and previous windows. + * Useful for debugging and monitoring. + * + * @param node The DataNode + * @return Set of pending container IDs + */ + public Set getPendingContainers(DatanodeDetails node) { + if (node == null) { + return Collections.emptySet(); + } + + TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid()); + if (bucket == null) { + return Collections.emptySet(); + } + + synchronized (bucket) { + bucket.rollIfNeeded(); + return bucket.getAllPending(); + } + } + + /** + * Get total number of DataNodes with pending allocations. + * + * @return Count of DataNodes + */ + public int getDataNodeCount() { + return datanodeBuckets.size(); + } + + /** + * Get total number of pending containers across all DataNodes. + * Note: Same container on multiple DataNodes is counted once per DataNode. 
+ * The count may include containers from the previous window (up to 10 minutes old). + * + * @return Total pending container count + */ + public long getTotalPendingCount() { + return datanodeBuckets.values().stream() + .mapToLong(TwoWindowBucket::getCount) + .sum(); + } + + @VisibleForTesting + public SCMNodeMetrics getMetrics() { + return metrics; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..e118e6ab3553 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -143,6 +144,13 @@ public class SCMNodeManager implements NodeManager { private final NonWritableNodeFilter nonWritableNodeFilter; private final int numContainerPerVolume; + /** + * SCM-side pending container allocations per datanode (not yet in container reports). + */ + private final PendingContainerTracker pendingContainerTracker; + + private final long maxContainerSizeBytes; + /** * Lock used to synchronize some operation in Node manager to ensure a * consistent view of the node state. @@ -205,6 +213,14 @@ public SCMNodeManager( this.scmContext = scmContext; this.sendCommandNotifyMap = new HashMap<>(); this.nonWritableNodeFilter = new NonWritableNodeFilter(conf); + + this.maxContainerSizeBytes = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + long rollIntervalMs = scmConfig.getPendingContainerAllocationRollInterval().toMillis(); + this.pendingContainerTracker = new PendingContainerTracker( + maxContainerSizeBytes, rollIntervalMs, this.metrics); } @Override @@ -225,6 +241,35 @@ private void unregisterMXBean() { } } + @Override + public PendingContainerTracker getPendingContainerTracker() { + return pendingContainerTracker; + } + + /** + * Effective space check aligned with container allocation: per-disk slot model minus + * SCM pending allocations. 
+ */ + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node, + long containerSize) { + if (node == null) { + return false; + } + try { + DatanodeInfo datanodeInfo = getDatanodeInfo(node); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", node.getUuidString()); + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer( + node, datanodeInfo, containerSize); + } catch (Exception e) { + LOG.warn("Error checking allocatable space for node {}", node.getUuidString(), e); + return false; + } + } + protected NodeStateManager getNodeStateManager() { return nodeStateManager; } @@ -706,6 +751,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails, datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); datanodeInfo.updateMetaDataStorageReports(nodeReport. getMetadataStorageReportList()); + pendingContainerTracker.rollWindowsIfNeeded(datanodeDetails); metrics.incNumNodeReportProcessed(); } } catch (NodeNotFoundException e) { @@ -1099,6 +1145,8 @@ private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) { freeSpaceToSpare += reportProto.getFreeSpaceToSpare(); reserved += reportProto.getReserved(); } + // SCM-side pending container allocations (not yet in DN reports) count toward committed. + committed += pendingContainerTracker.getPendingAllocationSize(datanodeDetails); return new SCMNodeStat(capacity, used, remaining, committed, freeSpaceToSpare, reserved); } catch (NodeNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java index 0dfb1206bb41..649ffb4e9e58 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java @@ -49,6 +49,9 @@ public final class SCMNodeMetrics implements MetricsSource { private @Metric MutableCounterLong numNodeReportProcessingFailed; private @Metric MutableCounterLong numNodeCommandQueueReportProcessed; private @Metric MutableCounterLong numNodeCommandQueueReportProcessingFailed; + // Pending container allocations at SCM (per-DN tracker), not yet on datanodes. + private @Metric MutableCounterLong numPendingContainersAdded; + private @Metric MutableCounterLong numPendingContainersRemoved; private @Metric String textMetric; private final MetricsRegistry registry; @@ -124,6 +127,22 @@ void incNumNodeCommandQueueReportProcessingFailed() { numNodeCommandQueueReportProcessingFailed.incr(); } + void incNumPendingContainersAdded() { + numPendingContainersAdded.incr(); + } + + void incNumPendingContainersRemoved() { + numPendingContainersRemoved.incr(); + } + + public long getNumPendingContainersAdded() { + return numPendingContainersAdded.value(); + } + + public long getNumPendingContainersRemoved() { + return numPendingContainersRemoved.value(); + } + /** * Get aggregated counter and gauge metrics. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index 60b88e94973e..cb88136d7d88 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -46,6 +46,10 @@ public StaleNodeHandler(NodeManager nodeManager, @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { + PendingContainerTracker pending = nodeManager.getPendingContainerTracker(); + if (pending != null) { + pending.clearPendingForDatanode(datanodeDetails); + } Set pipelineIds = nodeManager.getPipelines(datanodeDetails); LOG.info("Datanode {} moved to stale state. Finalizing its pipelines {}", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..64a32174c13d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; @@ -227,4 +228,11 @@ void reinitialize(Table pipelineStore) * Get the pipeline metrics. */ SCMPipelineMetrics getMetrics(); + + /** + * Get DatanodeInfo for a specific DataNode which includes per-volume storage reports. 
+ * @param datanodeDetails The datanode to get info for + * @return DatanodeInfo containing detailed node information including per-disk stats, or null if not available + */ + DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..e25fee87efc6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -638,17 +637,18 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) { @Override public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { for (DatanodeDetails node : pipeline.getNodes()) { - if (!(node instanceof DatanodeInfo)) { - node = nodeManager.getDatanodeInfo(node); - } - if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node, containerSize)) { return false; } } - return true; } + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) { + return nodeManager.getDatanodeInfo(datanodeDetails); + } + /** * Schedules a fixed interval job to create pipelines. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index dcfc6aa52ebc..1836f2be13e5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -825,7 +825,7 @@ private void initializeSystemManagers(OzoneConfiguration conf, } else { containerManager = new ContainerManagerImpl(conf, scmHAManager, sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(), - containerReplicaPendingOps); + containerReplicaPendingOps, scmNodeManager); } ScmConfig scmConfig = conf.getObject(ScmConfig.class); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index 45c947cb00a4..8c390cdc1068 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -154,7 +154,8 @@ void setUp(@TempDir File tempDir) throws Exception { pipelineManager, scmMetadataStore.getContainerTable(), new ContainerReplicaPendingOps( - Clock.system(ZoneId.systemDefault()), null)); + Clock.system(ZoneId.systemDefault()), null), + nodeManager); SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf, nodeManager, pipelineManager, containerManager, serviceManager, eventQueue, scmContext); SCMConfigurator configurator = new SCMConfigurator(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..c7bffa7e1b1a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -113,6 +114,7 @@ public class MockNodeManager implements NodeManager { private ConcurrentMap> dnsToUuidMap; private int numHealthyDisksPerDatanode; private int numPipelinePerDatanode; + private PendingContainerTracker pendingContainerTracker; { this.healthyNodes = new LinkedList<>(); @@ -531,6 +533,13 @@ public int getPipelinesCount(DatanodeDetails datanodeDetails) { @Override public void addPipeline(Pipeline pipeline) { node2PipelineMap.addPipeline(pipeline); + // Pipeline creation uses DNs that may not be the pre-registered fake nodes; ensure each + // pipeline member has metrics so {@link #getDatanodeInfo} and space checks work. 
+ for (DatanodeDetails dn : pipeline.getNodes()) { + if (nodeMetricMap.get(dn) == null) { + populateNodeMetric(dn, 0); + } + } } /** @@ -941,6 +950,24 @@ public void setNumHealthyVolumes(int value) { numHealthyDisksPerDatanode = value; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + if (pendingContainerTracker == null) { + pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024); + } + return pendingContainerTracker; + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node, long containerSize) { + DatanodeInfo info = getDatanodeInfo(node); + if (info == null) { + return false; + } + return getPendingContainerTracker() + .hasEffectiveAllocatableSpaceForNewContainer(node, info, containerSize); + } + /** * A class to declare some values for the nodes so that our tests * won't fail. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..8ab456d3f2d7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -62,6 +63,7 @@ public class SimpleMockNodeManager implements NodeManager { private Map nodeMap = new ConcurrentHashMap<>(); private Map> pipelineMap = new ConcurrentHashMap<>(); private Map> containerMap = new ConcurrentHashMap<>(); + private PendingContainerTracker pendingContainerTracker; public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); @@ -435,4 +437,17 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { return false; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + if (pendingContainerTracker == null) { + pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024); + } + return pendingContainerTracker; + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node, long containerSize) { + return true; + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 218a2137e3e6..7b12a0fc89ed 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -84,6 +84,7 @@ public class TestContainerManagerImpl { private SequenceIdGenerator sequenceIdGen; private ContainerReplicaPendingOps pendingOpsMock; private PipelineManager pipelineManager; + private NodeManager nodeManager; @BeforeAll static void init() { @@ -96,7 +97,7 @@ void setUp() throws Exception { final OzoneConfiguration conf = SCMTestUtils.getConf(testDir); dbStore = 
DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get()); scmhaManager = SCMHAManagerStub.getInstance(true); - NodeManager nodeManager = new MockNodeManager(true, 10); + nodeManager = new MockNodeManager(true, 10); sequenceIdGen = new SequenceIdGenerator( conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore)); PipelineManager base = new MockPipelineManager(dbStore, scmhaManager, nodeManager); @@ -111,7 +112,8 @@ void setUp() throws Exception { pendingOpsMock = mock(ContainerReplicaPendingOps.class); containerManager = new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, - SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); + SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock, + nodeManager); } @AfterEach @@ -173,7 +175,7 @@ public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() OzoneConfiguration conf = SCMTestUtils.getConf(tempDir); ContainerManager manager = new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, spyPipelineManager, - SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); + SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock, nodeManager); Pipeline pipeline = spyPipelineManager.getPipelines().iterator().next(); // the pipeline has no existing containers, so a new container gets allocated in getMatchingContainer diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 9b7c5c77b2cd..ea63a81c8fe4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -96,6 +96,7 @@ public class TestContainerPlacement { private SequenceIdGenerator sequenceIdGen; private OzoneConfiguration conf; private PipelineManager pipelineManager; + private NodeManager nodeManager; @BeforeEach public void setUp() throws Exception { @@ -105,7 +106,7 @@ public void setUp() throws Exception { scmhaManager = SCMHAManagerStub.getInstance(true); sequenceIdGen = new SequenceIdGenerator( conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore)); - NodeManager nodeManager = new MockNodeManager(true, 10); + nodeManager = new MockNodeManager(true, 10); pipelineManager = new MockPipelineManager(dbStore, scmhaManager, nodeManager); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( @@ -166,7 +167,8 @@ ContainerManager createContainerManager() scmhaManager, sequenceIdGen, pipelineManager, SCMDBDefinition.CONTAINERS.getTable(dbStore), new ContainerReplicaPendingOps( - Clock.system(ZoneId.systemDefault()), null)); + Clock.system(ZoneId.systemDefault()), null), + nodeManager); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java new file mode 100644 index 000000000000..9604a57c3da2 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.io.IOException; +import java.util.Set; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Tests for PendingContainerTracker. + */ +public class TestPendingContainerTracker { + + private static final long MAX_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024; // 5GB + + private PendingContainerTracker tracker; + private Pipeline pipeline; + private DatanodeDetails dn1; + private DatanodeDetails dn2; + private DatanodeDetails dn3; + private ContainerID container1; + private ContainerID container2; + private ContainerID container3; + + @BeforeEach + public void setUp() throws IOException { + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE); + + // Create a 3-node Ratis pipeline + pipeline = MockPipeline.createPipeline(3); + dn1 = pipeline.getNodes().get(0); + dn2 = pipeline.getNodes().get(1); + dn3 = pipeline.getNodes().get(2); + + container1 = ContainerID.valueOf(1L); + container2 = ContainerID.valueOf(2L); + container3 = ContainerID.valueOf(3L); + } + + @Test + public void testRecordPendingAllocation() { + // Initially no pending containers + assertEquals(0, tracker.getPendingContainers(dn1).size()); + assertEquals(0, tracker.getPendingAllocationSize(dn1)); + + // Record a pending allocation + tracker.recordPendingAllocation(pipeline, container1); + + // All 3 DNs should have the container pending + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(1, tracker.getPendingContainers(dn2).size()); + assertEquals(1, tracker.getPendingContainers(dn3).size()); + + // Size should be MAX_CONTAINER_SIZE for each DN + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn2)); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn3)); + } + + @Test + public void testRecordMultiplePendingAllocations() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + // Each DN should have 3 pending containers + assertEquals(3, tracker.getPendingContainers(dn1).size()); + assertEquals(3, tracker.getPendingContainers(dn2).size()); + assertEquals(3, tracker.getPendingContainers(dn3).size()); + + // Size should be 3 × MAX_CONTAINER_SIZE + assertEquals(3 * MAX_CONTAINER_SIZE, 
tracker.getPendingAllocationSize(dn1)); + } + + @Test + public void testIdempotentRecording() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container1); // Duplicate + + // Should still be 1 container (Set deduplication) + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + } + + /** + * After one roll interval, pending entries move from currentWindow to previousWindow and remain + * visible. After a second roll (2× interval total), the old previousWindow is discarded and the + * container ages out if not confirmed. + */ + @Test + @Timeout(30) + public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws InterruptedException { + long rollMs = 200L; + PendingContainerTracker shortRollTracker = + new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null); + + shortRollTracker.recordPendingAllocationForDatanode(dn1, container1); + assertEquals(1, shortRollTracker.getPendingContainers(dn1).size()); + assertThat(shortRollTracker.getPendingContainers(dn1)).contains(container1); + + // First roll: C1 moves from currentWindow to previousWindow; union still includes C1 + Thread.sleep(rollMs + 80); + shortRollTracker.rollWindowsIfNeeded(dn1); + assertEquals(1, shortRollTracker.getPendingContainers(dn1).size()); + assertThat(shortRollTracker.getPendingContainers(dn1)).contains(container1); + + // Second roll: prior previousWindow (holding C1) is dropped; C1 is no longer pending + Thread.sleep(rollMs + 80); + shortRollTracker.rollWindowsIfNeeded(dn1); + assertEquals(0, shortRollTracker.getPendingContainers(dn1).size()); + assertEquals(0L, shortRollTracker.getPendingAllocationSize(dn1)); + } + + @Test + public void testRemovePendingAllocation() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Remove one container from DN1 + tracker.removePendingAllocation(dn1, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN2 and DN3 should still have both containers + assertEquals(2, tracker.getPendingContainers(dn2).size()); + assertEquals(2, tracker.getPendingContainers(dn3).size()); + } + + @Test + public void testRemovePendingAllocationFromPipeline() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + // Remove container1 from all nodes in pipeline + for (DatanodeDetails dn : pipeline.getNodes()) { + tracker.removePendingAllocation(dn, container1); + } + + // All DNs should have only container2 remaining + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(1, tracker.getPendingContainers(dn2).size()); + assertEquals(1, tracker.getPendingContainers(dn3).size()); + } + + @Test + public void testRemoveNonExistentContainer() { + tracker.recordPendingAllocation(pipeline, container1); + + // Remove a container that was never added - should not throw exception + tracker.removePendingAllocation(dn1, container2); + + // DN1 should still have container1 + assertEquals(1, tracker.getPendingContainers(dn1).size()); + } + + @Test + public void testGetPendingContainers() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + Set pending = tracker.getPendingContainers(dn1); + + 
assertEquals(2, pending.size()); + assertThat(pending.contains(container1)); + assertThat(pending.contains(container2)); + + // Returned set should be a copy - modifying it shouldn't affect tracker + pending.add(container3); + assertEquals(2, tracker.getPendingContainers(dn1).size()); // Should still be 2 + } + + @Test + public void testGetPendingContainersForNonExistentDN() { + DatanodeDetails unknownDN = MockDatanodeDetails.randomDatanodeDetails(); + + Set pending = tracker.getPendingContainers(unknownDN); + + assertThat(pending.isEmpty()); + } + + @Test + public void testGetTotalPendingCount() { + assertEquals(0, tracker.getTotalPendingCount()); + + tracker.recordPendingAllocation(pipeline, container1); + + // 1 container × 3 DNs = 3 total pending + assertEquals(3, tracker.getTotalPendingCount()); + + tracker.recordPendingAllocation(pipeline, container2); + + // 2 containers × 3 DNs = 6 total pending + assertEquals(6, tracker.getTotalPendingCount()); + + // Remove from one DN + tracker.removePendingAllocation(dn1, container1); + + // (2 containers × 2 DNs) + (1 container × 1 DN) = 5 total + assertEquals(5, tracker.getTotalPendingCount()); + } + + @Test + public void testConcurrentModification() throws InterruptedException { + // Test thread-safety by having multiple threads add/remove containers + final int numThreads = 10; + final int operationsPerThread = 100; + + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + threads[i] = new Thread(() -> { + for (int j = 0; j < operationsPerThread; j++) { + ContainerID cid = ContainerID.valueOf(threadId * 1000L + j); + tracker.recordPendingAllocation(pipeline, cid); + + if (j % 2 == 0) { + tracker.removePendingAllocation(dn1, cid); + } + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all to finish + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred and counts are reasonable + assertThat(tracker.getTotalPendingCount() >= 0); + assertThat(tracker.getDataNodeCount() <= 3); + } + + @Test + public void testMemoryCleanupOnEmptySet() { + tracker.recordPendingAllocation(pipeline, container1); + + assertEquals(3, tracker.getDataNodeCount()); + + // Remove the only pending container from DN1 + tracker.removePendingAllocation(dn1, container1); + + // DN1 should be removed from the map (memory cleanup) + assertEquals(2, tracker.getDataNodeCount()); + } + + @Test + public void testPendingContainer() { + // Simulate allocation and confirmation flow + + // Allocate 3 containers + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + // Each DN should have 3 pending, 15GB total + assertEquals(3, tracker.getPendingContainers(dn1).size()); + assertEquals(3 * MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN1 confirms container1 via container report + tracker.removePendingAllocation(dn1, container1); + + // DN1 now has 2 pending, 10GB + assertEquals(2, tracker.getPendingContainers(dn1).size()); + assertEquals(2 * MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN2 and DN3 still have 3 pending + assertEquals(3, tracker.getPendingContainers(dn2).size()); + assertEquals(3, tracker.getPendingContainers(dn3).size()); + + // All DNs eventually confirm all containers + for (DatanodeDetails dn : pipeline.getNodes()) { + tracker.removePendingAllocation(dn, 
container1); + tracker.removePendingAllocation(dn, container2); + tracker.removePendingAllocation(dn, container3); + } + + // All DNs should have 0 pending + assertEquals(0, tracker.getPendingContainers(dn1).size()); + assertEquals(0, tracker.getPendingContainers(dn2).size()); + assertEquals(0, tracker.getPendingContainers(dn3).size()); + assertEquals(0, tracker.getTotalPendingCount()); + assertEquals(0, tracker.getDataNodeCount()); + } + + @Test + public void testRemoveFromBothWindows() { + // This test verifies that removal works from both current and previous windows + // In general, a container could be in previous window after a roll + + // Add containers + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Remove container1 - should work regardless of which window it's in + tracker.removePendingAllocation(dn1, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + Set pending = tracker.getPendingContainers(dn1); + assertFalse(pending.contains(container1)); + assertThat(pending.contains(container2)); + } + + @Test + public void testUnionOfBothWindows() { + // This test verifies the two-window concept: + // getPendingContainers should return union of current + previous windows + + // Add container1 + tracker.recordPendingAllocation(pipeline, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + Set pending1 = tracker.getPendingContainers(dn1); + assertThat(pending1.contains(container1)); + + // Add container2 - should be in same window initially + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + Set pending2 = tracker.getPendingContainers(dn1); + assertThat(pending2.contains(container1)); + assertThat(pending2.contains(container2)); + + // Both containers should be in the union + assertEquals(2, pending2.size()); + } + + @Test + public void testIdempotencyAcrossWindows() { + // Recording same container multiple times should only count it once + // This should work even if it spans windows + + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + // Record again - should still be 1 (idempotency via Set) + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + // Add different container + tracker.recordPendingAllocation(pipeline, container2); + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Record container1 again + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(2, tracker.getPendingContainers(dn1).size()); + } + + @Test + public void testExplicitRemoval() { + + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + assertEquals(3, tracker.getPendingContainers(dn1).size()); + + // Simulate container report confirms container1 and container2 + tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1, container2); + + // Immediately reflects the removal (doesn't wait for aging) + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + Set pending = tracker.getPendingContainers(dn1); + assertEquals(1, pending.size()); + assertThat(pending.contains(container3)); + } +} diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..4086c9484729 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.DBStore; @@ -50,9 +51,11 @@ public class MockPipelineManager implements PipelineManager { private final PipelineStateManager stateManager; + private final NodeManager nodeManager; public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager, NodeManager nodeManager) throws RocksDatabaseException, CodecException, DuplicatedPipelineIdException { + this.nodeManager = nodeManager; stateManager = PipelineStateManagerImpl .newBuilder().setNodeManager(nodeManager) .setRatisServer(scmhaManager.getRatisServer()) @@ -333,7 +336,12 @@ public boolean isPipelineCreationFrozen() { @Override public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { - return false; + for (DatanodeDetails node : pipeline.getNodes()) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node, containerSize)) { + return false; + } + } + return true; } @Override @@ -346,4 +354,9 @@ public int openContainerLimit(List datanodes) { public SCMPipelineMetrics getMetrics() { return null; } + + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) { + return nodeManager.getDatanodeInfo(datanodeDetails); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..d7a0e682fab4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -976,6 +977,12 @@ public void testHasEnoughSpace() throws IOException { doReturn(info).when(mockedNodeManager).getDatanodeInfo(dn); datanodeInfoList.add(info); } + doAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + long cs = invocation.getArgument(1); + DatanodeInfo info = mockedNodeManager.getDatanodeInfo(dn); + return SCMCommonPlacementPolicy.hasEnoughSpace(info, 0, cs); + }).when(mockedNodeManager).hasSpaceForNewContainerAllocation(any(DatanodeDetails.class), anyLong()); assertTrue(pipelineManager.hasEnoughSpace(pipeline, containerSize)); // Case 2: One node does not have enough space. 
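For orientation, below is a minimal, self-contained sketch of the two-window (tumbling bucket) bookkeeping that the unit tests above exercise: records go into the current window, removals are applied to both windows, reads return the union of the two, and a periodic roll discards the older window so unconfirmed allocations age out after two intervals. The class and method names are illustrative only; this is not the PendingContainerTracker implementation added by the patch.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TwoWindowPendingTracker {
  // Per-datanode pending container IDs, split into a current and a previous window.
  private final Map<String, Set<Long>> currentWindow = new HashMap<>();
  private final Map<String, Set<Long>> previousWindow = new HashMap<>();
  private final long maxContainerSize;

  TwoWindowPendingTracker(long maxContainerSize) {
    this.maxContainerSize = maxContainerSize;
  }

  // Record an allocation; idempotent because each window is a Set.
  synchronized void record(String datanode, long containerId) {
    currentWindow.computeIfAbsent(datanode, k -> new HashSet<>()).add(containerId);
  }

  // Remove a confirmed allocation from whichever window holds it.
  synchronized void remove(String datanode, long containerId) {
    removeFrom(currentWindow, datanode, containerId);
    removeFrom(previousWindow, datanode, containerId);
  }

  private static void removeFrom(Map<String, Set<Long>> window, String dn, long cid) {
    Set<Long> ids = window.get(dn);
    if (ids != null && ids.remove(cid) && ids.isEmpty()) {
      window.remove(dn); // memory cleanup when a datanode has nothing pending
    }
  }

  // Pending containers for a datanode: union of both windows, returned as a copy.
  synchronized Set<Long> getPending(String datanode) {
    Set<Long> union = new HashSet<>(currentWindow.getOrDefault(datanode, new HashSet<>()));
    union.addAll(previousWindow.getOrDefault(datanode, new HashSet<>()));
    return union;
  }

  // Pending bytes are estimated as the pending count times the max container size.
  synchronized long getPendingSize(String datanode) {
    return (long) getPending(datanode).size() * maxContainerSize;
  }

  // Called every roll interval: the previous window is dropped, so unconfirmed
  // allocations age out after at most two intervals.
  synchronized void roll() {
    previousWindow.clear();
    previousWindow.putAll(currentWindow);
    currentWindow.clear();
  }
}

Aggregates such as getTotalPendingCount and getDataNodeCount, asserted in the tests, can be derived from the same two maps; they are omitted here for brevity.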
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index fdf38a7a67ca..a011d87cd17b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -584,7 +584,8 @@ public void testContainerSafeModeRuleEC(int data, int parity) throws Exception { ContainerManager containerManager = new ContainerManagerImpl(config, SCMHAManagerStub.getInstance(true), null, pipelineManager, scmMetadataStore.getContainerTable(), - new ContainerReplicaPendingOps(Clock.system(ZoneId.systemDefault()), null)); + new ContainerReplicaPendingOps(Clock.system(ZoneId.systemDefault()), null), + nodeManager); scmSafeModeManager = new SCMSafeModeManager(config, nodeManager, pipelineManager, containerManager, serviceManager, queue, scmContext); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java new file mode 100644 index 000000000000..ac53f43a9d52 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; +import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for PendingContainerTracker. + */ +@Timeout(300) +public class TestPendingContainerTrackerIntegration { + + private static final Logger LOG = + LoggerFactory.getLogger(TestPendingContainerTrackerIntegration.class); + private MiniOzoneCluster cluster; + private StorageContainerManager scm; + private OzoneClient client; + private ContainerManager containerManager; + private PendingContainerTracker pendingTracker; + private SCMNodeMetrics metrics; + private OzoneBucket bucket; + + @BeforeEach + public void setup() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + // Keep full container reports infrequent so ICRs drive the pending-removal path + conf.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "60s"); + + // Reduce heartbeat interval for faster container reports + conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "10s"); + + conf.set("ozone.scm.container.size", "100MB"); + conf.set("ozone.scm.pipeline.owner.container.count", "1"); + conf.set("ozone.scm.pipeline.per.metadata.disk", "1"); + conf.set("ozone.scm.datanode.pipeline.limit", "1"); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + cluster.waitTobeOutOfSafeMode(); + + scm = cluster.getStorageContainerManager(); + containerManager = scm.getContainerManager(); + client = cluster.newClient(); + + // Create bucket for testing + bucket = TestDataUtil.createVolumeAndBucket(client); + + SCMNodeManager nodeManager = (SCMNodeManager) scm.getScmNodeManager(); + assertNotNull(nodeManager); + pendingTracker = nodeManager.getPendingContainerTracker(); + assertNotNull(pendingTracker, "PendingContainerTracker should be initialized"); + metrics = pendingTracker.getMetrics(); + + LOG.info("Test setup complete - container report interval: 60s, heartbeat interval: 10s"); + } + + @AfterEach + public void cleanup() throws Exception { + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test: Write key → Container 
allocation → Pending tracked → ICR → Pending removed. + */ + @Test + public void testKeyWriteRecordsPendingAndICRRemovesIt() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + // Allocate a container directly + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + // Look up the pipeline of the container we just allocated + ContainerWithPipeline containerWithPipeline = + scm.getClientProtocolServer().getContainerWithPipeline( + container.getContainerID()); + + Pipeline pipeline = containerWithPipeline.getPipeline(); + + // Verify pending containers are tracked for all nodes in the pipeline + List<DatanodeDetails> nodesWithPending = new ArrayList<>(); + for (DatanodeDetails dn : pipeline.getNodes()) { + long pendingSize = pendingTracker.getPendingAllocationSize(dn); + if (pendingSize > 0) { + nodesWithPending.add(dn); + LOG.info("DataNode {} has {} bytes pending", dn.getUuidString(), pendingSize); + + Set<ContainerID> pendingContainers = pendingTracker.getPendingContainers(dn); + assertThat(pendingContainers).contains(container.containerID()); + } + } + + assertThat(nodesWithPending).isNotEmpty(); + + // Verify the added metric increased + long afterAdded = metrics.getNumPendingContainersAdded(); + assertThat(afterAdded).isGreaterThan(initialAdded); + + LOG.info("Pending tracked successfully. Waiting for ICR to remove pending..."); + + // Write a key + String keyName = "testKey1"; + byte[] data = "Hello Ozone - Testing Pending Container Tracker".getBytes(UTF_8); + + LOG.info("Writing key: {}", keyName); + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + LOG.info("Key written successfully"); + + // Wait for ICRs to arrive (heartbeats run every 10s, so allow up to 30s) + GenericTestUtils.waitFor(() -> { + for (DatanodeDetails dn : nodesWithPending) { + Set<ContainerID> pendingContainers = pendingTracker.getPendingContainers(dn); + if (pendingContainers.contains(container.containerID())) { + LOG.info("Still waiting for ICR from DN {}", dn.getUuidString()); + return false; + } + } + + LOG.info("All pending containers removed via ICR"); + return true; + }, 100, 30000); + + // Verify all pending containers were removed + for (DatanodeDetails dn : nodesWithPending) { + Set<ContainerID> pendingContainers = pendingTracker.getPendingContainers(dn); + assertThat(pendingContainers).doesNotContain(container.containerID()); + } + + // Verify the removed metric increased + long afterRemoved = metrics.getNumPendingContainersRemoved(); + assertThat(afterRemoved).isGreaterThan(initialRemoved); + + LOG.info("Pending metrics after test: added={}, removed={}", afterAdded, afterRemoved); + } + + /** + * Test: Verify idempotency - container reported multiple times. 
+ */ + @Test + public void testIdempotentPendingTracking() throws Exception { + // Allocate a container directly + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID()); + DatanodeDetails firstNode = pipeline.getFirstNode(); + + // Record initial state + long initialSize = pendingTracker.getPendingAllocationSize(firstNode); + int initialCount = pendingTracker.getPendingContainers(firstNode).size(); + + LOG.info("Initial pending state: size={}, count={}", initialSize, initialCount); + + // Try adding the same container again (simulates a retry or duplicate allocation) + pendingTracker.recordPendingAllocationForDatanode(firstNode, container.containerID()); + + long afterSize = pendingTracker.getPendingAllocationSize(firstNode); + int afterCount = pendingTracker.getPendingContainers(firstNode).size(); + + // Size and count should remain the same (idempotent) + assertEquals(initialSize, afterSize, + "Pending size should not change when adding duplicate container"); + assertEquals(initialCount, afterCount, + "Pending count should not change when adding duplicate container"); + } + + /** + * Test: Verify metrics are updated correctly. + */ + @Test + public void testMetricsUpdateThroughLifecycle() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + LOG.info("Initial metrics: added={}, removed={}", initialAdded, initialRemoved); + + // Write multiple keys + for (int i = 0; i < 3; i++) { + String keyName = "metricsTestKey" + i; + byte[] data = ("Metrics test " + i).getBytes(UTF_8); + + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + } + + // Added metric should increase as containers are allocated + GenericTestUtils.waitFor(() -> { + long afterAdded = metrics.getNumPendingContainersAdded(); + return afterAdded > initialAdded; + }, 100, 5000); + + // Removed metric should increase once ICRs are processed (allow up to 30s for heartbeats) + GenericTestUtils.waitFor(() -> { + long afterRemoved = metrics.getNumPendingContainersRemoved(); + return initialRemoved < afterRemoved; + }, 100, 30000); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java index 586aad5fd68f..31488c6d4a03 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.utils.db.DBStore; @@ -85,10 +86,11 @@ public ReconContainerManager( ReconContainerMetadataManager reconContainerMetadataManager, SCMHAManager scmhaManager, SequenceIdGenerator sequenceIdGen, - ContainerReplicaPendingOps pendingOps) + ContainerReplicaPendingOps pendingOps, + NodeManager 
nodeManager) throws IOException { super(conf, scmhaManager, sequenceIdGen, pipelineManager, containerStore, - pendingOps); + pendingOps, nodeManager); this.scmClient = scm; this.pipelineManager = pipelineManager; this.containerHealthSchemaManager = containerHealthSchemaManager; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index 278bac0011dc..46926b7d4fe0 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -246,7 +246,7 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, pipelineManager, scmServiceProvider, containerHealthSchemaManager, reconContainerMetadataManager, - scmhaManager, sequenceIdGen, pendingOps); + scmhaManager, sequenceIdGen, pendingOps, nodeManager); this.scmServiceProvider = scmServiceProvider; this.isSyncDataFromSCMRunning = new AtomicBoolean(); this.containerCountBySizeDao = containerCountBySizeDao; diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java index 33e20413bfd6..0900fd2bb96f 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java @@ -116,7 +116,8 @@ public void setUp(@TempDir File tempDir) throws Exception { mock(ReconContainerMetadataManager.class), scmhaManager, sequenceIdGen, - pendingOps); + pendingOps, + nodeManager); } @AfterEach