Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
Expand All @@ -40,6 +42,7 @@
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable {

// Suffix appended to a leader path to form the path of the leader entry node.
private final static String LEADER_ENTRY_KEY = "/LEADER";
// NOTE(review): presumably the suffix of the participants node under a leader path — usage not visible here.
private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";

// NOTE(review): presumably the prefix for individual participant nodes — usage not visible here.
private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
// Data change listener subscribed on every leader entry path this client tracks.
ReElectListener _reElectListener = new ReElectListener();
// NOTE(review): presumably registered for connect state changes — registration not visible here.
ConnectStateListener _connectStateListener = new ConnectStateListener();
// Leadership change listener adapters keyed by leader entry path (leaderPath + LEADER_ENTRY_KEY);
// kept so that registerAllListeners() can subscribe them again.
private final ConcurrentHashMap<String, Set<LeaderElectionListenerInterfaceAdapter>> _leaderChangeListeners =
new ConcurrentHashMap<>();

/**
* Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient
Expand Down Expand Up @@ -189,9 +195,11 @@ private void createPathIfNotExists(String path) {
}

private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
_leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
registerAllListeners();
Copy link

@LZD-PratyushBhatt LZD-PratyushBhatt Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @xyuanlu thanks for this!
I have a question here.
What happens if listener re-subscription succeeds but participant node creation fails? What I mean is: currently, the flow would re-subscribe all listeners successfully, and then, if it fails to create the participant node (due to network issues, a missing parent path, or anything else), still proceed to attempt leader node creation and touch the leader node. Could this result in a participant becoming leader without being a valid member of the participant pool, leading to an invalid leader state that other participants don't recognize as legitimate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In handleConnectStateChanged, we recreate the participant node, re-subscribe listeners, and touch the leader node. I think that if recreating the participant node fails at line 425 and the error is not a nodeAlreadyExist error, it would throw the exception and exit before registering listeners and touching the leader node.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, Thanks!

LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
leaderInfo.setLeaderName(_participant);
leaderInfo.setAcquiredTime();

try {
createPathIfNotExists(leaderPath);
Expand All @@ -208,8 +216,6 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
} catch (MetaClientNodeExistsException ex) {
LOG.info("Already a leader in leader group {}.", leaderPath);
}

_leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
}

/**
Expand Down Expand Up @@ -349,6 +355,7 @@ public List<String> getParticipants(String leaderPath) {
*/
public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
_leaderChangeListeners.computeIfAbsent(leaderPath + LEADER_ENTRY_KEY, k -> ConcurrentHashMap.newKeySet()).add(adapter);
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
_metaClient.subscribeStateChanges(adapter);
Expand All @@ -364,6 +371,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
);
_metaClient.unsubscribeConnectStateChanges(adapter);
_leaderChangeListeners.get(leaderPath + LEADER_ENTRY_KEY).remove(adapter);
}

@Override
Expand Down Expand Up @@ -421,6 +429,8 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
LOG.info("Participant {} already in leader group {}.", _participant, leaderPath);
}
}

