From 35737fff2212ad9c3a5ac8d1a2a010a839d5ba40 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Mon, 11 Aug 2025 17:26:23 +0530 Subject: [PATCH 1/2] Remove and cleanup IZkStateListenerI0ItecImpl --- .../zookeeper/api/client/HelixZkClient.java | 11 ++ .../api/client/RealmAwareZkClient.java | 115 +++--------------- .../impl/client/DedicatedZkClient.java | 2 +- .../impl/client/FederatedZkClient.java | 12 -- .../zookeeper/impl/client/SharedZkClient.java | 2 +- .../helix/zookeeper/zkclient/ZkClient.java | 91 +------------- .../impl/client/TestRawZkClient.java | 98 +-------------- 7 files changed, 36 insertions(+), 295 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java index f3810a9530..a491f0de92 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java @@ -1,4 +1,5 @@ package org.apache.helix.zookeeper.api.client; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -32,6 +33,16 @@ @Deprecated public interface HelixZkClient extends RealmAwareZkClient { + /** + * Subscribes state changes for a {@link IZkStateListener} listener. + */ + void subscribeStateChanges(IZkStateListener listener); + + /** + * Unsubscribes state changes for a {@link IZkStateListener} listener. + */ + void unsubscribeStateChanges(IZkStateListener listener); + /** * Deprecated - please use RealmAwareZkClient and RealmAwareZkConnectionConfig instead. * diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index e18e868772..1358f2f526 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -97,6 +97,16 @@ ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener list void unsubscribeChildChanges(String path, IZkChildListener listener); + /** + * Subscribes state changes for a {@link IZkStateListener} listener. + */ + void subscribeStateChanges(IZkStateListener listener); + + /** + * Unsubscribes state changes for a {@link IZkStateListener} listener. + */ + void unsubscribeStateChanges(IZkStateListener listener); + /** * Subscribe the path and the listener will handle data events of the path * Add the exists watch to Zookeeper server even if the path does not exists in zookeeper server @@ -120,49 +130,14 @@ boolean subscribeDataChanges(String path, IZkDataListener listener, void unsubscribeDataChanges(String path, IZkDataListener listener); - /* - * This is for backwards compatibility. - * - * TODO: remove below default implementation when getting rid of I0Itec in the new zk client. - */ - default void subscribeStateChanges(final IZkStateListener listener) { - subscribeStateChanges(new I0ItecIZkStateListenerImpl(listener)); - } + // Backward-compatibility adapter removed. Implementations must support + // subscribeStateChanges/unsubscribeStateChanges with modern IZkStateListener. - /* - * This is for backwards compatibility. - * - * TODO: remove below default implementation when getting rid of I0Itec in the new zk client. - */ - default void unsubscribeStateChanges(IZkStateListener listener) { - unsubscribeStateChanges(new I0ItecIZkStateListenerImpl(listener)); - } + - /** - * Subscribes state changes for a - * {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} listener. - * @deprecated - * This is deprecated. It is kept for backwards compatibility. Please use - * {@link #subscribeStateChanges(IZkStateListener)}. - * @param listener {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} - * listener - */ - @Deprecated - void subscribeStateChanges( - final org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener); + - /** - * Unsubscribes state changes for a - * {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} listener. - * @deprecated - * This is deprecated. It is kept for backwards compatibility. Please use - * {@link #unsubscribeStateChanges(IZkStateListener)}. - * @param listener {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} - * listener - */ - @Deprecated - void unsubscribeStateChanges( - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener); + void unsubscribeAll(); @@ -355,64 +330,10 @@ default RealmAwareZkClientConfig getRealmAwareZkClientConfig() { } /** - * A class that wraps a default implementation of - * {@link IZkStateListener}, which means this listener - * runs the methods of {@link IZkStateListener}. - * This is for backward compatibility and to avoid breaking the original implementation of - * {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. + * A class that wraps a default implementation of {@link IZkStateListener}. + * Backward-compatibility with deprecated I0Itec listeners has been removed. */ - class I0ItecIZkStateListenerImpl implements org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener { - private IZkStateListener _listener; - - I0ItecIZkStateListenerImpl(IZkStateListener listener) { - _listener = listener; - } - - @Override - public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception { - _listener.handleStateChanged(keeperState); - } - - @Override - public void handleNewSession() throws Exception { - /* - * org.apache.helix.manager.zk.zookeeper.IZkStateListener does not have handleNewSession(), - * so null is passed into handleNewSession(sessionId). - */ - _listener.handleNewSession(null); - } - - @Override - public void handleSessionEstablishmentError(Throwable error) throws Exception { - _listener.handleSessionEstablishmentError(error); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (!(obj instanceof I0ItecIZkStateListenerImpl)) { - return false; - } - if (_listener == null) { - return false; - } - - I0ItecIZkStateListenerImpl defaultListener = (I0ItecIZkStateListenerImpl) obj; - - return _listener.equals(defaultListener._listener); - } - - @Override - public int hashCode() { - /* - * The original listener's hashcode helps find the wrapped listener with the same original - * listener. This is helpful in unsubscribeStateChanges(listener). - */ - return _listener.hashCode(); - } - } + /** * ZkConnection-related configs for creating an instance of RealmAwareZkClient. diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index e214806293..26e3efb1ee 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -37,7 +37,7 @@ import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; -import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 7943bc27ad..977f7c0e20 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -147,18 +147,6 @@ public void unsubscribeStateChanges(IZkStateListener listener) { throwUnsupportedOperationException(); } - @Override - public void subscribeStateChanges( - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { - throwUnsupportedOperationException(); - } - - @Override - public void unsubscribeStateChanges( - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { - throwUnsupportedOperationException(); - } - @Override public void unsubscribeAll() { _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java index 210e82bd6b..ae23834526 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java @@ -33,7 +33,7 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; -import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 33b8bc185c..a36d76b59a 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -436,20 +436,7 @@ public void subscribeStateChanges(final IZkStateListener listener) { } } - /** - * Subscribes state changes for a {@link IZkStateListener} listener. - * - * @deprecated - * This is deprecated. It is kept for backwards compatibility. Please use - * {@link #subscribeStateChanges(IZkStateListener)}. - * - * @param listener {@link IZkStateListener} listener - */ - @Deprecated - public void subscribeStateChanges( - final org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { - subscribeStateChanges(new IZkStateListenerI0ItecImpl(listener)); - } + public void unsubscribeStateChanges(IZkStateListener stateListener) { synchronized (_stateListener) { @@ -457,20 +444,7 @@ public void unsubscribeStateChanges(IZkStateListener stateListener) { } } - /** - * Unsubscribes state changes for a {@link IZkStateListener} listener. - * - * @deprecated - * This is deprecated. It is kept for backwards compatibility. Please use - * {@link #unsubscribeStateChanges(IZkStateListener)}. - * - * @param stateListener {@link IZkStateListener} listener - */ - @Deprecated - public void unsubscribeStateChanges( - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener stateListener) { - unsubscribeStateChanges(new IZkStateListenerI0ItecImpl(stateListener)); - } + public void unsubscribeAll() { if (_usePersistWatcher) { @@ -3034,66 +3008,7 @@ private void recordStateChange(boolean stateChanged, boolean dataChanged, boolea } } - /** - * Creates a {@link IZkStateListener} that wraps a default - * implementation of {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}, which means the returned - * listener runs the methods of {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. - * This is for backward compatibility with {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. - */ - private static class IZkStateListenerI0ItecImpl implements IZkStateListener { - private org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener _listener; - - IZkStateListenerI0ItecImpl( - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { - _listener = listener; - } - - @Override - public void handleStateChanged(KeeperState keeperState) throws Exception { - _listener.handleStateChanged(keeperState); - } - - @Override - public void handleNewSession(final String sessionId) throws Exception { - /* - * org.I0Itec.zkclient.IZkStateListener does not have handleNewSession(sessionId), - * so just call handleNewSession() by default. - */ - _listener.handleNewSession(); - } - - @Override - public void handleSessionEstablishmentError(Throwable error) throws Exception { - _listener.handleSessionEstablishmentError(error); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (!(obj instanceof IZkStateListenerI0ItecImpl)) { - return false; - } - if (_listener == null) { - return false; - } - - IZkStateListenerI0ItecImpl defaultListener = (IZkStateListenerI0ItecImpl) obj; - - return _listener.equals(defaultListener._listener); - } - - @Override - public int hashCode() { - /* - * The original listener's hashcode helps find the wrapped listener with the same original - * listener. This is helpful in unsubscribeStateChanges(listener) when finding the listener - * to remove. - */ - return _listener.hashCode(); - } - } + private void validateCurrentThread() { if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java index 0ce0349aba..bff40045ab 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java @@ -280,103 +280,9 @@ public void handleSessionEstablishmentError(Throwable error) { } } - /* - * Tests state changes subscription for I0Itec's IZkStateListener. - * This is a test for backward compatibility. - * - * TODO: remove this test when getting rid of I0Itec. - */ - @Test - public void testSubscribeStateChangesForI0ItecIZkStateListener() { - int numListeners = _zkClient.numberOfListeners(); - List listeners = - new ArrayList<>(); - - // Subscribe multiple listeners to test that listener's hashcode works as expected. - // Each listener is subscribed and unsubscribed successfully. - for (int i = 0; i < 3; i++) { - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener = - new org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener() { - @Override - public void handleStateChanged(KeeperState state) { - System.out.println("Handle new state: " + state); - } + - @Override - public void handleNewSession() { - System.out.println("Handle new session: "); - } - - @Override - public void handleSessionEstablishmentError(Throwable error) { - System.out.println("Handle session establishment error: " + error); - } - }; - - _zkClient.subscribeStateChanges(listener); - Assert.assertEquals(_zkClient.numberOfListeners(), ++numListeners); - - // Try to subscribe the listener again but number of listeners should not change because the - // listener already exists. - _zkClient.subscribeStateChanges(listener); - Assert.assertEquals(_zkClient.numberOfListeners(), numListeners); - - listeners.add(listener); - } - - for (org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener : listeners) { - _zkClient.unsubscribeStateChanges(listener); - Assert.assertEquals(_zkClient.numberOfListeners(), --numListeners); - } - } - - /* - * Tests session expiry for I0Itec's IZkStateListener. - * This is a test for backward compatibility. - * - * TODO: remove this test when getting rid of I0Itec. - */ - @Test - public void testSessionExpiryForI0IItecZkStateListener() - throws Exception { - org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener = - new org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener() { - - @Override - public void handleStateChanged(KeeperState state) { - System.out.println("In Old connection New state " + state); - } - - @Override - public void handleNewSession() { - System.out.println("In Old connection New session"); - } - - @Override - public void handleSessionEstablishmentError(Throwable var1) { - } - }; - - _zkClient.subscribeStateChanges(listener); - ZkConnection connection = ((ZkConnection) _zkClient.getConnection()); - ZooKeeper zookeeper = connection.getZookeeper(); - long oldSessionId = zookeeper.getSessionId(); - System.out.println("old sessionId= " + oldSessionId); - Watcher watcher = event -> System.out.println("In New connection In process event:" + event); - ZooKeeper newZookeeper = - new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher, - zookeeper.getSessionId(), zookeeper.getSessionPasswd()); - Thread.sleep(3000); - System.out.println("New sessionId= " + newZookeeper.getSessionId()); - Thread.sleep(3000); - newZookeeper.close(); - Thread.sleep(10000); - connection = ((ZkConnection) _zkClient.getConnection()); - zookeeper = connection.getZookeeper(); - long newSessionId = zookeeper.getSessionId(); - System.out.println("After session expiry sessionId= " + newSessionId); - _zkClient.unsubscribeStateChanges(listener); - } + @Test public void testZkClientMonitor() From 43c9ecd2959b24f30e295b23bad3172091b0c2e6 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Thu, 21 Aug 2025 12:09:33 +0530 Subject: [PATCH 2/2] Add tests to verify the wrong session issue --- .../manager/zk/TestStaleSessionEvents.java | 254 ++++++++++++++++++ .../zookeeper/api/client/HelixZkClient.java | 2 +- .../api/client/RealmAwareZkClient.java | 17 +- .../impl/client/FederatedZkClient.java | 4 +- .../helix/zookeeper/zkclient/ZkClient.java | 34 ++- .../zkclient/deprecated/IZkStateListener.java | 57 ---- .../impl/client/TestRawZkClient.java | 8 +- 7 files changed, 275 insertions(+), 101 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/manager/zk/TestStaleSessionEvents.java delete mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/deprecated/IZkStateListener.java diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestStaleSessionEvents.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestStaleSessionEvents.java new file mode 100644 index 0000000000..c9775c1171 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestStaleSessionEvents.java @@ -0,0 +1,254 @@ +package org.apache.helix.manager.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; +import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer; +import org.apache.helix.zookeeper.impl.client.DedicatedZkClient; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.routing.RoutingDataManager; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.ZkEventThread; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Tests for stale session event handling to prevent zombie participant conditions. + */ +public class TestStaleSessionEvents extends ZkTestBase { + private static final String CLUSTER_PREFIX = "CLUSTER"; + private static final String MSDS_HOSTNAME = "localhost"; + private static final int MSDS_PORT = 19922; + private static final String MSDS_NAMESPACE = "testStaleSessionEvents"; + + private static MockMetadataStoreDirectoryServer _msdsServer; + + + @Test + public void testStaleSessionEventDoesNotCauseZombieParticipant() throws Exception { + String instanceName = "localhost_12346"; + int participantPort = 12346; + long sessionTimeout = 10000L; + + String clusterName = CLUSTER_PREFIX + "_" + getShortClassName() + "_staleSessionTest"; + String originalMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED); + String originalMsdsEndpoint = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY); + String originalZkSessionTimeout = System.getProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT); + + // start with clean slate + System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED); + System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY); + System.clearProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT); + + try { + setupMultiZkEnvironment(clusterName, participantPort, sessionTimeout); + + // reset before ZKHelixManager creation to ensure clean state + RoutingDataManager.getInstance().reset(true); + + ZKHelixManager manager = new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + try { + manager.connect(); // <- this will create the LiveInstance node + String actualSessionId = manager.getSessionId(); + + // active sessionID processing working as expected + verifySessionBehaviorCore(accessor, keyBuilder, instanceName, actualSessionId, manager); + + // stale session behavior with two different fake sessions + for (int i = 0; i < 2; i++) { + String fakeSessionId = "0x" + Long.toHexString(Double.doubleToLongBits(Math.random() + i * 1000)); + Assert.assertFalse(actualSessionId.equals(fakeSessionId), + String.format("Actual and fake session %d should be different", i + 1)); + + CountDownLatch eventProcessed = new CountDownLatch(1); + AtomicReference caughtException = new AtomicReference<>(); + + sendStaleSessionEvent(manager, fakeSessionId, eventProcessed, caughtException); + eventProcessed.await(3 * sessionTimeout, TimeUnit.MILLISECONDS); // wait for event to be processed + + verifyStaleSessionEventBehavior(accessor, keyBuilder, instanceName, actualSessionId, + caughtException, manager); + } + + } finally { + try { + manager.disconnect(); + } catch (Exception e) { + // Ignore cleanup errors + } + } + } finally { + cleanupMultiZkEnvironment(clusterName); + + System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED); + System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY); + System.clearProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT); + + // Set original values if they were not null + if (originalMultiZkEnabled != null) { + System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, originalMultiZkEnabled); + } + if (originalMsdsEndpoint != null) { + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, originalMsdsEndpoint); + } + if (originalZkSessionTimeout != null) { + System.setProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT, originalZkSessionTimeout); + } + } + } + + private void setupMultiZkEnvironment(String clusterName, int participantPort, + long sessionTimeoutMs) throws Exception { + Map> routingData = new HashMap<>(); + routingData.put(ZK_ADDR, Collections.singletonList("/" + clusterName)); + + _msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE, routingData); + _msdsServer.startServer(); + + String msdsEndpoint = "http://" + MSDS_HOSTNAME + ":" + MSDS_PORT + "/admin/v2/namespaces/" + MSDS_NAMESPACE; + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, msdsEndpoint); + + TestHelper.setupCluster(clusterName, ZK_ADDR, participantPort, + "localhost", "TestDB", 1, 2, 2, 1, + "MasterSlave", true); + + System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true"); + System.setProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT, Long.toString(sessionTimeoutMs)); + } + + private void sendStaleSessionEvent(ZKHelixManager manager, String fakeSessionId, + CountDownLatch eventProcessed, AtomicReference caughtException) throws Exception { + + DedicatedZkClient dedicatedZkClient = (DedicatedZkClient) manager._zkclient; + Field zkClientField = DedicatedZkClient.class.getDeclaredField("_rawZkClient"); + zkClientField.setAccessible(true); + ZkClient zkClient = (ZkClient) zkClientField.get(dedicatedZkClient); + + Class zkClientSuperClass = zkClient.getClass().getSuperclass(); + + Field stateListenerField = zkClientSuperClass.getDeclaredField("_stateListener"); + stateListenerField.setAccessible(true); + @SuppressWarnings("unchecked") + Set stateListeners = (Set) stateListenerField.get(zkClient); + + Field eventThreadField = zkClientSuperClass.getDeclaredField("_eventThread"); + eventThreadField.setAccessible(true); + ZkEventThread eventThread = (ZkEventThread) eventThreadField.get(zkClient); + + ZkEventThread.ZkEvent event = new ZkEventThread.ZkEvent("Stale session event") { + @Override + public void run() throws Exception { + try { + for (final IZkStateListener listener : stateListeners) { + listener.handleNewSession(fakeSessionId); + } + } catch (Exception e) { + caughtException.set(e); + } finally { + eventProcessed.countDown(); + } + } + }; + eventThread.send(event); + } + + private void verifyPreConditions(ZKHelixDataAccessor accessor, PropertyKey.Builder keyBuilder, + String instanceName, String expectedSessionId, ZKHelixManager manager) throws Exception { + // Verify LiveInstance exists and has correct session ID + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + Assert.assertNotNull(liveInstance, "LiveInstance should exist before testing session behavior"); + Assert.assertEquals(liveInstance.getEphemeralOwner(), expectedSessionId, + "LiveInstance should have current active session ID"); + + // Verify callBackHandlers exist and are healthy + Field handlersField = ZKHelixManager.class.getDeclaredField("_handlers"); + handlersField.setAccessible(true); + @SuppressWarnings("unchecked") + List handlers = (List) handlersField.get(manager); + + Assert.assertNotNull(handlers, "CallbackHandlers should exist"); + Assert.assertFalse(handlers.isEmpty(), "CallbackHandlers should not be empty"); + + long readyCount = handlers.stream().filter(CallbackHandler::isReady).count(); + Assert.assertTrue(readyCount > 0, "At least one callback handler should be ready"); + } + + private void verifySessionBehaviorCore(ZKHelixDataAccessor accessor, PropertyKey.Builder keyBuilder, + String instanceName, String expectedSessionId, ZKHelixManager manager) throws Exception { + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + Assert.assertNotNull(liveInstance, "LiveInstance should exist after session event"); + + Assert.assertEquals(liveInstance.getEphemeralOwner(), expectedSessionId, + "LiveInstance ephemeral owner should match expected session ID"); + + // verify handlers remain healthy + Field handlersField = ZKHelixManager.class.getDeclaredField("_handlers"); + handlersField.setAccessible(true); + @SuppressWarnings("unchecked") + List handlers = (List) handlersField.get(manager); + + if (handlers != null && !handlers.isEmpty()) { + long readyCount = handlers.stream().filter(CallbackHandler::isReady).count(); + long resetCount = handlers.size() - readyCount; + Assert.assertTrue(readyCount > 0 && resetCount == 0, + String.format("Expected all handlers to be ready. Ready: %d, Reset: %d", readyCount, resetCount)); + } + } + + private void verifyStaleSessionEventBehavior(ZKHelixDataAccessor accessor, PropertyKey.Builder keyBuilder, + String instanceName, String actualSessionId, + AtomicReference caughtException, ZKHelixManager manager) throws Exception { + + Assert.assertNull(caughtException.get(), "Stale session event should not throw exceptions"); + verifySessionBehaviorCore(accessor, keyBuilder, instanceName, actualSessionId, manager); + } + + private void cleanupMultiZkEnvironment(String clusterName) { + TestHelper.dropCluster(clusterName, _gZkClient); + + if (_msdsServer != null) { + _msdsServer.stopServer(); + _msdsServer = null; + } + + // Reset RoutingDataManager to ensure clean state for next test + RoutingDataManager.getInstance().reset(true); + } +} \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java index a491f0de92..c2ff5a44ee 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java @@ -165,4 +165,4 @@ public ZkClientConfig setConnectInitTimeout(long connectInitTimeout) { return this; } } -} +} \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index 1358f2f526..e031e54760 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -130,15 +130,6 @@ boolean subscribeDataChanges(String path, IZkDataListener listener, void unsubscribeDataChanges(String path, IZkDataListener listener); - // Backward-compatibility adapter removed. Implementations must support - // subscribeStateChanges/unsubscribeStateChanges with modern IZkStateListener. - - - - - - - void unsubscribeAll(); // data access @@ -329,12 +320,6 @@ default RealmAwareZkClientConfig getRealmAwareZkClientConfig() { throw new UnsupportedOperationException("getRealmAwareZkClientConfig() is not supported!"); } - /** - * A class that wraps a default implementation of {@link IZkStateListener}. - * Backward-compatibility with deprecated I0Itec listeners has been removed. - */ - - /** * ZkConnection-related configs for creating an instance of RealmAwareZkClient. */ @@ -621,4 +606,4 @@ static MetadataStoreRoutingData getMetadataStoreRoutingData( routingDataSourceEndpoint); } } -} +} \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 977f7c0e20..5a1b95d658 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -495,8 +495,8 @@ public List multi(Iterable ops) { throw new NullPointerException("ops must not be null."); } boolean anyDifferent = StreamSupport.stream(ops.spliterator(), false) - .map(op -> getZkRealm(op.getPath())) - .anyMatch(s -> !s.equals(getZkRealm(ops.iterator().next().getPath()))); + .map(op -> getZkRealm(op.getPath())) + .anyMatch(s -> !s.equals(getZkRealm(ops.iterator().next().getPath()))); if (anyDifferent) { throw new IllegalArgumentException("Cannot execute multi on ops of different realms!"); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index a36d76b59a..5d0d4d17e9 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -436,16 +436,12 @@ public void subscribeStateChanges(final IZkStateListener listener) { } } - - public void unsubscribeStateChanges(IZkStateListener stateListener) { synchronized (_stateListener) { _stateListener.remove(stateListener); } } - - public void unsubscribeAll() { if (_usePersistWatcher) { ManipulateListener removeAllListeners = () -> { @@ -1513,15 +1509,15 @@ private Stat installWatchOnlyPathExist(final String path) { long startT = System.currentTimeMillis(); final Stat stat; try { - stat = new Stat(); - try { - LOG.debug("installWatchOnlyPathExist with path: {} ", path); - retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, stat)); - } catch (ZkNoNodeException e) { - LOG.debug("installWatchOnlyPathExist path not existing: {}", path); - record(path, null, startT, ZkClientMonitor.AccessType.READ); - return null; - } + stat = new Stat(); + try { + LOG.debug("installWatchOnlyPathExist with path: {} ", path); + retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, stat)); + } catch (ZkNoNodeException e) { + LOG.debug("installWatchOnlyPathExist path not existing: {}", path); + record(path, null, startT, ZkClientMonitor.AccessType.READ); + return null; + } record(path, null, startT, ZkClientMonitor.AccessType.READ); return stat; } catch (Exception e) { @@ -1600,7 +1596,7 @@ private void reconnectOnExpiring() { } catch (Exception e) { reconnectException = e; long waitInterval = ExponentialBackoffStrategy.getWaitInterval(currTime, - MAX_RECONNECT_INTERVAL_MS, true, retryCount++); + MAX_RECONNECT_INTERVAL_MS, true, retryCount++); LOG.warn("ZkClient {}, reconnect on expiring failed. Will retry after {} ms", _uid, waitInterval, e); try { @@ -1872,7 +1868,7 @@ private List getOpsForRecursiveDelete(String root) { getChildren(node, false).stream().forEach(child -> nodes.offer(node + "/" + child)); ops.add(Op.delete(node, -1)); } - // Reverse the list so that operations are ordered from children to parent nodes + // Reverse the list so that operations are ordered from children to parent nodes Collections.reverse(ops); return ops; } @@ -2139,13 +2135,13 @@ public T retryUntilConnected(final Callable callable) // we give the event thread some time to update the status to 'Disconnected' Thread.yield(); waitForRetry(ExponentialBackoffStrategy.getWaitInterval(currTime, - _operationRetryTimeoutInMillis, true, retryCount++)); + _operationRetryTimeoutInMillis, true, retryCount++)); } catch (SessionExpiredException e) { retryCauseCode = e.code(); // we give the event thread some time to update the status to 'Expired' Thread.yield(); waitForRetry(ExponentialBackoffStrategy.getWaitInterval(currTime, - _operationRetryTimeoutInMillis, true, retryCount++)); + _operationRetryTimeoutInMillis, true, retryCount++)); } catch (ZkSessionMismatchedException e) { throw e; } catch (KeeperException e) { @@ -3008,7 +3004,7 @@ private void recordStateChange(boolean stateChanged, boolean dataChanged, boolea } } - + private void validateCurrentThread() { if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { @@ -3163,4 +3159,4 @@ private void validateNativeZkWatcherType(boolean watch) { "Can not subscribe one time watcher when ZkClient is using PersistWatcher"); } } -} +} \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/deprecated/IZkStateListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/deprecated/IZkStateListener.java deleted file mode 100644 index 67b35040af..0000000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/deprecated/IZkStateListener.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.helix.zookeeper.zkclient.deprecated; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.zookeeper.Watcher.Event.KeeperState; - -@Deprecated -public interface IZkStateListener { - - /** - * Called when the zookeeper connection state has changed. - * - * @param state - * The new state. - * @throws Exception - * On any error. - */ - public void handleStateChanged(KeeperState state) throws Exception; - - /** - * Called after the zookeeper session has expired and a new session has been created. You would have to re-create - * any ephemeral nodes here. - * - * @throws Exception - * On any error. - */ - public void handleNewSession() throws Exception; - - /** - * Called when a session cannot be re-established. This should be used to implement connection - * failure handling e.g. retry to connect or pass the error up - * - * @param error - * The error that prevents a session from being established - * @throws Exception - * On any error. - */ - public void handleSessionEstablishmentError(final Throwable error) throws Exception; - -} diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java index bff40045ab..72a4710dd4 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java @@ -280,10 +280,6 @@ public void handleSessionEstablishmentError(Throwable error) { } } - - - - @Test public void testZkClientMonitor() throws Exception { @@ -634,7 +630,7 @@ public void testAutoSyncWithNewSessionEstablishment() throws Exception { Stat stat = new Stat(); String nodeData = null; try { - nodeData = _zkClient.readData(path, stat, true); + nodeData = _zkClient.readData(path, stat, true); } catch (ZkException e) { Assert.fail("fail to read data"); } @@ -1193,4 +1189,4 @@ void testDeleteRecursivelyAtomic() { Assert.assertFalse(_zkClient.exists(grandParent)); Assert.assertFalse(_zkClient.exists(newNode)); } -} +} \ No newline at end of file