Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.testng.Assert;
Expand Down Expand Up @@ -197,4 +199,22 @@ public void handleDataDeleted(String s) {

deleteCluster("testSharingZkClient");
}

/**
 * Verifies that a ZkClient built through a shared factory picks up the operation retry timeout
 * configured via the {@code ZkSystemPropertyKeys.ZK_OPERATION_RETRY_TIMEOUT_MS} system property.
 */
@Test
public void testZKClientConfig() {
  final String propertyKey = ZkSystemPropertyKeys.ZK_OPERATION_RETRY_TIMEOUT_MS;
  // System properties are JVM-wide; remember the previous value so it can be restored and the
  // override does not leak into other tests running in the same JVM.
  final String previousValue = System.getProperty(propertyKey);
  System.setProperty(propertyKey, "5000");

  ZkClient zkClient = null;
  try {
    HelixZkClient.ZkConnectionConfig connectionConfig =
        new HelixZkClient.ZkConnectionConfig(ZK_ADDR);
    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();

    // Use a factory dedicated to this test to avoid interference from other tests running in
    // parallel.
    final SharedZkClientFactory testFactory = new SharedZkClientFactory();
    zkClient = (ZkClient) testFactory.buildZkClient(connectionConfig, clientConfig);
    Assert.assertEquals(zkClient.getOperationRetryTimeout(), 5000);
  } finally {
    // Close the client even when the assertion fails so the connection is not leaked.
    if (zkClient != null) {
      zkClient.close();
    }
    if (previousValue == null) {
      System.clearProperty(propertyKey);
    } else {
      System.setProperty(propertyKey, previousValue);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.util.ZNRecordUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
Expand Down Expand Up @@ -63,9 +64,6 @@ enum RealmMode {
SINGLE_REALM, MULTI_REALM
}

// Setting default operation retry timeout to 24 hours. It can also be overwritten via RealmAwareZkClientConfig.
// This timeout will be used while retrying zookeeper operations.
int DEFAULT_OPERATION_TIMEOUT = 24 * 60 * 60 * 1000;
int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
int DEFAULT_SESSION_TIMEOUT = 30 * 1000;

Expand Down Expand Up @@ -558,7 +556,7 @@ class RealmAwareZkClientConfig {
protected long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT;

// Data access configs
protected long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
protected long _operationRetryTimeout = ZNRecordUtil.getDefaultOperationRetryTimeout();

// Serializer
protected PathBasedZkSerializer _zkSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,15 @@ public class ZkSystemPropertyKeys {
// TODO: deprecate this config after paginated API is deployed and stable
public static final String ZK_GETCHILDREN_PAGINATION_DISABLED =
"zk.getChildren.pagination.disabled";

/**
* This property defines the default operation retry timeout in milliseconds for ZkClient.
* Most ZkClient operations are retried in cases like connection loss with the ZooKeeper servers.
* During such failures, this timeout decides the total amount of time to spend for retries before giving up.
* A value less than or equal to 0 is ignored and the default value will be used.
* <p>
* The default value is 86400000 (24 hours) if not configured.
*/
public static final String ZK_OPERATION_RETRY_TIMEOUT_MS =
"zk.operation.retry.timeout.ms";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.util.ZNRecordUtil;
import org.apache.helix.zookeeper.zkclient.IZkConnection;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
Expand Down Expand Up @@ -61,9 +62,6 @@
public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient implements HelixZkClient {
private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);

// Setting default operation retry timeout to 24 hours. It can also be overwritten via RealmAwareZkClientConfig.
// This timeout will be used while retrying zookeeper operations.
public static final int DEFAULT_OPERATION_TIMEOUT = 24 * 60 * 60 * 1000;
public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;

Expand Down Expand Up @@ -117,7 +115,7 @@ public ZkClient(IZkConnection connection, int connectionTimeout,
PathBasedZkSerializer zkSerializer,
String monitorType, String monitorKey) {
this(connection, connectionTimeout, zkSerializer, monitorType, monitorKey,
DEFAULT_OPERATION_TIMEOUT);
ZNRecordUtil.getDefaultOperationRetryTimeout());
}

public ZkClient(String zkServers, String monitorType, String monitorKey) {
Expand Down Expand Up @@ -191,7 +189,7 @@ public static class Builder {

PathBasedZkSerializer _zkSerializer;

long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
long _operationRetryTimeout = ZNRecordUtil.getDefaultOperationRetryTimeout();
int _connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.impl.client.SharedZkClient;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.util.ZNRecordUtil;
import org.apache.helix.zookeeper.zkclient.IZkConnection;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class ZkConnectionManager extends ZkClient {
*/
protected ZkConnectionManager(IZkConnection zkConnection, long connectionTimeout,
String monitorKey) {
super(zkConnection, (int) connectionTimeout, HelixZkClient.DEFAULT_OPERATION_TIMEOUT,
super(zkConnection, (int) connectionTimeout, ZNRecordUtil.getDefaultOperationRetryTimeout(),
new BasicZkSerializer(new SerializableSerializer()), MONITOR_TYPE, monitorKey, null, true);
_monitorKey = monitorKey;
LOG.info("ZkConnection {} was created for sharing.", _monitorKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,24 @@ public static int getSerializerWriteSizeLimit() {

return writeSizeLimit;
}

/**
 * Resolves the default operation retry timeout for ZkClient.
 * <p>
 * The value is read from the system property named by
 * {@code ZkSystemPropertyKeys.ZK_OPERATION_RETRY_TIMEOUT_MS}. If the property is unset, blank,
 * not parseable as a long, or not strictly positive, the hardcoded default of 24 hours is
 * returned instead. The respective ZkClient config may still override the returned default.
 *
 * @return the operation retry timeout in milliseconds
 */
public static long getDefaultOperationRetryTimeout() {
  // 24 hours, expressed in milliseconds.
  final long fallbackTimeoutMs = 24 * 60 * 60 * 1000;

  final String rawValue = System.getProperty(ZkSystemPropertyKeys.ZK_OPERATION_RETRY_TIMEOUT_MS);
  if (rawValue == null) {
    return fallbackTimeoutMs;
  }

  final String trimmedValue = rawValue.trim();
  if (trimmedValue.isEmpty()) {
    return fallbackTimeoutMs;
  }

  try {
    final long configuredTimeoutMs = Long.parseLong(trimmedValue);
    if (configuredTimeoutMs > 0) {
      return configuredTimeoutMs;
    }
  } catch (NumberFormatException ignored) {
    // Malformed value: deliberately fall through to the default below.
  }
  return fallbackTimeoutMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public class ZkClient implements Watcher {
Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, ZNRecord.SIZE_LIMIT);

private final IZkConnection _connection;

// The operation retry timeout can be configured via:
// 1. Constructor parameter operationRetryTimeout
// 2. System property "zk.operation.retry.timeout.ms" (used as default if not explicitly set)
// 3. Respective ZKClientConfig (eg: RealmAwareZkClientConfig.setOperationRetryTimeout()) for higher-level clients
private final long _operationRetryTimeoutInMillis;
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener =
Expand Down Expand Up @@ -268,6 +273,10 @@ protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long opera
_persistListenerMutex = new ReentrantLock();
}

/**
 * Returns the operation retry timeout this client was constructed with.
 *
 * @return the operation retry timeout in milliseconds
 */
public long getOperationRetryTimeout() {
  return this._operationRetryTimeoutInMillis;
}

protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
String monitorInstanceName, boolean monitorRootPathOnly) {
Expand Down Expand Up @@ -2205,6 +2214,7 @@ public <T> T retryUntilConnected(final Callable<T> callable)
private void waitForRetry(long maxSleep) {
if (waitUntilConnected(_operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS)) {
try {
LOG.debug("zkclient {} Wait for {} ms before retrying operation", _uid, maxSleep);
Thread.sleep(maxSleep);
} catch (InterruptedException ex) {
// we don't need to re-throw.
Expand Down
Loading