Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;

/**
 * Base interface for SCM handlers participating in Ratis-based HA.
 * Each handler is associated with a fixed {@link RequestType}, which is
 * used when the handler implementation is registered with the Ratis
 * server and when requests are routed back to it.
 */
public interface SCMHandler {

/**
 * Returns the {@link RequestType} of this handler.
 *
 * @return the request type identifying this handler; expected to be
 *         constant for the lifetime of the handler
 */
RequestType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.metadata.Replicate;

/**
* This component holds the state of managed SecretKeys, including the
* current key and all active keys.
*/
public interface SecretKeyState {
public interface SecretKeyState extends SCMHandler {
/**
* Get the current active key, which is used for signing tokens. This is
* also the latest key managed by this state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,4 +135,9 @@ private void updateKeysInternal(List<ManagedSecretKey> newKeys) {
public void reinitialize(List<ManagedSecretKey> secretKeys) {
updateKeysInternal(secretKeys);
}

/** Identifies this handler's replicated requests as SECRET_KEY operations. */
@Override
public RequestType getType() {
return RequestType.SECRET_KEY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.security.cert.X509Certificate;
import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;

Expand All @@ -34,7 +35,7 @@
* With this interface, DefaultCA server read and write DB or persistence
* layer and we can write to SCM's Metadata DB.
*/
public interface CertificateStore {
public interface CertificateStore extends SCMHandler {

/**
* Writes a new certificate that was issued to the persistent store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;

/**
Expand Down Expand Up @@ -67,4 +68,9 @@ public List<X509Certificate> listCertificate(NodeType role,
@Override
public void reinitialize(SCMMetadataStore metadataStore) {
}

/** Identifies this handler's replicated requests as CERT_STORE operations. */
@Override
public RequestType getType() {
return RequestType.CERT_STORE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.ha.ScmInvokerCodeGenerator;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.utils.db.Table;
Expand All @@ -31,7 +32,7 @@
* DeletedBlockLogStateManager interface to
* manage deleted blocks and record them in the underlying persist store.
*/
public interface DeletedBlockLogStateManager {
public interface DeletedBlockLogStateManager extends SCMHandler {
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Similarly, we should implement getType() here instead of in the subclasses.
  • We should do it for all the handler interfaces.
  @Override
  default RequestType getType() {
    return RequestType.BLOCK;
  }

@Replicate
void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs,
DeletedBlocksTransactionSummary summary) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand Down Expand Up @@ -198,6 +199,11 @@ public static Builder newBuilder() {
return new Builder();
}

/** Identifies this handler's replicated requests as BLOCK operations. */
@Override
public RequestType getType() {
return RequestType.BLOCK;
}

/**
* Builder for ContainerStateManager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.Table;
Expand All @@ -49,7 +50,7 @@
* 4. The declaration should throw RaftException
*
*/
public interface ContainerStateManager {
public interface ContainerStateManager extends SCMHandler {

/* **********************************************************************
* Container Life Cycle *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ private void initialize() throws IOException {
return actions;
}

/** Identifies this handler's replicated requests as CONTAINER operations. */
@Override
public RequestType getType() {
return RequestType.CONTAINER;
}

@Override
public List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID start, int count) {
try (AutoCloseableLock ignored = readLock()) {
Expand Down Expand Up @@ -636,8 +641,7 @@ public ContainerStateManager build() throws IOException {
conf, pipelineMgr, table, transactionBuffer,
containerReplicaPendingOps);

return scmRatisServer.getProxyHandler(RequestType.CONTAINER,
ContainerStateManager.class, csm);
return scmRatisServer.getProxyHandler(ContainerStateManager.class, csm);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ SCMRatisResponse submitRequest(SCMRatisRequest request)

RaftPeerId getLeaderId();

default <T> T getProxyHandler(ScmInvoker<T> invoker) {
/**
 * Registers the invoker's underlying implementation as the state-machine
 * handler for its request type, then returns the invoker's pre-built proxy.
 *
 * @param invoker wraps a handler implementation together with its proxy
 * @return the proxy supplied by {@code invoker}
 */
default <T extends SCMHandler> T getProxyHandler(ScmInvoker<T> invoker) {
registerStateMachineHandler(invoker.getType(), invoker.getImpl());
return invoker.getProxy();
}

default <T> T getProxyHandler(RequestType type, Class<T> intf, T impl) {
default <T extends SCMHandler> T getProxyHandler(Class<T> intf, T impl) {
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(type, impl, this);
new SCMHAInvocationHandler(impl.getType(), impl, this);
return intf.cast(Proxy.newProxyInstance(getClass().getClassLoader(),
new Class<?>[] {intf}, invocationHandler));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.hadoop.hdds.scm.ha;

import static org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType.SEQUENCE_ID;
import static org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SEQUENCE_ID_BATCH_SIZE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SEQUENCE_ID_BATCH_SIZE_DEFAULT;

Expand Down Expand Up @@ -187,7 +187,7 @@ public void reinitialize(Table<String, Long> sequenceIdTable)
/**
* Maintain SequenceIdTable in RocksDB.
*/
interface StateManager {
interface StateManager extends SCMHandler {
/**
* Compare And Swap lastId saved in db from expectedLastId to newLastId.
* If based on Ratis, it will submit a raft client request.
Expand Down Expand Up @@ -291,6 +291,11 @@ private void initialize() throws IOException {
}
}

/** Identifies this handler's replicated requests as SEQUENCE_ID operations. */
@Override
public RequestType getType() {
return RequestType.SEQUENCE_ID;
}

/**
* Builder for Ratis based StateManager.
*/
Expand Down Expand Up @@ -321,7 +326,7 @@ public StateManager build() {

final StateManager impl = new StateManagerImpl(table, buffer);

return ratisServer.getProxyHandler(SEQUENCE_ID, StateManager.class, impl);
return ratisServer.getProxyHandler(StateManager.class, impl);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* This interface defines an API for saving and reading configurations of a
* {@link StatefulService}.
*/
public interface StatefulServiceStateManager {
public interface StatefulServiceStateManager extends SCMHandler {

/**
* Persists the specified configurations bytes to RocksDB and replicates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ private StatefulServiceStateManagerImpl(
this.transactionBuffer = scmDBTransactionBuffer;
}

/** Identifies this handler's replicated requests as STATEFUL_SERVICE_CONFIG operations. */
@Override
public RequestType getType() {
return RequestType.STATEFUL_SERVICE_CONFIG;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -134,8 +139,7 @@ public StatefulServiceStateManager build() {
new StatefulServiceStateManagerImpl(statefulServiceConfig,
transactionBuffer);

return scmRatisServer.getProxyHandler(RequestType.STATEFUL_SERVICE_CONFIG,
StatefulServiceStateManager.class, stateManager);
return scmRatisServer.getProxyHandler(StatefulServiceStateManager.class, stateManager);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public DeletedBlockLogStateManagerInvoker(DeletedBlockLogStateManager impl, SCMR

@Override
public RequestType getType() {
return RequestType.BLOCK;
return impl.getType();
}

@Override
Expand All @@ -82,6 +82,11 @@ public DeletedBlockLogStateManager getImpl() {
@Override
public DeletedBlockLogStateManager getProxy() {
return new DeletedBlockLogStateManager() {
/** Delegates to the wrapped implementation so proxy and impl report the same type. */
@Override
public RequestType getType() {
return impl.getType();
}

@Override
public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;

/**
* Invokes methods without using reflection.
*/
public abstract class ScmInvoker<T> {
public abstract class ScmInvoker<T extends SCMHandler> {
private final SCMRatisServer ratisHandler;

ScmInvoker(SCMRatisServer ratisHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
Expand All @@ -33,7 +34,7 @@
/**
* Manages the state of pipelines in SCM.
*/
public interface PipelineStateManager {
public interface PipelineStateManager extends SCMHandler {

/**
* Adding pipeline would be replicated to Ratis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ public static Builder newBuilder() {
return new Builder();
}

/** Identifies this handler's replicated requests as PIPELINE operations. */
@Override
public RequestType getType() {
return RequestType.PIPELINE;
}

/**
* Builder for PipelineStateManagerImpl.
*/
Expand Down Expand Up @@ -364,8 +369,7 @@ public PipelineStateManager build() throws RocksDatabaseException, DuplicatedPip
pipelineStore, nodeManager, transactionBuffer);
pipelineStateManager.initialize();

return scmRatisServer.getProxyHandler(RequestType.PIPELINE,
PipelineStateManager.class, pipelineStateManager);
return scmRatisServer.getProxyHandler(PipelineStateManager.class, pipelineStateManager);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.hadoop.hdds.scm.security;

import java.io.IOException;
import org.apache.hadoop.hdds.scm.ha.SCMHandler;
import org.apache.hadoop.hdds.scm.metadata.Replicate;

/**
* This interface defines APIs for sub-ca rotation instructions.
*/
public interface RootCARotationHandler {
public interface RootCARotationHandler extends SCMHandler {

/**
* Notify SCM peers to do sub-ca rotation preparation and replicate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BACKUP_KEY_CERT_DIR_NAME_SUFFIX;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NEW_KEY_CERT_DIR_NAME_SUFFIX;
import static org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType.CERT_ROTATE;

import java.io.File;
import java.io.IOException;
Expand All @@ -29,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
Expand Down Expand Up @@ -198,6 +198,11 @@ public void setSubCACertId(String subCACertId) {
LOG.info("Scm sub CA new certificate is {}", subCACertId);
}

/** Identifies this handler's replicated requests as CERT_ROTATE operations. */
@Override
public RequestType getType() {
return RequestType.CERT_ROTATE;
}

/**
* Builder for RootCARotationHandlerImpl.
*/
Expand Down Expand Up @@ -228,8 +233,7 @@ public RootCARotationHandler build() {
final RootCARotationHandler impl =
new RootCARotationHandlerImpl(scm, rootCARotationManager);

return ratisServer.getProxyHandler(CERT_ROTATE,
RootCARotationHandler.class, impl);
return ratisServer.getProxyHandler(RootCARotationHandler.class, impl);
}
}
}
Loading
Loading