Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel {
private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class);
protected Optional<HelixManager> _controllerOpt = Optional.empty();
protected volatile Optional<HelixManager> _controllerOpt = Optional.empty();
private final Set<Pipeline.Type> _enabledPipelineTypes;

// dedicated lock object to avoid cross-instance contention from Optional.empty() singleton
private final Object _controllerLock = new Object();

public DistClusterControllerStateModel(String zkAddr) {
this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
}
Expand All @@ -62,7 +65,7 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte

logger.info(controllerName + " becoming leader from standby for " + clusterName);

synchronized (_controllerOpt) {
synchronized (_controllerLock) {
if (!_controllerOpt.isPresent()) {
HelixManager newController = HelixManagerFactory
.getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
Expand Down Expand Up @@ -112,7 +115,7 @@ public String getStateModeInstanceDescription(String partitionName, String insta

@Override
public void reset() {
synchronized (_controllerOpt) {
synchronized (_controllerLock) {
if (_controllerOpt.isPresent()) {
logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for "
+ _controllerOpt.get().getClusterName());
Expand All @@ -121,4 +124,4 @@ public void reset() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,19 @@
import org.apache.helix.model.Message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TestDistControllerStateModel extends ZkUnitTestBase {
private static Logger LOG = LoggerFactory.getLogger(TestDistControllerStateModel.class);

Expand Down Expand Up @@ -124,4 +133,136 @@ public void testReset() {
stateModel.reset();
}

}
/**
* Test to verify that different DistClusterControllerStateModel instances
* use separate lock objects, ensuring no cross-instance blocking.
*/
@Test()
public void testNoSharedLockAcrossInstances() throws Exception {
LOG.info("Testing that lock objects are not shared across DistClusterControllerStateModel instances");

// Verify different instances have different lock objects
DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR);
DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR);

Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock");
lockField.setAccessible(true);

Object lock1 = lockField.get(instance1);
Object lock2 = lockField.get(instance2);

Assert.assertNotNull(lock1, "First instance should have a lock object");
Assert.assertNotNull(lock2, "Second instance should have a lock object");
Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects");

// Verify concurrent access doesn't block across instances
final int NUM_INSTANCES = 10;
ExecutorService executor = Executors.newFixedThreadPool(NUM_INSTANCES);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch completionLatch = new CountDownLatch(NUM_INSTANCES);
AtomicInteger completedInstances = new AtomicInteger(0);

for (int i = 0; i < NUM_INSTANCES; i++) {
final int instanceId = i;
final DistClusterControllerStateModel instance = new DistClusterControllerStateModel(ZK_ADDR);

executor.submit(() -> {
try {
startLatch.await(); // wait for all threads to be ready

// Simulate state transition operations that would use the lock
synchronized (lockField.get(instance)) {
// hold the lock here briefly to simulate real state transition work
Thread.sleep(100);
completedInstances.incrementAndGet();
}

} catch (Exception e) {
LOG.error("Instance {} failed during concurrent test", instanceId, e);
} finally {
completionLatch.countDown();
}
});
}

// start all threads simultaneously
startLatch.countDown();

// All instances should complete within reasonable time since they don't block each other
boolean allCompleted = completionLatch.await(500, TimeUnit.MILLISECONDS);

executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);

Assert.assertTrue(allCompleted, "All instances should complete without blocking each other");
Assert.assertEquals(completedInstances.get(), NUM_INSTANCES,
"All instances should successfully complete their synchronized work");
}

/**
* Explicit test to verify that while one instance holds its lock indefinitely,
* another instance with a different lock can complete immediately.
*/
@Test()
public void testExplicitLockIndependence() throws Exception {
LOG.info("Testing explicit lock independence - one blocked, other should complete");

DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR);
DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR);

Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock");
lockField.setAccessible(true);

Object lock1 = lockField.get(instance1);
Object lock2 = lockField.get(instance2);

Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects");

CountDownLatch instance1Started = new CountDownLatch(1);
CountDownLatch instance2Completed = new CountDownLatch(1);
AtomicBoolean instance1Interrupted = new AtomicBoolean(false);

// Thread 1: Hold lock1 for 5 seconds
Thread thread1 = new Thread(() -> {
try {
synchronized (lock1) {
instance1Started.countDown();
Thread.sleep(5000); // Hold much longer than test timeout
}
} catch (InterruptedException e) {
instance1Interrupted.set(true);
Thread.currentThread().interrupt();
}
}, "BlockingThread");

// Thread 2: Should complete immediately since it uses lock2
Thread thread2 = new Thread(() -> {
try {
instance1Started.await(1000, TimeUnit.MILLISECONDS); // Wait for thread1 to acquire lock1
synchronized (lock2) {
// Should acquire immediately since lock2 != lock1
Thread.sleep(50);
instance2Completed.countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "NonBlockingThread");

thread1.start();
thread2.start();

// Instance2 should complete immediately even though instance1 is blocked
boolean instance2CompletedQuickly = instance2Completed.await(200, TimeUnit.MILLISECONDS);

// Clean up
thread1.interrupt();
thread1.join(1000);
thread2.join(1000);

Assert.assertTrue(instance2CompletedQuickly,
"Instance2 should complete immediately, proving locks are not shared");
Assert.assertTrue(instance1Interrupted.get(),
"Instance1 should have been interrupted while holding its lock");
}
}
Loading