From bd3980542eb2cd02ab820eff200cc72914fdc2f6 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 7 Aug 2025 11:14:47 -0700 Subject: [PATCH 1/2] fix a race condition --- .../leaderelection/LeaderElectionClient.java | 32 ++++++++- .../recipes/leaderelection/LeaderInfo.java | 14 +++- .../leaderelection/TestLeaderElection.java | 69 +++++++++++++++++++ 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index fc25289a86..c1d577a6ae 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; @@ -40,6 +42,7 @@ import org.apache.helix.metaclient.factories.MetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; +import org.apache.helix.zookeeper.zkclient.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable { private final static String LEADER_ENTRY_KEY = "/LEADER"; private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS"; + private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/"; ReElectListener _reElectListener = new ReElectListener(); ConnectStateListener _connectStateListener = new ConnectStateListener(); + private final ConcurrentHashMap> _leaderChangeListeners = + new ConcurrentHashMap<>(); /** * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient @@ -192,6 +198,7 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) { _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY); leaderInfo.setLeaderName(_participant); + leaderInfo.setAcquiredTime(); try { createPathIfNotExists(leaderPath); @@ -349,6 +356,7 @@ public List getParticipants(String leaderPath) { */ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener); + _leaderChangeListeners.computeIfAbsent(leaderPath, k -> ConcurrentHashMap.newKeySet()).add(adapter); _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there _metaClient.subscribeStateChanges(adapter); @@ -364,6 +372,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter ); _metaClient.unsubscribeConnectStateChanges(adapter); + _leaderChangeListeners.get(leaderPath).remove(adapter); } @Override @@ -421,6 +430,23 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState LOG.info("Participant {} already in leader group {}.", _participant, leaderPath); } } + // resubscribe the re-elect listener + for (String leaderPath : _leaderGroups) { + LOG.info("Resubscribe re-elect listener for leaderPath {}.", leaderPath); + _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); + } + + // resubscribe to leader entry change since we are reconnected + for (Map.Entry> entry: _leaderChangeListeners.entrySet()) { + LOG.info("Resubscribe leader change listener for leaderPath {}.", entry.getKey()); + String leaderPath = entry.getKey(); + Set listeners = entry.getValue(); + for (LeaderElectionListenerInterfaceAdapter listener : listeners) { + _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, + listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there + _metaClient.subscribeStateChanges(listener); + } + } // touch leader node to renew session ID touchLeaderNode(); } @@ -436,11 +462,15 @@ private void touchLeaderNode() { for (String leaderPath : _leaderGroups) { String key = leaderPath; ImmutablePair tup = _metaClient.getDataAndStat(key); + LOG.info("touch leader node: current leader: {}, current participant: {}", + tup.left.getLeaderName(), _participant); if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { int expectedVersion = tup.right.getVersion(); + LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId()); + newInfo.setAcquiredTime(); try { LOG.info("Try touch leader node for path {}", _leaderGroups); - _metaClient.set(key, tup.left, expectedVersion); + _metaClient.set(key, newInfo, expectedVersion); } catch (MetaClientNoNodeException ex) { LOG.info("leaderPath {} gone when retouch leader node.", key); } catch (MetaClientBadVersionException e) { diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java index ab0562502d..2e1a4b2a36 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java @@ -53,15 +53,23 @@ public enum LeaderAttribute { PARTICIPANTS } -@JsonIgnore(true) -public String getLeaderName() { + @JsonIgnore(true) + public String getLeaderName() { return getSimpleField("LEADER_NAME"); } @JsonIgnore(true) public void setLeaderName(String id) { - setSimpleField("LEADER_NAME", id); + setSimpleField("LEADER_NAME", id); } + @JsonIgnore(true) + public void setAcquiredTime() { + setSimpleField("ACQUIRED_TIME", String.valueOf(System.currentTimeMillis())); + } + @JsonIgnore(true) + public String getAcquiredTime() { + return getSimpleField("ACQUIRED_TIME"); + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index c7aa06eff1..38e117bb8a 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -55,6 +55,7 @@ public void testIsLeaderBeforeJoiningParticipantPool() throws Exception { clt1.close(); } + @Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool") public void testAcquireLeadership() throws Exception { System.out.println("START TestLeaderElection.testAcquireLeadership"); @@ -382,6 +383,74 @@ public void testClientClosedAndReconnectAfterExpire() throws Exception { System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); } + @Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire") + public void testClientLeadershipChangeListenersAfterExpire() throws Exception { + System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterEspire"); + String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + + final int[] numNewLeaderEvent = {0}; + final int[] numLeaderGoneEvent = {0}; + CountDownLatch countDownLatchNewLeader = new CountDownLatch(1); + + + LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { + + @Override + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { + if (type == ChangeType.LEADER_LOST) { + //countDownLatchLeaderGone.countDown(); + Assert.assertEquals(curLeader.length(), 0); + numLeaderGoneEvent[0]++; + System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]); + } else if (type == ChangeType.LEADER_ACQUIRED) { + countDownLatchNewLeader.countDown(); + numNewLeaderEvent[0]++; + System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader); + Assert.assertTrue(curLeader.length() != 0); + } else { + Assert.fail(); + } + } + }; + clt1.subscribeLeadershipChanges(leaderPath, listener); + // session expire and reconnect + expireSession((ZkMetaClient) clt1.getMetaClient()); + + // when session recreated, participant info node should maintain + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + + // clt1 closed and reconnected + simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient()); + + // verify listener get called after session expire and reconnect + clt2.exitLeaderElectionParticipantPool(leaderPath); + + // now clt1 should be leader + // verify we got a new leader event after node 2 left + Assert.assertTrue(MetaClientTestUtil.verify(()-> { + return (numNewLeaderEvent[0] == 1); + }, MetaClientTestUtil.WAIT_DURATION)); + countDownLatchNewLeader.await(); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + ((ZkMetaClient) clt1.getMetaClient()).close(); + System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); + } + + private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2) throws Exception { clt1.joinLeaderElectionParticipantPool(leaderPath); From eb4a28f1f8b794347e5831a62ce1eb053921ba63 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 7 Aug 2025 21:06:24 -0700 Subject: [PATCH 2/2] adress comments --- .../leaderelection/LeaderElectionClient.java | 46 ++++++++++--------- .../leaderelection/TestLeaderElection.java | 20 ++++++++ 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index c1d577a6ae..9f8513e1e0 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -195,7 +195,8 @@ private void createPathIfNotExists(String path) { } private void subscribeAndTryCreateLeaderEntry(String leaderPath) { - _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); + _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); + registerAllListeners(); LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY); leaderInfo.setLeaderName(_participant); leaderInfo.setAcquiredTime(); @@ -215,8 +216,6 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) { } catch (MetaClientNodeExistsException ex) { LOG.info("Already a leader in leader group {}.", leaderPath); } - - _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); } /** @@ -356,7 +355,7 @@ public List getParticipants(String leaderPath) { */ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener); - _leaderChangeListeners.computeIfAbsent(leaderPath, k -> ConcurrentHashMap.newKeySet()).add(adapter); + _leaderChangeListeners.computeIfAbsent(leaderPath + LEADER_ENTRY_KEY, k -> ConcurrentHashMap.newKeySet()).add(adapter); _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there _metaClient.subscribeStateChanges(adapter); @@ -372,7 +371,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter ); _metaClient.unsubscribeConnectStateChanges(adapter); - _leaderChangeListeners.get(leaderPath).remove(adapter); + _leaderChangeListeners.get(leaderPath + LEADER_ENTRY_KEY).remove(adapter); } @Override @@ -430,23 +429,8 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState LOG.info("Participant {} already in leader group {}.", _participant, leaderPath); } } - // resubscribe the re-elect listener - for (String leaderPath : _leaderGroups) { - LOG.info("Resubscribe re-elect listener for leaderPath {}.", leaderPath); - _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); - } - // resubscribe to leader entry change since we are reconnected - for (Map.Entry> entry: _leaderChangeListeners.entrySet()) { - LOG.info("Resubscribe leader change listener for leaderPath {}.", entry.getKey()); - String leaderPath = entry.getKey(); - Set listeners = entry.getValue(); - for (LeaderElectionListenerInterfaceAdapter listener : listeners) { - _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, - listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there - _metaClient.subscribeStateChanges(listener); - } - } + registerAllListeners(); // touch leader node to renew session ID touchLeaderNode(); } @@ -482,6 +466,26 @@ private void touchLeaderNode() { } } + private void registerAllListeners(){ + // resubscribe the re-elect listener + for (String leaderPath : _leaderGroups) { + LOG.info("Subscribe re-elect listener for leaderPath {}.", leaderPath); + _metaClient.subscribeDataChange(leaderPath, _reElectListener, false); + } + + // resubscribe to leader entry change since we are reconnected + for (Map.Entry> entry: _leaderChangeListeners.entrySet()) { + String leaderPath = entry.getKey(); + LOG.info("Subscribe leader change listener for leaderPath {}.",leaderPath); + Set listeners = entry.getValue(); + for (LeaderElectionListenerInterfaceAdapter listener : listeners) { + _metaClient.subscribeDataChange(leaderPath, + listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there + _metaClient.subscribeStateChanges(listener); + } + } + } + public MetaClientInterface getMetaClient() { return _metaClient; } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index 38e117bb8a..8bb4841e72 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -446,6 +446,26 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea return (clt1.getLeader(leaderPath) != null); }, MetaClientTestUtil.WAIT_DURATION)); + // have clt2 join and clt1 leave and join again, and verify listener still works + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + clt1.exitLeaderElectionParticipantPool(leaderPath); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt2.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt2.getLeader(leaderPath).equals(PARTICIPANT_NAME2)); + }, MetaClientTestUtil.WAIT_DURATION)); + + // now clt1 join again, and verify listener still works + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + clt2.exitLeaderElectionParticipantPool(leaderPath); + // verify clt1 is leader + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + ((ZkMetaClient) clt1.getMetaClient()).close(); System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); }