Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ void participateAndBlockStateTransition(List<AmbryStatsReport> ambryStatsReports
*/
void unblockStateTransition();

/**
 * Populates the initial data node config for this participant in the source of truth
 * (e.g. the Helix property store).
 * @return {@code true} if the config was populated, or if the implementation treats this call as a
 *         successful no-op; {@code false} if this participant does not support populating the
 *         data node config.
 */
boolean populateDataNodeConfig();

/**
* Set the sealed state of the given replica.
* @param replicaId the {@link ReplicaId} whose sealed state will be updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ public class DataNodeConfig {
// Disk configs keyed by mount path; TreeMap keeps mount paths in sorted order.
private final Map<String, DiskConfig> diskConfigs = new TreeMap<>();
// Fields not recognized by DataNodeConfigSource but still persisted; see getExtraMapFields().
private final Map<String, Map<String, String>> extraMapFields = new HashMap<>();

/**
 * Constructs a config for a server whose instance name, rack id, and xid are not yet known;
 * those fields are initialized to placeholder values ({@code ""} and {@code 0}).
 * @param datacenterName the datacenter this server is in.
 * @param hostName the host name of the server.
 * @param http2Port the HTTP2 port, or {@code null} if the server does not have one.
 * @param port the port of the server.
 * @param sslPort the ssl port, or {@code null} if the server does not have one.
 */
public DataNodeConfig(String datacenterName, String hostName, Integer http2Port, int port,
    Integer sslPort) {
  // Identity / location of the server.
  this.hostName = hostName;
  this.datacenterName = datacenterName;
  // Ports: the plain port is required; ssl and http2 ports are optional (nullable).
  this.port = port;
  this.sslPort = sslPort;
  this.http2Port = http2Port;
  // Placeholders for fields not known at construction time.
  this.instanceName = "";
  this.rackId = "";
  this.xid = 0;
}

/**
* @param instanceName a name that can be used as a unique key for this server.
* @param hostName the host name of the server.
Expand Down Expand Up @@ -156,6 +175,15 @@ Map<String, DiskConfig> getDiskConfigs() {
return diskConfigs;
}

/**
 * Add a disk configuration to this DataNode, replacing any existing config registered under the
 * same mount path (standard {@code Map.put} semantics).
 * @param mountPath the mount path of the disk; used as the map key.
 * @param diskConfig the disk configuration to associate with {@code mountPath}.
 */
public void addDiskConfig(String mountPath, DiskConfig diskConfig) {
  diskConfigs.put(mountPath, diskConfig);
}

/**
* This can be used for extra fields that are not recognized by {@link DataNodeConfigSource} but still need to be
* read from or written to the source of truth. This should be used sparingly and is mainly provided for legacy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,55 @@ public class ClusterMapConfig {
@Default("false")
public final boolean enableFileCopyProtocol;

/**
 * Path to the nimbus service metadata file containing instance information.
 * Empty by default, meaning no such file is configured.
 */
@Config("clustermap.nimbus.service.metadata.file.path")
@Default("")
public final String nimbusServiceMetadataFilePath;

/**
 * Path to the LiStatefulSet metadata file containing Kubernetes StatefulSet information.
 * Empty by default, meaning no such file is configured.
 */
@Config("clustermap.listatefulset.metadata.file.path")
@Default("")
public final String liStatefulSetMetadataFilePath;

/**
 * Fraction of disk space to reserve, expressed as a value in [0.0, 1.0].
 * NOTE(review): the stated range does not appear to be enforced when the value is read
 * (plain getDouble rather than a ranged getter) — confirm and validate/clamp if needed.
 */
@Config("clustermap.reserve.disk.space.percentage")
@Default("0.0")
public final double clusterMapReserveDiskSpacePercentage;

/**
 * Prefix for resource tags in cluster map.
 */
@Config("clustermap.resource.tag.prefix")
@Default("")
public final String clusterMapResourceTagPrefix;

/**
 * Default HTTP2 port for cluster nodes.
 * A value of 0 presumably means "not configured" — TODO confirm how consumers interpret it.
 */
@Config("clustermap.default.http2.port")
@Default("0")
public final int clusterMapDefaultHttp2Port;

/**
 * Default port for cluster nodes.
 * A value of 0 presumably means "not configured" — TODO confirm how consumers interpret it.
 */
@Config("clustermap.default.port")
@Default("0")
public final int clusterMapDefaultPort;

/**
 * Default SSL port for cluster nodes.
 * A value of 0 presumably means "not configured" — TODO confirm how consumers interpret it.
 */
@Config("clustermap.default.ssl.port")
@Default("0")
public final int clusterMapDefaultSslPort;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100);
Expand Down Expand Up @@ -508,5 +557,12 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
routerPutSuccessTarget = verifiableProperties.getIntInRange(ROUTER_PUT_SUCCESS_TARGET, 2, 1, Integer.MAX_VALUE);
clusterMapPartitionFilteringEnabled = verifiableProperties.getBoolean(PARTITION_FILTERING_ENABLED, false);
enableFileCopyProtocol = verifiableProperties.getBoolean(ENABLE_FILE_COPY_PROTOCOL, false);
nimbusServiceMetadataFilePath = verifiableProperties.getString("clustermap.nimbus.service.metadata.file.path", "");
liStatefulSetMetadataFilePath = verifiableProperties.getString("clustermap.listatefulset.metadata.file.path", "");
clusterMapReserveDiskSpacePercentage = verifiableProperties.getDouble("clustermap.reserve.disk.space.percentage", 0.0);
clusterMapResourceTagPrefix = verifiableProperties.getString("clustermap.resource.tag.prefix", "");
clusterMapDefaultHttp2Port = verifiableProperties.getInt("clustermap.default.http2.port", 0);
clusterMapDefaultPort = verifiableProperties.getInt("clustermap.default.port", 0);
clusterMapDefaultSslPort = verifiableProperties.getInt("clustermap.default.ssl.port", 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,29 @@ public List<ClusterParticipant> getClusterParticipants() throws IOException {
}
return helixParticipants;
}