registerAllListeners();
// touch leader node to renew session ID
touchLeaderNode();
}
Expand All @@ -436,11 +446,15 @@ private void touchLeaderNode() {
for (String leaderPath : _leaderGroups) {
String key = leaderPath;
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
LOG.info("touch leader node: current leader: {}, current participant: {}",
tup.left.getLeaderName(), _participant);
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId());
newInfo.setAcquiredTime();
try {
LOG.info("Try touch leader node for path {}", _leaderGroups);
_metaClient.set(key, tup.left, expectedVersion);
_metaClient.set(key, newInfo, expectedVersion);
} catch (MetaClientNoNodeException ex) {
LOG.info("leaderPath {} gone when retouch leader node.", key);
} catch (MetaClientBadVersionException e) {
Expand All @@ -452,6 +466,26 @@ private void touchLeaderNode() {
}
}

/**
 * Subscribes every listener this client keeps track of.
 *
 * <p>First the re-elect listener is subscribed to data changes on each path in
 * {@code _leaderGroups}; then each adapter stored in {@code _leaderChangeListeners} is
 * subscribed to data changes on its leader entry path and to state changes.
 */
private void registerAllListeners() {
  // Put the re-elect listener back on every tracked leader entry path.
  for (String leaderEntryPath : _leaderGroups) {
    LOG.info("Subscribe re-elect listener for leaderPath {}.", leaderEntryPath);
    _metaClient.subscribeDataChange(leaderEntryPath, _reElectListener, false);
  }

  // Put the user supplied leadership change listeners back, since we are reconnected.
  _leaderChangeListeners.forEach((leaderEntryPath, adapters) -> {
    LOG.info("Subscribe leader change listener for leaderPath {}.", leaderEntryPath);
    for (LeaderElectionListenerInterfaceAdapter adapter : adapters) {
      // skipWatchingNonExistNode is false: the event is needed even when the path is not there.
      _metaClient.subscribeDataChange(leaderEntryPath, adapter, false);
      _metaClient.subscribeStateChanges(adapter);
    }
  });
}

/**
 * Exposes the MetaClient instance this leader election client operates on.
 *
 * @return the {@code _metaClient} field, unchanged
 */
public MetaClientInterface getMetaClient() {
  return this._metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,23 @@ public enum LeaderAttribute {
PARTICIPANTS
}

@JsonIgnore(true)
public String getLeaderName() {
@JsonIgnore(true)
public String getLeaderName() {
return getSimpleField("LEADER_NAME");
}

/**
 * Stores the leader name in this record.
 *
 * @param id the value written to the {@code "LEADER_NAME"} simple field
 */
@JsonIgnore(true)
public void setLeaderName(String id) {
  setSimpleField("LEADER_NAME", id);
}

/**
 * Stamps this record with the current wall-clock time: the result of
 * {@link System#currentTimeMillis()} is written, as a decimal string, to the
 * {@code "ACQUIRED_TIME"} simple field.
 */
@JsonIgnore(true)
public void setAcquiredTime() {
  long nowMillis = System.currentTimeMillis();
  setSimpleField("ACQUIRED_TIME", Long.toString(nowMillis));
}

/**
 * Reads the acquired-time stamp stored in this record.
 *
 * @return the value of the {@code "ACQUIRED_TIME"} simple field
 */
@JsonIgnore(true)
public String getAcquiredTime() {
  String acquiredTime = getSimpleField("ACQUIRED_TIME");
  return acquiredTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
clt1.close();
}


@Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool")
public void testAcquireLeadership() throws Exception {
System.out.println("START TestLeaderElection.testAcquireLeadership");
Expand Down Expand Up @@ -382,6 +383,94 @@ public void testClientClosedAndReconnectAfterExpire() throws Exception {
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
}

/**
 * Verifies that a leadership change listener subscribed on clt1 keeps receiving events after
 * clt1's session expires and after its connection is closed and re-established, and that
 * leadership keeps moving between clt1 and clt2 as they leave and re-join the participant pool.
 */
@Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire")
public void testClientLeadershipChangeListenersAfterExpire() throws Exception {
  System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterExpire");
  // The node name spelling is kept as-is; it is only used as a unique path for this test.
  String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire";
  LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
  participantInfo.setSimpleField("Key1", "value1");
  LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
  participantInfo2.setSimpleField("Key2", "value2");
  LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
  LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);

  clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
  clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);

  // Single-element arrays so the anonymous listener below can mutate the counters.
  final int[] numNewLeaderEvent = {0};
  final int[] numLeaderGoneEvent = {0};
  CountDownLatch countDownLatchNewLeader = new CountDownLatch(1);

  LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() {

    @Override
    public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) {
      if (type == ChangeType.LEADER_LOST) {
        Assert.assertEquals(curLeader.length(), 0);
        numLeaderGoneEvent[0]++;
        System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]);
      } else if (type == ChangeType.LEADER_ACQUIRED) {
        countDownLatchNewLeader.countDown();
        numNewLeaderEvent[0]++;
        System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader);
        Assert.assertTrue(curLeader.length() != 0);
      } else {
        Assert.fail();
      }
    }
  };
  clt1.subscribeLeadershipChanges(leaderPath, listener);
  // session expire and reconnect
  expireSession((ZkMetaClient) clt1.getMetaClient());

  // when session recreated, participant info node should maintain
  Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
  Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");

  // clt1 closed and reconnected
  simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());

  // verify listener get called after session expire and reconnect
  clt2.exitLeaderElectionParticipantPool(leaderPath);

  // now clt1 should be leader
  // verify we got a new leader event after node 2 left
  Assert.assertTrue(
      MetaClientTestUtil.verify(() -> numNewLeaderEvent[0] == 1, MetaClientTestUtil.WAIT_DURATION));
  countDownLatchNewLeader.await();

  Assert.assertTrue(
      MetaClientTestUtil.verify(() -> clt1.getLeader(leaderPath) != null, MetaClientTestUtil.WAIT_DURATION));

  // have clt2 join and clt1 leave and join again, and verify listener still works
  clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
  clt1.exitLeaderElectionParticipantPool(leaderPath);

  Assert.assertTrue(
      MetaClientTestUtil.verify(() -> clt2.getLeader(leaderPath) != null, MetaClientTestUtil.WAIT_DURATION));
  Assert.assertTrue(MetaClientTestUtil.verify(
      () -> clt2.getLeader(leaderPath).equals(PARTICIPANT_NAME2), MetaClientTestUtil.WAIT_DURATION));

  // now clt1 join again, and verify listener still works
  clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
  clt2.exitLeaderElectionParticipantPool(leaderPath);
  // verify clt1 is leader
  Assert.assertTrue(
      MetaClientTestUtil.verify(() -> clt1.getLeader(leaderPath) != null, MetaClientTestUtil.WAIT_DURATION));

  // NOTE(review): only clt1's underlying MetaClient is closed here; clt2 is never closed.
  // Confirm whether clt1.close() / clt2.close() should be called instead.
  ((ZkMetaClient<?>) clt1.getMetaClient()).close();
  System.out.println("END TestLeaderElection.testClientLeadershipChangeListenersAfterExpire");
}


private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2)
throws Exception {
clt1.joinLeaderElectionParticipantPool(leaderPath);
Expand Down
Loading