From e6a24284b95ddd706de63e2dee269fbfeffd9ade Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 25 Mar 2025 10:49:39 +0800 Subject: [PATCH 1/3] fix hbase tablegroup operations being held for a long time --- .../alipay/oceanbase/rpc/ObTableClient.java | 6 ++ .../oceanbase/rpc/location/LocationUtil.java | 13 ++-- .../table/ObTableClientLSBatchOpsImpl.java | 39 ++++++++-- .../rpc/table/ObTableClientQueryImpl.java | 74 +++++++++++++------ 4 files changed, 97 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 58238b2c..12956c12 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -1993,6 +1993,9 @@ public ObPair getTableInternal(String tableName, TableEntry ObPartitionLocationInfo obPartitionLocationInfo = null; if (ObGlobal.obVsnMajor() >= 4) { obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId); + if (obPartitionLocationInfo.getPartitionLocation() == null) { + throw new ObTableNotExistException("partition location is null after refresh, table: { " + tableName + " } may not exist"); + } replica = getPartitionLocation(obPartitionLocationInfo, route); /** * Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader @@ -2145,6 +2148,9 @@ private List> getPartitionReplica(TableEntry table for (Long partId : partIds) { long tabletId = getTabletIdByPartId(tableEntry, partId); ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId); + if (locationInfo.getPartitionLocation() == null) { + throw new ObTableNotExistException("partition location is null after refresh, table: { " + tableName + " } may not exist"); + } replicas.add(new ObPair<>(partId, getPartitionLocation(locationInfo, route))); } } else { diff --git 
a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index add11758..2912e8ba 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -1294,19 +1294,20 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE } location.addReplicaLocation(replica); - if (location.getLeader() != null && partitionLocationInfo.initialized.compareAndSet(false, true)) { - partitionLocationInfo.initializationLatch.countDown(); + if (location.getLeader() != null) { + partitionLocationInfo.initialized.compareAndSet(false, true); } else if (rs.isLast() && location.getLeader() == null) { partitionLocationInfo.initializationLatch.countDown(); RUNTIME.error(LCD.convert("01-00028"), partitionId, partitionEntry, tableEntry); RUNTIME.error(format( - "partition=%d has no leader partitionEntry=%s original tableEntry=%s", - partitionId, partitionEntry, tableEntry)); + "partition=%d has no leader partitionEntry=%s original tableEntry=%s", + partitionId, partitionEntry, tableEntry)); throw new ObTablePartitionNoMasterException(format( - "partition=%d has no leader partitionEntry=%s original tableEntry=%s", - partitionId, partitionEntry, tableEntry)); + "partition=%d has no leader partitionEntry=%s original tableEntry=%s", + partitionId, partitionEntry, tableEntry)); } } + partitionLocationInfo.initializationLatch.countDown(); return partitionEntry; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index e7a588e4..2df0b2d3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -300,7 +300,7 @@ public void addOperation(Get get) throws Exception { Object[] 
rowKeyValues = get.getRowKey().getValues(); String[] propertiesNames = get.getSelectColumns(); ObTableSingleOpEntity entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKeyValues, - propertiesNames, null); + propertiesNames, null); ObTableSingleOp singleOp = new ObTableSingleOp(); singleOp.setSingleOpType(ObTableOperationType.GET); singleOp.addEntity(entity); @@ -344,8 +344,8 @@ public List executeWithResult() throws Exception { } } else { results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(), - result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, - result.getHeader().getErrMsg())); + result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, + result.getHeader().getErrMsg())); } } return results; @@ -393,16 +393,33 @@ public Map tableObPair= obTableClient.getTable(real_tableName, rowKey, - false, false, obTableClient.getRoute(false)); - long lsId = tableObPair.getRight().getLsId(); + ObPair tableObPair = null; + long lsId = INVALID_LS_ID; + try { + tableObPair = obTableClient.getTable(real_tableName, rowKey, + false, false, obTableClient.getRoute(false)); + lsId = tableObPair.getRight().getLsId(); + } catch (ObTableNotExistException e) { + if (this.entityType == ObTableEntityType.HKV + && obTableClient.isTableGroupName(tableName) + && obTableClient.getTableGroupInverted().get(real_tableName) != null) { + obTableClient.eraseTableGroupFromCache(tableName); + real_tableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true); + tableObPair = obTableClient.getTable(real_tableName, rowKey, + false, false, obTableClient.getRoute(false)); + lsId = tableObPair.getRight().getLsId(); + } else { + throw e; + } + } Map>>> tabletOperations = lsOperationsMap.computeIfAbsent(lsId, k -> new HashMap<>()); // if ls id not exists + ObPair finalTableObPair = tableObPair; ObPair>> singleOperations = - tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(tableObPair.getRight(), 
new ArrayList<>())); + tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(finalTableObPair.getRight(), new ArrayList<>())); // if tablet id not exists singleOperations.getRight().add(operationsWithIndex.get(i)); } @@ -565,6 +582,14 @@ public void partitionExecute(ObTableSingleOpResult[] results, } else if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) { needRefreshTableEntry = true; + if (((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + && obTableClient.isTableGroupName(tableName) + && obTableClient.getTableGroupInverted().get(realTableName) != null) { + // TABLE_NOT_EXIST + tableName is tableGroup + TableGroup cache is not empty + // means tableGroupName cache need to refresh + obTableClient.eraseTableGroupFromCache(tableName); + realTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true); + } if (obTableClient.isRetryOnChangeMasterTimes() && (tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) { if (ex instanceof ObTableNeedFetchAllException) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index b0f4add7..b9e00390 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -21,6 +21,8 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException; import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableNotExistException; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.mutation.Row; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; @@ -133,12 +135,13 @@ public void 
checkArgumentBeforeExec() throws Exception { throw new IllegalArgumentException("table name is null"); } else if (tableQuery.isFTSQuery()) { if (!ObGlobal.isFtsQuerySupport()) { - throw new FeatureNotSupportedException("full text query is not supported in "+ObGlobal.obVsnString()); + throw new FeatureNotSupportedException("full text query is not supported in " + + ObGlobal.obVsnString()); } if (tableQuery.getIndexName() == null || tableQuery.getIndexName().isEmpty() - || tableQuery.getIndexName().equalsIgnoreCase("primary")) { + || tableQuery.getIndexName().equalsIgnoreCase("primary")) { throw new IllegalArgumentException( - "use fulltext search but specified index name is not fulltext index"); + "use fulltext search but specified index name is not fulltext index"); } } } @@ -176,31 +179,27 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback odpTable = obTableClient.getODPTableWithPartId( - realTableName, getPartId(), false); + tableName, getPartId(), false); partitionObTables.put(odpTable.getLeft(), odpTable); } catch (Exception e) { if (e instanceof ObTableException) { if (((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) { // current ODP version does not support get partition meta information - throw new FeatureNotSupportedException("current ODP version does not support query with part id", e); + throw new FeatureNotSupportedException( + "current ODP version does not support query with part id", e); } else if (((ObTableException) e).getErrorCode() == ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) { // retry one time with force-renew flag - ObPair odpTable = obTableClient.getODPTableWithPartId( - realTableName, getPartId(), true); + ObPair odpTable = obTableClient + .getODPTableWithPartId(tableName, getPartId(), true); partitionObTables.put(odpTable.getLeft(), odpTable); } else { throw e; @@ -211,7 +210,7 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(0L, new ObTableParam( - 
obTableClient.getOdpTable()))); + obTableClient.getOdpTable()))); } } else { if (getPartId() == null) { @@ -224,9 +223,26 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback table = obTableClient.getTableWithPartId(indexTableName, - getPartId(), false, false, false, obTableClient.getRoute(false)); - partitionObTables.put(table.getLeft(), table); + try { + ObPair table = obTableClient.getTableWithPartId( + indexTableName, getPartId(), false, false, false, + obTableClient.getRoute(false)); + partitionObTables.put(table.getLeft(), table); + } catch (ObTableNotExistException e) { + if (this.entityType == ObTableEntityType.HKV + && obTableClient.isTableGroupName(tableName) + && obTableClient.getTableGroupInverted().get(indexTableName) != null) { + obTableClient.eraseTableGroupFromCache(indexTableName); + indexTableName = obTableClient.tryGetTableNameFromTableGroupCache( + tableName, true); + ObPair table = obTableClient.getTableWithPartId( + indexTableName, getPartId(), false, false, false, + obTableClient.getRoute(false)); + partitionObTables.put(table.getLeft(), table); + } else { + throw e; + } + } } } @@ -318,8 +334,22 @@ public Map> initPartitions(ObTableQuery tableQu } ObBorderFlag borderFlag = range.getBorderFlag(); // pairs -> List> - List> pairs = this.obTableClient.getTables(indexTableName, tableQuery, start, - borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false, false); + List> pairs = null; + try { + pairs = this.obTableClient.getTables(indexTableName, tableQuery, start, + borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false, false); + } catch (ObTableNotExistException e) { + if (this.entityType == ObTableEntityType.HKV + && obTableClient.isTableGroupName(tableName) + && obTableClient.getTableGroupInverted().get(indexTableName) != null) { + obTableClient.eraseTableGroupFromCache(indexTableName); + indexTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true); + pairs = 
this.obTableClient.getTables(indexTableName, tableQuery, start, + borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false, false); + } else { + throw e; + } + } if (tableQuery.getScanOrder() == ObScanOrder.Reverse) { for (int i = pairs.size() - 1; i >= 0; i--) { partitionObTables.put(pairs.get(i).getLeft(), pairs.get(i)); From 739cdd4a8badd4c08530096f12b7b6195e912cb8 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 26 Mar 2025 18:16:01 +0800 Subject: [PATCH 2/3] fix same tablegroup caching table dropped and request with same tabletId bug --- .../query/AbstractQueryStreamResult.java | 34 +++++++++++-------- .../table/ObTableClientLSBatchOpsImpl.java | 3 +- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 4e070870..083af7fd 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -71,8 +71,8 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen private ObReadConsistency readConsistency = ObReadConsistency.STRONG; // ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT] public List currentStartKey; - protected ObTableClient client; - + protected ObTableClient client; + /* * Get pcode. 
*/ @@ -146,8 +146,10 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, route.setBlackList(failedServerList); } if (ObGlobal.obVsnMajor() >= 4) { - TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, false, false, false); - client.refreshTableLocationByTabletId(tableEntry, indexTableName, client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft())); + TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, + false, false, false); + client.refreshTableLocationByTabletId(tableEntry, indexTableName, + client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft())); } subObTable = client @@ -239,12 +241,15 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, } } else if (e instanceof ObTableException) { if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e) - .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) + .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode || ((ObTableException) e) + .getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode) && ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()) || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery())) && client.getTableGroupInverted().get(indexTableName) != null) { // table not exists && hbase mode && table group exists , three condition both client.eraseTableGroupFromCache(tableName); + indexTableName = client.tryGetTableNameFromTableGroupCache(tableName, + true); } if (((ObTableException) e).isNeedRefreshTableEntry()) { needRefreshTableEntry = true; @@ -256,11 +261,10 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, throw e; } } else { - String logMessage = String.format( + String logMessage = String + .format( "exhaust retry while meet NeedRefresh Exception, table name: %s, batch ops 
refresh table, errorCode: %d", - indexTableName, - ((ObTableException) e).getErrorCode() - ); + indexTableName, ((ObTableException) e).getErrorCode()); logger.warn(logMessage, e); client.calculateContinuousFailure(indexTableName, e.getMessage()); throw new ObTableRetryExhaustedException(logMessage, e); @@ -434,7 +438,8 @@ protected void checkStatus() throws IllegalStateException { } if (closed) { - throw new IllegalStateException("table " + indexTableName + " query stream result is closed"); + throw new IllegalStateException("table " + indexTableName + + " query stream result is closed"); } } @@ -565,7 +570,7 @@ public void init() throws Exception { if (tableQuery.getBatchSize() == -1) { if (!expectant.isEmpty()) { Iterator>> it = expectant.entrySet() - .iterator(); + .iterator(); int retryTimes = 0; while (it.hasNext()) { Map.Entry> entry = it.next(); @@ -579,10 +584,11 @@ public void init() throws Exception { retryTimes++; if (retryTimes > client.getRuntimeRetryTimes()) { RUNTIME.error("Fail to get refresh table entry response after {}", - retryTimes); + retryTimes); throw new ObTableRetryExhaustedException( - "Fail to get refresh table entry response after " + retryTimes + - "errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode()); + "Fail to get refresh table entry response after " + retryTimes + + "errorCode:" + + ((ObTableNeedFetchAllException) e).getErrorCode()); } } else { diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index 2df0b2d3..49b7e49a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -582,7 +582,8 @@ public void partitionExecute(ObTableSingleOpResult[] results, } else if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) { needRefreshTableEntry = true; - if 
(((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + if ((((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || + ((ObTableException) ex).getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode) && obTableClient.isTableGroupName(tableName) && obTableClient.getTableGroupInverted().get(realTableName) != null) { // TABLE_NOT_EXIST + tableName is tableGroup + TableGroup cache is not empty From 5c22891ecbcf0821964252c577c678fa9a8baecf Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Thu, 27 Mar 2025 11:58:57 +0800 Subject: [PATCH 3/3] fix multi-cf operations that contain a non-existent table going into a dead loop --- .../query/AbstractQueryStreamResult.java | 19 ++++++++++++------- .../table/ObTableClientLSBatchOpsImpl.java | 7 ++++++- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 083af7fd..a9b11633 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -240,16 +240,21 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, throw e; } } else if (e instanceof ObTableException) { - if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e) - .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode || ((ObTableException) e) - .getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode) - && ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()) - || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery())) + if 
((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + || ((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode || ((ObTableException) e) + .getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode) + && ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request) + .getObTableQueryRequest().getTableQuery().isHbaseQuery()) || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request) + .getTableQuery().isHbaseQuery())) && client.getTableGroupInverted().get(indexTableName) != null) { // table not exists && hbase mode && table group exists , three condition both client.eraseTableGroupFromCache(tableName); - indexTableName = client.tryGetTableNameFromTableGroupCache(tableName, - true); + String newIndexTableName = client.tryGetTableNameFromTableGroupCache(tableName, true); + if (indexTableName.equalsIgnoreCase(newIndexTableName)) { + throw new ObTableNotExistException("multi column-family operations contain not existed table name", ResultCodes.OB_ERR_UNKNOWN_TABLE.errorCode); + } else { + indexTableName = newIndexTableName; + } } if (((ObTableException) e).isNeedRefreshTableEntry()) { needRefreshTableEntry = true; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index 49b7e49a..c3b69ada 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -589,7 +589,12 @@ public void partitionExecute(ObTableSingleOpResult[] results, // TABLE_NOT_EXIST + tableName is tableGroup + TableGroup cache is not empty // means tableGroupName cache need to refresh obTableClient.eraseTableGroupFromCache(tableName); - realTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true); + String newRealTableName = 
obTableClient.tryGetTableNameFromTableGroupCache(tableName, true); + if (realTableName.equalsIgnoreCase(newRealTableName)) { + throw new ObTableNotExistException("multi column-family operations contain not existed table name", ResultCodes.OB_ERR_UNKNOWN_TABLE.errorCode); + } else { + realTableName = newRealTableName; + } } if (obTableClient.isRetryOnChangeMasterTimes() && (tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) {