/**
 * Get the ClusterMapConfig. Exposed for subclasses; kept {@code protected} (not public) to limit
 * exposure of the factory's internals.
 * @return the {@link ClusterMapConfig} associated with this factory.
 */
protected ClusterMapConfig getClusterMapConfig() {
  return clusterMapConfig;
}

/**
 * Get the HelixFactory. Exposed for subclasses; kept {@code protected} (not public) to limit
 * exposure of the factory's internals.
 * @return the {@link HelixFactory} used by this factory.
 */
protected HelixFactory getHelixFactory() {
  return helixFactory;
}

/**
 * Get the MetricRegistry. Exposed for subclasses; kept {@code protected} (not public) to limit
 * exposure of the factory's internals.
 * @return the {@link MetricRegistry} used by this factory.
 */
protected MetricRegistry getMetricRegistry() {
  return metricRegistry;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,12 @@ public void unblockStateTransition() {
this.blockStateTransitionLatch.countDown();
}

/**
 * {@inheritDoc}
 * <p>
 * The base Helix implementation does not populate anything here.
 * @return always {@code true}, so existing callers treat population as successful.
 */
@Override
public boolean populateDataNodeConfig() {
  // No-op in base implementation, returns true for backward compatibility.
  return true;
}

/**
* A zookeeper based implementation for distributed lock.
*/
Expand Down Expand Up @@ -671,6 +677,22 @@ protected void markDisablePartitionComplete() {
disablePartitionsComplete = true;
}

/**
 * Get the ClusterMapConfig. Exposed for subclasses; kept {@code protected} (not public) to limit
 * exposure of the participant's internals.
 * @return the {@link ClusterMapConfig} associated with this participant.
 */
protected ClusterMapConfig getClusterMapConfig() {
  return clusterMapConfig;
}

/**
 * Get the DataNodeConfigSource. Exposed for subclasses; kept {@code protected} (not public) to
 * limit exposure of the participant's internals.
 * @return the {@link DataNodeConfigSource} associated with this participant.
 */
protected DataNodeConfigSource getDataNodeConfigSource() {
  return dataNodeConfigSource;
}

/**
* Disable/enable partition on local node. This method will update both InstanceConfig and DataNodeConfig in PropertyStore.
* @param partitionName name of partition on local node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public void participateAndBlockStateTransition(List<AmbryStatsReport> ambryHealt
public void unblockStateTransition() {
}

/**
 * {@inheritDoc}
 * @return always {@code false}: the recovery test cluster does not support this operation.
 */
@Override
public boolean populateDataNodeConfig() {
  // Recovery test cluster doesn't support populating data node config.
  // Return false to indicate this operation is not supported.
  return false;
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ public void participateAndBlockStateTransition(List<AmbryStatsReport> ambryStats
public void unblockStateTransition() {
}

/**
 * {@inheritDoc}
 * @return always {@code false}: a static clustermap cannot populate config dynamically.
 */
@Override
public boolean populateDataNodeConfig() {
  // Static clustermap doesn't support populating data node config dynamically.
  // Return false to indicate this operation is not supported.
  return false;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ public void startup() throws InstantiationException {

// wait for dataNode to be populated
if (nodeId == null) {
if (clusterParticipant != null && !clusterParticipant.populateDataNodeConfig()) {
logger.error("Failed to populate data node config to property store for instance: {}", networkConfig.hostName);
}
logger.info("Waiting on dataNode config to be populated...");
if(!dataNodeLatch.await(serverConfig.serverDatanodeConfigTimeout, TimeUnit.SECONDS)) {
throw new IllegalArgumentException("Startup timed out waiting for data node config to be populated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,51 @@ public void testAmbryServerStartupWithoutDataNodeIdTimeoutCase() throws Exceptio
"failure during startup java.lang.IllegalArgumentException: Startup timed out waiting for data node config to be populated");
});
}

@Test
public void testPopulateDataNodeConfigWithHostnameCheck() throws Exception {
  ClusterAgentsFactory spyClusterAgentsFactory = spy(new MockClusterAgentsFactory(false, false, 1, 1, 1));
  DataNodeId dataNodeId = spyClusterAgentsFactory.getClusterMap().getDataNodeIds().get(0);
  MockClusterMap spyClusterMap = spy(new MockClusterMap(false, false, 1, 1, 1, false, false, null));
  // Force nodeId == null at startup so the "waiting on dataNode config" path is exercised.
  doReturn(null).when(spyClusterMap).getDataNodeId(dataNodeId.getHostname(), dataNodeId.getPort());
  doReturn(spyClusterMap).when(spyClusterAgentsFactory).getClusterMap();

  // Case 1: clustermap.host.name matches host.name — populateDataNodeConfig should be attempted.
  // No listener ever fires in this test setup, so startup still times out.
  assertStartupTimesOutWaitingForDataNodeConfig(spyClusterAgentsFactory,
      buildServerProps("localhost"));

  // Case 2: clustermap.host.name differs from host.name — population should not be attempted.
  // NOTE: both cases observe the same timeout exception; this test does not currently verify
  // whether populateDataNodeConfig was actually invoked (would need a spy/verify on the
  // participant to distinguish the two paths).
  assertStartupTimesOutWaitingForDataNodeConfig(spyClusterAgentsFactory,
      buildServerProps("different-host"));
}

/**
 * Build the minimal server properties for these startup tests.
 * @param clusterMapHostName the value for "clustermap.host.name"; "host.name" is fixed at
 *        "localhost", so passing a different value makes the hostnames mismatch.
 * @return the populated {@link Properties}.
 */
private static Properties buildServerProps(String clusterMapHostName) {
  Properties props = new Properties();
  props.setProperty("host.name", "localhost");
  props.setProperty("port", "1234");
  props.setProperty("clustermap.cluster.name", "test");
  props.setProperty("clustermap.datacenter.name", "DC1");
  props.setProperty("clustermap.host.name", clusterMapHostName);
  props.setProperty("clustermap.port", "1234");
  props.setProperty("server.datanode.config.timeout", "1"); // short timeout keeps the test fast
  return props;
}

/**
 * Start an {@link AmbryServer} with the given factory/props and assert that startup fails with
 * the "timed out waiting for data node config" {@link InstantiationException}.
 * @param clusterAgentsFactory the (spied) factory to start the server with.
 * @param props the server properties.
 */
private static void assertStartupTimesOutWaitingForDataNodeConfig(
    ClusterAgentsFactory clusterAgentsFactory, Properties props) throws Exception {
  AmbryServer ambryServer = new AmbryServer(new VerifiableProperties(props), clusterAgentsFactory,
      null, new LoggingNotificationSystem(), SystemTime.getInstance(), null);
  assertException(InstantiationException.class, ambryServer::startup, e -> {
    assertTrue("Should timeout waiting for DataNode config",
        e.getMessage().contains("Startup timed out waiting for data node config to be populated"));
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ public void participateAndBlockStateTransition(List<AmbryStatsReport> ambryStats
public void unblockStateTransition() {
}

/**
 * {@inheritDoc}
 * @return always {@code false}: this mock does not support the operation.
 */
@Override
public boolean populateDataNodeConfig() {
  // Mock cluster participant doesn't support populating data node config.
  // Return false to indicate this operation is not supported.
  return false;
}

@Override
public boolean setReplicaSealedState(ReplicaId replicaId, ReplicaSealStatus replicaSealStatus) {
String replicaPath = replicaId.getReplicaPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ public void participateAndBlockStateTransition(List<AmbryStatsReport> ambryHealt
public void unblockStateTransition() {
}

/**
 * {@inheritDoc}
 * @return always {@code false}: this mock does not support the operation.
 */
@Override
public boolean populateDataNodeConfig() {
  // Mock cluster doesn't support populating data node config.
  // Return false to indicate this operation is not supported.
  return false;
}

@Override
public void close() {

Expand Down
Loading