diff --git a/pom.xml b/pom.xml
index b52529f8..8ff0d9d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -361,8 +361,44 @@
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+
+ package
+
+ shade
+
+
+ true
+ shade
+
+
+ mysql:mysql-connector-java
+ org.slf4j:slf4j-api
+
+
+
+
+
+
+
+
+ com.alipay.sofa.common
+ com.shaded.alipay.sofa.common
+
+
+ com.alipay.remoting
+ com.shaded.alipay.remoting
+
+
+
+
-
\ No newline at end of file
+
diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
index efd49dc5..203e1679 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
@@ -1241,8 +1241,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
// the server roster is ordered by priority
long punishInterval = (long) (tableEntryRefreshIntervalBase * Math.pow(2,
-serverRoster.getMaxPriority()));
- punishInterval = punishInterval <= tableEntryRefreshIntervalCeiling ? punishInterval
- : tableEntryRefreshIntervalCeiling;
+ punishInterval = Math.min(punishInterval, tableEntryRefreshIntervalCeiling);
// control refresh frequency less than 100 milli second
// just in case of connecting to OB Server failed or change master
long interval = System.currentTimeMillis() - tableEntry.getRefreshTimeMills();
@@ -1267,8 +1266,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
}
int serverSize = serverRoster.getMembers().size();
- int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
- : tableEntryRefreshTryTimes;
+ int refreshTryTimes = Math.min(tableEntryRefreshTryTimes, serverSize);
for (int i = 0; i < refreshTryTimes; i++) {
try {
@@ -1330,6 +1328,62 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
return refreshTableEntry(tableEntry, tableName, false);
}
+ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String tableName, Long tabletId) throws ObTableAuthException {
+ TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, tableName);
+ try {
+ if (tableEntry == null) {
+ throw new ObTableEntryRefreshException("Table entry is null, tableName=" + tableName);
+ }
+ long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
+ return tableEntry;
+ }
+
+ Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());
+
+ if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
+ String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
+ dataSourceName, tableName, tableEntryRefreshLockTimeout);
+ RUNTIME.error(errMsg);
+ throw new ObTableEntryRefreshException(errMsg);
+ }
+
+ try {
+ lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
+ currentTime = System.currentTimeMillis();
+ if (currentTime - lastRefreshTime < tableEntryRefreshLockTimeout) {
+ return tableEntry;
+ }
+ tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
+ tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
+ serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
+
+ tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
+ } finally {
+ lock.unlock();
+ }
+
+ } catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
+ RUNTIME.error("RefreshTableEntry encountered an exception", e);
+ throw e;
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to get table entry. Key=%s, TabletId=%d, message=%s", tableEntryKey, tabletId, e.getMessage());
+ RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
+ throw new ObTableEntryRefreshException(errorMsg, e);
+ }
+
+ tableLocations.put(tableName, tableEntry);
+ tableEntryRefreshContinuousFailureCount.set(0);
+
+ if (logger.isInfoEnabled()) {
+ logger.info("Refreshed table entry. DataSource: {}, TableName: {}, Key: {}, Entry: {}",
+ dataSourceName, tableName, tableEntryKey, JSON.toJSON(tableEntry));
+ }
+
+ return tableEntry;
+ }
+
/**
* 刷新 table entry 元数据
* @param tableEntry
@@ -1345,13 +1399,18 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
try {
// if table entry is exist we just need to refresh table locations
if (tableEntry != null && !fetchAll) {
- tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
- tableEntryKey,//
- tableEntry,//
- tableEntryAcquireConnectTimeout,//
- tableEntryAcquireSocketTimeout,//
- serverAddressPriorityTimeout, //
- serverAddressCachingTimeout, sysUA);
+ if (ObGlobal.obVsnMajor() >= 4) {
+ // do nothing
+ } else {
+ // 3.x still proactively refreshes all locations
+ tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
+ tableEntryKey,//
+ tableEntry,//
+ tableEntryAcquireConnectTimeout,//
+ tableEntryAcquireSocketTimeout,//
+ serverAddressPriorityTimeout, //
+ serverAddressCachingTimeout, sysUA);
+ }
} else {
// if table entry is not exist we should fetch partition info and table locations
tableEntry = loadTableEntryWithPriority(serverRoster, //
@@ -1360,7 +1419,6 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
tableEntryAcquireSocketTimeout,//
serverAddressPriorityTimeout,//
serverAddressCachingTimeout, sysUA);
-
if (tableEntry.isPartitionTable()) {
switch (runningMode) {
case HBASE:
@@ -1544,17 +1602,15 @@ private ObPair getPartitionReplica(TableEntry tableEntry,
*/
private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
ObServerRoute route) {
- if (ObGlobal.obVsnMajor() >= 4 && tableEntry.isPartitionTable()) {
- ObPartitionInfo partInfo = tableEntry.getPartitionInfo();
- Map tabletIdMap = partInfo.getPartTabletIdMap();
- long partIdx = tableEntry.getPartIdx(partId);
- long TabletId = tabletIdMap.get(partIdx);
- return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(TabletId)
- .getReplica(route);
- } else {
- return tableEntry.getPartitionEntry().getPartitionLocationWithPartId(partId)
- .getReplica(route);
- }
+ long tabletId = getTabletIdByPartId(tableEntry, partId);
+ return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(tabletId)
+ .getReplica(route);
+
+ }
+
+ private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo,
+ ObServerRoute route) {
+ return obPartitionLocationInfo.getPartitionLocation().getReplica(route);
}
/**
@@ -1784,48 +1840,87 @@ public ObTable addTable(ObServerAddr addr){
public ObPair getTableInternal(String tableName, TableEntry tableEntry,
long partId, boolean waitForRefresh,
ObServerRoute route) throws Exception {
- ObPair partitionReplica = getPartitionReplica(tableEntry, partId,
- route);
-
- ReplicaLocation replica = partitionReplica.getRight();
-
+ ReplicaLocation replica = null;
+ long tabletId = getTabletIdByPartId(tableEntry, partId);
+ ObPartitionLocationInfo obPartitionLocationInfo = null;
+ if (ObGlobal.obVsnMajor() >= 4) {
+
+ obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
+
+ replica = getPartitionLocation(obPartitionLocationInfo, route);
+ } else {
+ ObPair partitionReplica = getPartitionReplica(tableEntry, partId,
+ route);
+ replica = partitionReplica.getRight();
+ }
+ if (replica == null) {
+ RUNTIME.error("Cannot get replica by partId: " + partId);
+ throw new ObTableGetException("Cannot get replica by partId: " + partId);
+ }
ObServerAddr addr = replica.getAddr();
ObTable obTable = tableRoster.get(addr);
- boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
- if (obTable == null) {
- logger.warn("can not get ObTable by addr {}, refresh metadata.", addr);
- syncRefreshMetadata();
- }
- if (addrExpired || obTable == null) {
- if (logger.isInfoEnabled() && addrExpired) {
- logger.info("server addr {} is expired, refresh tableEntry.", addr);
- }
- tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
- replica = getPartitionReplica(tableEntry, partId, route).getRight();
+ if (obTable == null || addr.isExpired(serverAddressCachingTimeout)) {
+ if (obTable == null) {
+ logger.warn("Cannot get ObTable by addr {}, refreshing metadata.", addr);
+ syncRefreshMetadata();
+ }
+ if (addr.isExpired(serverAddressCachingTimeout)) {
+ logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
+ }
+
+ if (ObGlobal.obVsnMajor() >= 4) {
+ obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
+ replica = getPartitionLocation(obPartitionLocationInfo, route);
+ } else {
+ tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
+ replica = getPartitionReplica(tableEntry, partId, route).getRight();
+ }
+
addr = replica.getAddr();
obTable = tableRoster.get(addr);
+
+ if (obTable == null) {
+ RUNTIME.error("Cannot get table by addr: " + addr);
+ throw new ObTableGetException("Cannot get table by addr: " + addr);
+ }
}
+ ObTableParam param = null;
+ if (ObGlobal.obVsnMajor() >= 4) {
+ param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
+ } else {
+ param.setPartId(partId);
+ param.setTableId(tableEntry.getTableId());
+ param.setPartitionId(partId);
+ }
+ addr.recordAccess();
+ return new ObPair<>(tabletId, param);
+ }
- if (obTable == null) {
- RUNTIME.error("cannot get table by addr: " + addr);
- throw new ObTableGetException("cannot get table by addr: " + addr);
+ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
+ String tableName, long tabletId)
+ throws Exception {
+ ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry()
+ .getPartitionInfo(tabletId);
+ if (!obPartitionLocationInfo.initialized.get()) {
+ tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
+ obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
+ obPartitionLocationInfo.initializationLatch.await();
}
+ return obPartitionLocationInfo;
+ }
+ private ObTableParam createTableParam(ObTable obTable, TableEntry tableEntry,
+ ObPartitionLocationInfo obPartitionLocationInfo,
+ long partId, long tabletId) {
ObTableParam param = new ObTableParam(obTable);
- param.setPartId(partId); // used in getTable(), 4.x may change the origin partId
+ param.setPartId(partId);
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
- long partIdx = tableEntry.getPartIdx(partId);
- partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
- .getPartTabletIdMap().get(partIdx) : partId;
- param.setLsId(tableEntry.getPartitionEntry().getLsId(partId));
+ param.setLsId(obPartitionLocationInfo.getTabletLsId());
}
-
param.setTableId(tableEntry.getTableId());
- param.setPartitionId(partId);
-
- addr.recordAccess();
- return new ObPair(partitionReplica.getLeft(), param);
+ param.setPartitionId(tabletId);
+ return param;
}
/**
@@ -1840,39 +1935,58 @@ public ObPair getTableInternal(String tableName, TableEntry
* @throws Exception
*/
private List> getPartitionReplica(TableEntry tableEntry,
+ String tableName,
Row startRow,
boolean startIncluded,
Row endRow,
boolean endIncluded,
- ObServerRoute route)
- throws Exception {
- // non partition
- List> replicas = new ArrayList>();
- if (!tableEntry.isPartitionTable()
- || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
- replicas.add(new ObPair(0L, getPartitionLocation(tableEntry, 0L,
- route)));
+ ObServerRoute route) throws Exception {
+ List> replicas = new ArrayList<>();
+
+ if (!tableEntry.isPartitionTable() || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
+ long tabletId = getTabletIdByPartId(tableEntry, 0L);
+ ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
+ replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
return replicas;
- } else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE) {
- List partIds = tableEntry.getPartitionInfo().getFirstPartDesc()
+ }
+
+ ObPartitionLevel partitionLevel = tableEntry.getPartitionInfo().getLevel();
+ List partIds = getPartitionTablePartitionIds(tableEntry, startRow, startIncluded, endRow, endIncluded, partitionLevel);
+
+ for (Long partId : partIds) {
+ long tabletId = getTabletIdByPartId(tableEntry, partId);
+ ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
+ replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
+ }
+
+ return replicas;
+ }
+
+ private List getPartitionTablePartitionIds(TableEntry tableEntry,
+ Row startRow, boolean startIncluded,
+ Row endRow, boolean endIncluded,
+ ObPartitionLevel level)
+ throws Exception {
+ if (level == ObPartitionLevel.LEVEL_ONE) {
+ return tableEntry.getPartitionInfo().getFirstPartDesc()
.getPartIds(startRow, startIncluded, endRow, endIncluded);
- for (Long partId : partIds) {
- replicas.add(new ObPair(partId, getPartitionLocation(
- tableEntry, partId, route)));
- }
- } else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_TWO) {
- List partIds = getPartitionsForLevelTwo(tableEntry, startRow, startIncluded,
- endRow, endIncluded);
- for (Long partId : partIds) {
- replicas.add(new ObPair(partId, getPartitionLocation(
- tableEntry, partId, route)));
- }
+ } else if (level == ObPartitionLevel.LEVEL_TWO) {
+ return getPartitionsForLevelTwo(tableEntry, startRow, startIncluded,
+ endRow, endIncluded);
} else {
RUNTIME.error("not allowed bigger than level two");
throw new ObTableGetException("not allowed bigger than level two");
}
+ }
- return replicas;
+ public long getTabletIdByPartId(TableEntry tableEntry, Long partId) {
+ if (ObGlobal.obVsnMajor() >= 4 && tableEntry.isPartitionTable()) {
+ ObPartitionInfo partInfo = tableEntry.getPartitionInfo();
+ Map tabletIdMap = partInfo.getPartTabletIdMap();
+ long partIdx = tableEntry.getPartIdx(partId);
+ return tabletIdMap.getOrDefault(partIdx, partId);
+ }
+ return partId;
}
/**
@@ -1947,7 +2061,7 @@ public List> getTables(String tableName, ObTableQuery
}
}
- List> partIdWithReplicaList = getPartitionReplica(tableEntry,
+ List> partIdWithReplicaList = getPartitionReplica(tableEntry, tableName,
startRow, startInclusive, endRow, endInclusive, route);
// obTableParams -> List>
@@ -1976,12 +2090,6 @@ public List> getTables(String tableName, ObTableQuery
}
ObTableParam param = new ObTableParam(obTable);
- if (ObGlobal.obVsnMajor() >= 4) {
- long partIdx = tableEntry.getPartIdx(partId);
- partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
- .getPartTabletIdMap().get(partIdx) : partId;
- }
-
param.setTableId(tableEntry.getTableId());
// real partition(tablet) id
param.setPartitionId(partId);
diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java
index 4da53fb5..37a708bf 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java
@@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
- if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
+ if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
@@ -139,7 +139,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
}
}
- if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
+ if (resultCode.getRcode() != 0
+ && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
@@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
+ || errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
+ || errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}
diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
index 20d657e0..ecc37f25 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
@@ -56,135 +56,156 @@
public class LocationUtil {
- private static final Logger logger = TableClientLoggerFactory
- .getLogger(LocationUtil.class);
+ private static final Logger logger = TableClientLoggerFactory
+ .getLogger(LocationUtil.class);
static {
ParserConfig.getGlobalInstance().setSafeMode(true);
}
- private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;";
+ private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;";
- private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table "
- + "where table_name = ?";
+ private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table "
+ + "where table_name = ?";
- private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema "
- + "where tenant_name = ? and database_name = ? and table_name = ? limit 1";
+ private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema "
+ + "where tenant_name = ? and database_name = ? and table_name = ? limit 1";
- private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;";
+ private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;";
@Deprecated
@SuppressWarnings("unused")
- private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT = "SELECT /*+READ_CONSISTENCY(WEAK)*/ partition_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 "
- + "FROM oceanbase.__all_virtual_proxy_schema "
- + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND partition_id in ({0}) AND sql_port > 0 "
- + "ORDER BY role ASC LIMIT ?";
-
- private static final String PROXY_PART_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, "
- + "part_range_type, part_interval_bin, interval_start_bin, "
- + "sub_part_num, sub_part_type, sub_part_space, "
- + "sub_part_range_type, def_sub_part_interval_bin, def_sub_interval_start_bin, sub_part_expr, "
- + "part_key_name, part_key_type, part_key_idx, part_key_extra, spare1 "
- + "FROM oceanbase.__all_virtual_proxy_partition_info "
- + "WHERE table_id = ? group by part_key_name order by part_key_name LIMIT ?;";
+ private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT = "SELECT /*+READ_CONSISTENCY(WEAK)*/ partition_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 "
+ + "FROM oceanbase.__all_virtual_proxy_schema "
+ + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND partition_id in ({0}) AND sql_port > 0 "
+ + "ORDER BY role ASC LIMIT ?";
+
+ private static final String PROXY_PART_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, "
+ + "part_range_type, part_interval_bin, interval_start_bin, "
+ + "sub_part_num, sub_part_type, sub_part_space, "
+ + "sub_part_range_type, def_sub_part_interval_bin, def_sub_interval_start_bin, sub_part_expr, "
+ + "part_key_name, part_key_type, part_key_idx, part_key_extra, spare1 "
+ + "FROM oceanbase.__all_virtual_proxy_partition_info "
+ + "WHERE table_id = ? group by part_key_name order by part_key_name LIMIT ?;";
@Deprecated
@SuppressWarnings("unused")
- private static final String PROXY_TENANT_SCHEMA_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 "
- + "FROM oceanbase.__all_virtual_proxy_schema "
- + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 "
- + "ORDER BY partition_id ASC, role ASC LIMIT ?";
-
- private static final String PROXY_DUMMY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "WHERE tenant_name = ? and database_name=? and table_name = ?";
-
- private static final String PROXY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id = 0";
-
- private static final String PROXY_LOCATION_SQL_PARTITION = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id in ({0})";
-
- private static final String PROXY_FIRST_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, high_bound_val "
- + "FROM oceanbase.__all_virtual_proxy_partition "
- + "WHERE table_id = ? LIMIT ?;";
-
- private static final String PROXY_SUB_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, high_bound_val "
- + "FROM oceanbase.__all_virtual_proxy_sub_partition "
- + "WHERE table_id = ? LIMIT ?;";
-
- private static final String PROXY_SERVER_STATUS_INFO = "SELECT ss.svr_ip, ss.zone, zs.region, zs.spare4 as idc "
- + "FROM oceanbase.__all_virtual_proxy_server_stat ss, oceanbase.__all_virtual_zone_stat zs "
- + "WHERE zs.zone = ss.zone ;";
+ private static final String PROXY_TENANT_SCHEMA_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 "
+ + "FROM oceanbase.__all_virtual_proxy_schema "
+ + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 "
+ + "ORDER BY partition_id ASC, role ASC LIMIT ?";
+
+ private static final String PROXY_DUMMY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ + ", A.spare1 as replica_type "
+ + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ + "WHERE tenant_name = ? and database_name=? and table_name = ?";
+
+ private static final String PROXY_LOCATION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ + ", A.spare1 as replica_type "
+ + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id = 0";
+
+ private static final String PROXY_LOCATION_SQL_PARTITION = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.partition_id as partition_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ + ", A.spare1 as replica_type "
+ + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ + "WHERE tenant_name = ? and database_name=? and table_name = ? and partition_id in ({0})";
+
+ private static final String PROXY_FIRST_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, high_bound_val "
+ + "FROM oceanbase.__all_virtual_proxy_partition "
+ + "WHERE table_id = ? LIMIT ?;";
+
+ private static final String PROXY_SUB_PARTITION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, high_bound_val "
+ + "FROM oceanbase.__all_virtual_proxy_sub_partition "
+ + "WHERE table_id = ? LIMIT ?;";
+
+ private static final String PROXY_SERVER_STATUS_INFO = "SELECT ss.svr_ip, ss.zone, zs.region, zs.spare4 as idc "
+ + "FROM oceanbase.__all_virtual_proxy_server_stat ss, oceanbase.__all_virtual_zone_stat zs "
+ + "WHERE zs.zone = ss.zone ;";
@Deprecated
@SuppressWarnings("unused")
- private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tablet_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 "
- + "FROM oceanbase.__all_virtual_proxy_schema "
- + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND tablet_id in ({0}) AND sql_port > 0 "
- + "ORDER BY role ASC LIMIT ?";
-
- private static final String PROXY_PART_INFO_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, "
- + "part_range_type, sub_part_num, sub_part_type, sub_part_space, sub_part_range_type, sub_part_expr, "
- + "part_key_name, part_key_type, part_key_idx, part_key_extra, part_key_collation_type "
- + "FROM oceanbase.__all_virtual_proxy_partition_info "
- + "WHERE tenant_name = ? and table_id = ? group by part_key_name order by part_key_name LIMIT ?;";
+ private static final String PROXY_PLAIN_SCHEMA_SQL_FORMAT_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tablet_id, svr_ip, sql_port, table_id, role, part_num, replica_num, schema_version, spare1 "
+ + "FROM oceanbase.__all_virtual_proxy_schema "
+ + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND tablet_id in ({0}) AND sql_port > 0 "
+ + "ORDER BY role ASC LIMIT ?";
+
+ private static final String PROXY_PART_INFO_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_level, part_num, part_type, part_space, part_expr, "
+ + "part_range_type, sub_part_num, sub_part_type, sub_part_space, sub_part_range_type, sub_part_expr, "
+ + "part_key_name, part_key_type, part_key_idx, part_key_extra, part_key_collation_type "
+ + "FROM oceanbase.__all_virtual_proxy_partition_info "
+ + "WHERE tenant_name = ? and table_id = ? group by part_key_name order by part_key_name LIMIT ?;";
@Deprecated
@SuppressWarnings("unused")
- private static final String PROXY_TENANT_SCHEMA_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 "
- + "FROM oceanbase.__all_virtual_proxy_schema "
- + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 "
- + "ORDER BY tablet_id ASC, role ASC LIMIT ?";
-
- private static final String PROXY_DUMMY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "WHERE tenant_name = ? and database_name=? and table_name = ?";
-
- private static final String PROXY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "WHERE tenant_name = ? and database_name=? and table_name = ? and tablet_id = 0";
-
- private static final String PROXY_LOCATION_SQL_PARTITION_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
- + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
- + ", A.spare1 as replica_type, D.ls_id as ls_id "
- + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
- + "inner join oceanbase.DBA_OB_TENANTS C on C.tenant_name = A.tenant_name "
- + "left join oceanbase.CDB_OB_TABLET_TO_LS D on D.tenant_id = C.tenant_id and D.tablet_id = A.tablet_id "
- + "WHERE C.tenant_name = ? and database_name= ? and table_name = ? and A.tablet_id in ({0}) ";
-
- private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num "
- + "FROM oceanbase.__all_virtual_proxy_partition "
- + "WHERE tenant_name = ? and table_id = ? LIMIT ?;";
-
- private static final String PROXY_SUB_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, tablet_id, high_bound_val "
- + "FROM oceanbase.__all_virtual_proxy_sub_partition "
- + "WHERE tenant_name = ? and table_id = ? LIMIT ?;";
-
- private static final String PROXY_SERVER_STATUS_INFO_V4 = "SELECT ss.svr_ip, ss.zone, zs.region, zs.idc as idc "
- + "FROM DBA_OB_SERVERS ss, DBA_OB_ZONES zs "
- + "WHERE zs.zone = ss.zone ;";
-
- private static final String home = System.getProperty("user.home",
- "/home/admin");
-
- private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name "
- + "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES "
- + "WHERE tablegroup_name = ? and tenant_id = ? limit 1;";
-
- private static final int TEMPLATE_PART_ID = -1;
+ private static final String PROXY_TENANT_SCHEMA_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ svr_ip, sql_port, table_id, role, part_num, replica_num, spare1 "
+ + "FROM oceanbase.__all_virtual_proxy_schema "
+ + "WHERE tenant_name = ? AND database_name = ? AND table_name = ? AND sql_port > 0 "
+ + "ORDER BY tablet_id ASC, role ASC LIMIT ?";
+
+ private static final String PROXY_DUMMY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ + ", A.spare1 as replica_type "
+ + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ + "WHERE tenant_name = ? and database_name=? and table_name = ?";
+
+ private static final String PROXY_LOCATION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ A.tablet_id as tablet_id, A.svr_ip as svr_ip, A.sql_port as sql_port, "
+ + "A.table_id as table_id, A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, B.stop_time as stop_time "
+ + ", A.spare1 as replica_type "
+ + "FROM oceanbase.__all_virtual_proxy_schema A inner join oceanbase.__all_server B on A.svr_ip = B.svr_ip and A.sql_port = B.inner_port "
+ + "WHERE tenant_name = ? and database_name=? and table_name = ? and tablet_id = 0";
+
+ private static final String PROXY_LOCATION_SQL_PARTITION_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ * FROM ( "
+ + " SELECT A.tablet_id as tablet__id, A.svr_ip as svr_ip, A.sql_port as sql_port, A.table_id as table_id, "
+ + " A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, "
+ + " B.stop_time as stop_time, A.spare1 as replica_type "
+ + " FROM oceanbase.__all_virtual_proxy_schema A "
+ + " INNER JOIN oceanbase.__all_server B ON A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port "
+ + " WHERE A.tablet_id IN ({0}) AND A.tenant_name = ? AND A.database_name = ? AND A.table_name = ?) AS left_table "
+ + "LEFT JOIN ("
+ + " SELECT D.ls_id, D.tablet_id "
+ + " FROM oceanbase.__all_virtual_tablet_to_ls D "
+ + " INNER JOIN oceanbase.DBA_OB_TENANTS C ON D.tenant_id = C.tenant_id "
+ + " WHERE C.tenant_name = ? "
+ + ") AS right_table ON left_table.tablet__id = right_table.tablet_id;";
+
+ private static final String PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ * FROM ( "
+ + " SELECT A.tablet_id as tablet__id, A.svr_ip as svr_ip, A.sql_port as sql_port, A.table_id as table_id, "
+ + " A.role as role, A.replica_num as replica_num, A.part_num as part_num, B.svr_port as svr_port, B.status as status, "
+ + " B.stop_time as stop_time, A.spare1 as replica_type "
+ + " FROM oceanbase.__all_virtual_proxy_schema A "
+ + " INNER JOIN oceanbase.__all_server B ON A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port "
+ + " WHERE A.tablet_id = ? AND A.tenant_name = ? AND A.database_name = ? AND A.table_name = ?) AS left_table "
+ + "LEFT JOIN ("
+ + " SELECT D.ls_id, D.tablet_id "
+ + " FROM oceanbase.__all_virtual_tablet_to_ls D "
+ + " INNER JOIN oceanbase.DBA_OB_TENANTS C ON D.tenant_id = C.tenant_id "
+ + " WHERE C.tenant_name = ? "
+ + ") AS right_table ON left_table.tablet__id = right_table.tablet_id;";
+
+ private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num "
+ + "FROM oceanbase.__all_virtual_proxy_partition "
+ + "WHERE tenant_name = ? and table_id = ? LIMIT ?;";
+
+ private static final String PROXY_SUB_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ sub_part_id, part_name, tablet_id, high_bound_val "
+ + "FROM oceanbase.__all_virtual_proxy_sub_partition "
+ + "WHERE tenant_name = ? and table_id = ? LIMIT ?;";
+
+ private static final String PROXY_SERVER_STATUS_INFO_V4 = "SELECT ss.svr_ip, ss.zone, zs.region, zs.idc as idc "
+ + "FROM DBA_OB_SERVERS ss, DBA_OB_ZONES zs "
+ + "WHERE zs.zone = ss.zone ;";
+
+ private static final String home = System.getProperty(
+ "user.home",
+ "/home/admin");
+
+ private static final String TABLE_GROUP_GET_TABLE_NAME_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_name "
+ + "FROM oceanbase.CDB_OB_TABLEGROUP_TABLES "
+ + "WHERE tablegroup_name = ? and tenant_id = ? limit 1;";
+
+ private static final int TEMPLATE_PART_ID = -1;
// limit the size of get tableEntry location from remote each time
- private static final int MAX_TABLET_NUMS_EPOCH = 300;
+ private static final int MAX_TABLET_NUMS_EPOCH = 300;
private abstract static class TableEntryRefreshWithPriorityCallback {
abstract T execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException;
@@ -374,7 +395,8 @@ private static TableEntry callTableEntryRefresh(ObServerAddr obServerAddr, Table
RUNTIME.error(LCD.convert("01-00007"), url, key, e);
}
throw new ObTableEntryRefreshException(format(
- "fail to refresh table entry from remote url=%s, key=%s", url, key), e);
+ "fail to refresh table entry from remote url=%s, key=%s, message=%s", url, key,
+ e.getMessage()), e);
} finally {
try {
if (null != connection) {
@@ -457,6 +479,37 @@ TableEntry execute(Connection connection)
});
}
+ /*
+ * Load table entry location with priority by tablet id.
+ */
+ public static TableEntry loadTableEntryLocationWithPriority(final ServerRoster serverRoster,
+ final TableEntryKey key,
+ final TableEntry tableEntry,
+ final Long tabletId,
+ final long connectTimeout,
+ final long socketTimeout,
+ final long priorityTimeout,
+ final long cachingTimeout,
+ final ObUserAuth sysUA)
+ throws ObTableEntryRefreshException {
+
+ return callTableEntryRefreshWithPriority(serverRoster, priorityTimeout, cachingTimeout,
+ new TableEntryRefreshWithPriorityCallback() {
+ @Override
+ TableEntry execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshException {
+ return callTableEntryRefresh(obServerAddr, key, connectTimeout, socketTimeout,
+ sysUA, true, new TableEntryRefreshCallback() {
+ @Override
+ TableEntry execute(Connection connection)
+ throws ObTablePartitionLocationRefreshException {
+ return getTableEntryLocationFromRemote(connection, key, tableEntry,
+ tabletId);
+ }
+ });
+ }
+ });
+ }
+
/*
* load Table Name With table Group
*/
@@ -682,9 +735,16 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
}
}
}
-
- // get location info
- getTableEntryLocationFromRemote(connection, key, tableEntry);
+
+ if (ObGlobal.obVsnMajor() >= 4) {
+ // only set empty partitionEntry
+ ObPartitionEntry partitionEntry = new ObPartitionEntry();
+ tableEntry.setPartitionEntry(partitionEntry);
+ tableEntry.setRefreshTimeMills(System.currentTimeMillis());
+ } else {
+ // get location info
+ getTableEntryLocationFromRemote(connection, key, tableEntry);
+ }
if (!initialized) {
if (BOOT.isInfoEnabled()) {
@@ -719,6 +779,17 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
return tableEntry;
}
+ // Note: This code is applicable only for refreshing locations based on tablet ID in version 4.x
+ private static String genLocationSQLByTabletId() {
+ String sql = null;
+ if (ObGlobal.obVsnMajor() >= 4) {
+ sql = PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4;
+ } else {
+ throw new FeatureNotSupportedException("not support ob version less than 4");
+ }
+ return sql;
+ }
+
private static String genLocationSQLByOffset(TableEntry tableEntry, int offset, int size) {
StringBuilder sb = new StringBuilder();
String sql = null;
@@ -776,6 +847,44 @@ private static String genLocationSQLByOffset(TableEntry tableEntry, int offset,
return sql;
}
+ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
+ TableEntryKey key,
+ TableEntry tableEntry, Long tabletId)
+ throws ObTablePartitionLocationRefreshException {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ ObPartitionEntry partitionEntry = tableEntry.getPartitionEntry();
+ String sql = genLocationSQLByTabletId();
+ try {
+ ps = connection.prepareStatement(sql);
+ ps.setLong(1, tabletId);
+ ps.setString(2, key.getTenantName());
+ ps.setString(3, key.getDatabaseName());
+ ps.setString(4, key.getTableName());
+ ps.setString(5, key.getTenantName());
+ rs = ps.executeQuery();
+ getPartitionLocationFromResultSetByTablet(tableEntry, rs, partitionEntry, tabletId);
+ } catch (Exception e) {
+ RUNTIME.error(LCD.convert("01-00010"), key, tableEntry, e);
+ throw new ObTablePartitionLocationRefreshException(format(
+ "fail to get partition location entry from remote entryKey = %s tableEntry =%s ",
+ key, tableEntry), e);
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ if (null != ps) {
+ ps.close();
+ }
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+ tableEntry.setRefreshTimeMills(System.currentTimeMillis());
+ return tableEntry;
+ }
+
/*
* Get table entry location from remote.
*/
@@ -792,6 +901,7 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
for (int i = 0; i < epoch; i++) {
try {
int offset = i * MAX_TABLET_NUMS_EPOCH;
+ // // This code is executed only in version 3.x
String sql = genLocationSQLByOffset(tableEntry, offset, MAX_TABLET_NUMS_EPOCH);
ps = connection.prepareStatement(sql);
ps.setString(1, key.getTenantName());
@@ -802,8 +912,8 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
} catch (Exception e) {
RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e);
throw new ObTablePartitionLocationRefreshException(format(
- "fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s "
- + "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e);
+ "fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s "
+ + "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e);
} finally {
try {
if (null != rs) {
@@ -1081,6 +1191,63 @@ private static TableEntry getTableEntryFromResultSet(TableEntryKey key, ResultSe
return entry;
}
+ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableEntry tableEntry,
+ ResultSet rs,
+ ObPartitionEntry partitionEntry,
+ long tabletId)
+ throws SQLException,
+ ObTablePartitionLocationRefreshException {
+
+ if (partitionEntry == null || tableEntry == null) {
+ throw new IllegalArgumentException("partitionEntry: " + partitionEntry
+ + " tableEntry: " + tableEntry);
+ }
+
+ ObPartitionLocationInfo partitionLocationInfo = partitionEntry.getPartitionInfo(tabletId);
+
+ partitionLocationInfo.rwLock.writeLock().lock();
+ try {
+ while (rs.next()) {
+ ReplicaLocation replica = buildReplicaLocation(rs);
+
+ long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs
+ .getLong("partition_id");
+ long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID;
+ if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) {
+ lsId = INVALID_LS_ID; // For non-partitioned table
+ }
+ partitionLocationInfo.setTabletLsId(lsId);
+
+ if (ObGlobal.obVsnMajor() < 4 && tableEntry.isPartitionTable()
+ && tableEntry.getPartitionInfo().getSubPartDesc() != null) {
+ partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
+ .getPartitionInfo().getSubPartDesc().getPartNum());
+ }
+
+ if (!replica.isValid()) {
+ RUNTIME
+ .warn(format(
+ "Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d",
+ replica, partitionId, tableEntry.getTableId()));
+ continue;
+ }
+ ObPartitionLocation location = partitionLocationInfo.getPartitionLocation();
+ if (location == null) {
+ location = new ObPartitionLocation();
+ partitionLocationInfo.updateLocation(location);
+ }
+ location.addReplicaLocation(replica);
+
+ if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
+ partitionLocationInfo.initializationLatch.countDown();
+ }
+ }
+ } finally {
+ partitionLocationInfo.rwLock.writeLock().unlock();
+ }
+ return partitionEntry;
+ }
+
private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tableEntry,
ResultSet rs,
ObPartitionEntry partitionEntry)
diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java
index 559c9e04..260a067c 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java
@@ -23,9 +23,10 @@
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
import static com.google.common.base.Preconditions.checkArgument;
@@ -53,7 +54,9 @@ public class TableEntry {
// partition location
private TableEntryKey tableEntryKey = null;
private volatile ObPartitionEntry partitionEntry = null;
-
+
+ public ConcurrentHashMap refreshLockMap = new ConcurrentHashMap<>();
+
/*
* Is valid.
*/
@@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
+ tableEntryKey);
partitionInfo.prepare();
- checkArgument(partitionEntry != null,
- "partition table partition entry is not ready. key" + tableEntryKey);
}
}
diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java
index d3e87e71..8b514d78 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java
@@ -21,13 +21,23 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class ObPartitionEntry {
private Map partitionLocation = new HashMap();
// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map tabletLsIdMap = new HashMap<>();
+
+ // tabelt id -> (PartitionLocation, LsId)
+ private ConcurrentHashMap partitionInfos = new ConcurrentHashMap<>();
+
+ public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
+ return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
+ }
+
public Map getPartitionLocation() {
return partitionLocation;
}
@@ -39,6 +49,16 @@ public void setPartitionLocation(Map partitionLocatio
this.partitionLocation = partitionLocation;
}
+ public Map getTabletLsIdMap() {
+ return tabletLsIdMap;
+ }
+
+ public void setTabletLsIdMap(Map tabletLsIdMap) {
+ this.tabletLsIdMap = tabletLsIdMap;
+ }
+
+ public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
+
/*
* Get partition location with part id.
*/
@@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}
-
- public Map getTabletLsIdMap() {
- return tabletLsIdMap;
- }
-
- public void setTabletLsIdMap(Map tabletLsIdMap) {
- this.tabletLsIdMap = tabletLsIdMap;
- }
-
- public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java
new file mode 100644
index 00000000..8b9181b9
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java
@@ -0,0 +1,64 @@
+/*-
+ * #%L
+ * com.oceanbase:obkv-table-client
+ * %%
+ * Copyright (C) 2021 - 2024 OceanBase
+ * %%
+ * OBKV Table Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.rpc.location.model.partition;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
+
+public class ObPartitionLocationInfo {
+ private ObPartitionLocation partitionLocation = null;
+ private Long tabletLsId = OB_INVALID_ID;
+ private Long lastUpdateTime = 0L;
+ public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ public AtomicBoolean initialized = new AtomicBoolean(false);
+ public final CountDownLatch initializationLatch = new CountDownLatch(1);
+
+ public ObPartitionLocation getPartitionLocation() {
+ rwLock.readLock().lock();
+ try {
+ return partitionLocation;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public void updateLocation(ObPartitionLocation newLocation) {
+ this.partitionLocation = newLocation;
+ this.lastUpdateTime = System.currentTimeMillis();
+ }
+
+ public Long getTabletLsId() {
+ return tabletLsId;
+ }
+
+ public void setTabletLsId(Long tabletLsId) {
+ this.tabletLsId = tabletLsId;
+ }
+
+ public Long getLastUpdateTime() {
+ rwLock.readLock().lock();
+ try {
+ return lastUpdateTime;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+}
diff --git a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java
index abdfbfde..8ee42ebb 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/property/Property.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/property/Property.java
@@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
// [ObTable][OTHERS]
- SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
+ SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),
/*
* other config
diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java
index fbb79d79..349912f1 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java
@@ -358,7 +358,9 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
- OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
+ OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
+ OB_LS_NOT_EXIST(-4719), //
+ OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java
index a11a8a67..5da49bbc 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java
@@ -17,11 +17,13 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
+import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
+import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -51,7 +53,6 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
protected volatile boolean closed = false;
protected volatile List row = null;
protected volatile int rowIndex = -1;
- // 调整它的startKey
protected ObTableQuery tableQuery;
protected long operationTimeout = -1;
protected String tableName;
@@ -324,10 +325,7 @@ public boolean next() throws Exception {
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
- // Adjust the start key and refresh the expectant
- this.tableQuery.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(tableQuery, tableName));
-
// Reset the iterator to start over
it = expectant.entrySet().iterator();
referPartition.clear(); // Clear the referPartition if needed
@@ -362,7 +360,7 @@ public boolean next() throws Exception {
}
protected Map> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
- Map> partitionObTables = new HashMap<>();
+ Map> partitionObTables = new LinkedHashMap<>();
String indexName = tableQuery.getIndexName();
String indexTableName = null;
diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java
index b640fa71..cae7f39c 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java
@@ -17,11 +17,13 @@
package com.alipay.oceanbase.rpc.stream;
+import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
+import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -84,8 +86,10 @@ public void init() throws Exception {
it = expectant.entrySet().iterator();
retryTimes++;
if (retryTimes > maxRetries) {
- RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
- throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
+ RUNTIME.error("Fail to get refresh table entry response after {}",
+ retryTimes);
+ throw new ObTableRetryExhaustedException(
+ "Fail to get refresh table entry response after " + retryTimes);
}
} else {
@@ -199,11 +203,21 @@ public boolean next() throws Exception {
referToLastStreamResult(lastEntry.getValue());
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
- this.asyncRequest.getObTableQueryRequest().getTableQuery()
- .adjustStartKey(currentStartKey);
- setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
- .getTableQuery(), tableName));
- setEnd(true);
+
+ TableEntry entry = client.getOrRefreshTableEntry(tableName, false, false, false);
+ // Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
+ if (ObGlobal.obVsnMajor() >= 4
+ && entry.isPartitionTable()
+ && entry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
+ this.asyncRequest.getObTableQueryRequest().getTableQuery()
+ .adjustStartKey(currentStartKey);
+ setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
+ .getTableQuery(), tableName));
+ setEnd(true);
+ } else {
+ setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
+ .getTableQuery(), tableName));
+ }
} else {
throw e;
}
@@ -230,15 +244,22 @@ public boolean next() throws Exception {
referToNewPartition(entry.getValue());
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
- this.asyncRequest.getObTableQueryRequest().getTableQuery()
- .adjustStartKey(currentStartKey);
- setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
- .getTableQuery(), tableName));
+ TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, false, false, false);
+ if (ObGlobal.obVsnMajor() >= 4
+ && tableEntry.isPartitionTable()
+ && tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
+ this.asyncRequest.getObTableQueryRequest().getTableQuery()
+ .adjustStartKey(currentStartKey);
+ setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
+ .getTableQuery(), tableName));
+ }
it = expectant.entrySet().iterator();
retryTimes++;
if (retryTimes > client.getTableEntryRefreshTryTimes()) {
- RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
- throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
+ RUNTIME.error("Fail to get refresh table entry response after {}",
+ retryTimes);
+ throw new ObTableRetryExhaustedException(
+ "Fail to get refresh table entry response after " + retryTimes);
}
continue;
} else {
diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
index fdbce217..551782dd 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
@@ -66,7 +66,7 @@ public class ObTable extends AbstractObTable implements Lifecycle {
private volatile boolean initialized = false;
private volatile boolean closed = false;
- private boolean reRouting = true; // only used for init packet factory
+ private boolean enableRerouting = true; // only used for init packet factory
private ReentrantLock statusLock = new ReentrantLock();
@@ -89,7 +89,7 @@ public void init() throws Exception {
.configWriteBufferWaterMark(getNettyBufferLowWatermark(),
getNettyBufferHighWatermark()).build();
connectionFactory.init(new ConnectionEventHandler(new GlobalSwitch())); // Only for monitoring connection status
- realClient = new ObTableRemoting(new ObPacketFactory(reRouting));
+ realClient = new ObTableRemoting(new ObPacketFactory(enableRerouting));
connectionPool = new ObTableConnectionPool(this, obTableConnectionPoolSize);
connectionPool.init();
initialized = true;
@@ -164,7 +164,7 @@ private void initProperties() {
nettyBufferHighWatermark);
nettyBlockingWaitInterval = parseToInt(NETTY_BLOCKING_WAIT_INTERVAL.getKey(),
nettyBlockingWaitInterval);
- reRouting = parseToBoolean(SERVER_ENABLE_REROUTING.getKey(), reRouting);
+ enableRerouting = parseToBoolean(SERVER_ENABLE_REROUTING.getKey(), enableRerouting);
maxConnExpiredTime = parseToLong(MAX_CONN_EXPIRED_TIME.getKey(), maxConnExpiredTime);
Object value = this.configs.get("runtime");
@@ -174,8 +174,8 @@ private void initProperties() {
}
}
- public boolean getReRouting(){
- return reRouting;
+ public boolean isEnableRerouting(){
+ return enableRerouting;
}
/*
diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java
index 5334eece..b73453c9 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java
@@ -20,6 +20,7 @@
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
+import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.mutation.result.*;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -352,11 +353,12 @@ public void partitionExecute(ObTableOperationResult[] results,
if (failedServerList != null) {
route.setBlackList(failedServerList);
}
- ObTableParam newParam = obTableClient.getTableWithPartId(tableName,
- originPartId, needRefreshTableEntry,
- obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo,
- route).getRight();
-
+ TableEntry entry = obTableClient.getOrRefreshTableEntry(tableName, false,
+ false, false);
+ obTableClient.refreshTableLocationByTabletId(entry, tableName, partId);
+ ObTableParam newParam = obTableClient.getTableWithPartId(tableName, partId,
+ false, obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo, route)
+ .getRight();
subObTable = newParam.getObTable();
subRequest.setPartitionId(newParam.getPartitionId());
}
@@ -418,6 +420,7 @@ public void partitionExecute(ObTableOperationResult[] results,
tableName, partId, ((ObTableException) ex).getErrorCode(),
tryTimes, ex);
if (ex instanceof ObTableNeedFetchAllException) {
+ // refresh table info
obTableClient.getOrRefreshTableEntry(tableName, needRefreshTableEntry,
obTableClient.isTableEntryRefreshIntervalWait(), true);
throw ex;
@@ -444,7 +447,6 @@ public void partitionExecute(ObTableOperationResult[] results,
throw new ObTableUnexpectedException(
"check batch operation result error: client get unexpected NULL result");
}
-
List subObTableOperationResults = subObTableBatchOperationResult
.getResults();
@@ -512,16 +514,18 @@ private void executeWithRetries(ObTableOperationResult[] results, Map.Entry>>> currentPartitions = new HashMap<>();
currentPartitions.put(entry.getKey(), entry.getValue());
-
- while (retryCount < maxRetries && !success) {
+ int errCode = ResultCodes.OB_SUCCESS.errorCode;
+ String errMsg = null;
+ while (retryCount <= maxRetries && !success) {
boolean allPartitionsSuccess = true;
-
for (Map.Entry>>> currentEntry : currentPartitions.entrySet()) {
try {
partitionExecute(results, currentEntry);
} catch (Exception e) {
if (shouldRetry(e)) {
retryCount++;
+ errCode = ((ObTableNeedFetchAllException)e).getErrorCode();
+ errMsg = e.getMessage();
List failedOperations = extractOperations(currentEntry.getValue().getRight());
currentPartitions = prepareOperations(failedOperations);
allPartitionsSuccess = false;
@@ -538,7 +542,9 @@ private void executeWithRetries(ObTableOperationResult[] results, Map.Entry>>> tabletOperations
+ Map>>> tabletOperations
= lsOperationsMap.computeIfAbsent(lsId, k -> new HashMap<>());
- // if ls id not exists
+ // if ls id not exists
- ObPair>> singleOperations =
+ ObPair>> singleOperations =
tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(tableObPair.getRight(), new ArrayList<>()));
- // if tablet id not exists
-
- singleOperations.getRight().add(new ObPair<>(i, operation));
- }
+ // if tablet id not exists
+ singleOperations.getRight().add(new ObPair<>(i, operation));
+ }
- return lsOperationsMap;
+ return lsOperationsMap;
}
public Map>>>> partitionPrepare()
@@ -474,6 +474,9 @@ public void partitionExecute(ObTableSingleOpResult[] results,
if (failedServerList != null) {
route.setBlackList(failedServerList);
}
+ TableEntry entry = obTableClient.getOrRefreshTableEntry(tableName, false,
+ false, false);
+ obTableClient.refreshTableLocationByTabletId(entry, tableName, obTableClient.getTabletIdByPartId(entry, originPartId));
subObTable = obTableClient.getTableWithPartId(tableName, originPartId, needRefreshTableEntry,
obTableClient.isTableEntryRefreshIntervalWait(), false, route).
getRight().getObTable();
@@ -628,15 +631,18 @@ private void executeWithRetries(
Map>>>> currentPartitions = new HashMap<>();
currentPartitions.put(entry.getKey(), entry.getValue());
- while (retryCount < maxRetries && !success) {
+ int errCode = ResultCodes.OB_SUCCESS.errorCode;
+ String errMsg = null;
+ while (retryCount <= maxRetries && !success) {
boolean allPartitionsSuccess = true;
-
for (Map.Entry>>>> currentEntry : currentPartitions.entrySet()) {
try {
partitionExecute(results, currentEntry);
} catch (Exception e) {
if (shouldRetry(e)) {
retryCount++;
+ errCode = ((ObTableNeedFetchAllException)e).getErrorCode();
+ errMsg = e.getMessage();
List failedOperations = extractOperations(currentEntry.getValue());
currentPartitions = prepareOperations(failedOperations);
allPartitionsSuccess = false;
@@ -653,7 +659,9 @@ private void executeWithRetries(
}
if (!success) {
- throw new ObTableUnexpectedException("Failed to execute operation after retrying " + maxRetries + " times.");
+ errMsg = "Failed to execute operation after retrying " + maxRetries + " times. Last error Msg:" +
+ "[errCode="+ errCode +"] " + errMsg;
+ throw new ObTableUnexpectedException(errMsg);
}
}
@@ -740,7 +748,7 @@ public void doTask() {
// Execute sub-batch operation one by one
for (final Map.Entry>>>> entry : lsOperations
.entrySet()) {
- partitionExecute(obTableOperationResults, entry);
+ executeWithRetries(obTableOperationResults, entry, maxRetries);
}
}
diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java
index 46cf35a8..7ff2a8c9 100644
--- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java
+++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java
@@ -242,7 +242,7 @@ ObTableClientQueryAsyncStreamResult execute() throws Exception {
}
public Map> initPartitions(ObTableQuery tableQuery, String tableName) throws Exception {
- Map> partitionObTables = new HashMap<>();
+ Map> partitionObTables = new LinkedHashMap<>();
String indexName = tableQuery.getIndexName();
String indexTableName = null;
diff --git a/src/test/java/com/alipay/oceanbase/rpc/util/ByteUtilTest.java b/src/test/java/com/alipay/oceanbase/rpc/util/ByteUtilTest.java
new file mode 100644
index 00000000..92dae7dc
--- /dev/null
+++ b/src/test/java/com/alipay/oceanbase/rpc/util/ByteUtilTest.java
@@ -0,0 +1,85 @@
+package com.alipay.oceanbase.rpc.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.alipay.oceanbase.rpc.util.ByteUtil.compareByteArrays;
+import static org.junit.Assert.*;
+
+
+public class ByteUtilTest {
+ @Test
+ public void testcompareByteArrays() {
+ {
+ byte[] array1 = {1, 2, 3};
+ byte[] array2 = {1, 2, 3};
+ Assert.assertEquals(0, compareByteArrays(array1, array2));
+ }
+ {
+ byte[] array1 = {2, 2, 3};
+ byte[] array2 = {1, 2, 3};
+ Assert.assertTrue(compareByteArrays(array1, array2) > 0);
+ }
+ {
+ byte[] array1 = {1, 2, 3, 4};
+ byte[] array2 = {1, 2, 3};
+ assertTrue(compareByteArrays(array1, array2) > 0);
+ }
+ {
+ byte[] array1 = {};
+ byte[] array2 = {};
+ assertEquals(0, compareByteArrays(array1, array2));
+ }
+ }
+ @Test
+ public void testincrementByteArray() {
+ {
+ byte[] input = {0x01, 0x02, 0x03};
+ byte[] expected = {0x01, 0x02, 0x04};
+ assertArrayEquals(expected, ByteUtil.incrementByteArray(input));
+ }
+ {
+ byte[] input = {(byte) 0xFF, (byte) 0xFF};
+ byte[] expected = {0x01, 0x00, 0x00};
+ assertArrayEquals(expected, ByteUtil.incrementByteArray(input));
+ }
+ {
+ byte[] input = {};
+ byte[] expected = {0x01};
+ assertArrayEquals(expected, ByteUtil.incrementByteArray(input));
+ }
+ {
+ byte[] expected = {0x01};
+ assertArrayEquals(expected, ByteUtil.incrementByteArray(null));
+ }
+ }
+
+ @Test
+ public void testdecrementByteArray() {
+ {
+ byte[] input = {0x01};
+ byte[] expected = {0x00};
+ assertArrayEquals(expected, ByteUtil.decrementByteArray(input));
+ }
+ {
+ byte[] input = {0x01, 0x00};
+ byte[] expected = {0x00, (byte) 0xFF};
+ assertArrayEquals(expected, ByteUtil.decrementByteArray(input));
+ }
+ {
+ byte[] input = {0x02, 0x00};
+ byte[] expected = {0x01, (byte) 0xFF};
+ assertArrayEquals(expected, ByteUtil.decrementByteArray(input));
+ }
+ {
+ byte[] input = {0x01, 0x00, 0x00};
+ byte[] expected = {0x00, (byte) 0xFF, (byte) 0xFF};
+ assertArrayEquals(expected, ByteUtil.decrementByteArray(input));
+ }
+ {
+ byte[] input = {(byte) 0xFF, (byte) 0xFF};
+ byte[] expected = {(byte) 0xFF, (byte) 0xFE};
+ assertArrayEquals(expected, ByteUtil.decrementByteArray(input));
+ }
+ }
+}