Skip to content

Commit 7693544

Browse files
committed
QueryAndMutate retry
1 parent 2056891 commit 7693544

File tree

1 file changed

+105
-40
lines changed

1 file changed

+105
-40
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alipay.oceanbase.rpc.mutation.*;
2828
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2929
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
30+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
3031
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3132
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
3233
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation;
@@ -2997,36 +2998,36 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
29972998
if (request instanceof ObTableOperationRequest) {
29982999
ObTableBatchOperation batchOperation = new ObTableBatchOperation();
29993000
batchOperation.addTableOperation(((ObTableOperationRequest) request)
3000-
.getTableOperation());
3001+
.getTableOperation());
30013002
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
3002-
request.getTableName(), batchOperation, this);
3003+
request.getTableName(), batchOperation, this);
30033004
batchOps.setEntityType(request.getEntityType());
30043005
ObTableBatchOperationResult batchOpsResult = new ObClusterTableBatchOps(batchOps)
3005-
.executeInternal();
3006+
.executeInternal();
30063007
return batchOpsResult.getResults().get(0);
30073008
} else if (request instanceof ObTableQueryRequest) {
30083009
// TableGroup -> TableName
30093010
String tableName = request.getTableName();
30103011
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
3011-
((ObTableQueryRequest) request).getTableQuery(), this);
3012+
((ObTableQueryRequest) request).getTableQuery(), this);
30123013
tableQuery.setEntityType(request.getEntityType());
30133014
return new ObClusterTableQuery(tableQuery).executeInternal();
30143015
} else if (request instanceof ObTableQueryAsyncRequest) {
30153016
// TableGroup -> TableName
30163017
String tableName = request.getTableName();
30173018
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
3018-
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
3019+
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
30193020
tableQuery.setEntityType(request.getEntityType());
30203021
return new ObClusterTableQuery(tableQuery).asyncExecuteInternal();
30213022
} else if (request instanceof ObTableBatchOperationRequest) {
30223023
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
3023-
request.getTableName(),
3024-
((ObTableBatchOperationRequest) request).getBatchOperation(), this);
3024+
request.getTableName(),
3025+
((ObTableBatchOperationRequest) request).getBatchOperation(), this);
30253026
batchOps.setEntityType(request.getEntityType());
30263027
return new ObClusterTableBatchOps(runtimeBatchExecutor, batchOps).executeInternal();
30273028
} else if (request instanceof ObTableQueryAndMutateRequest) {
30283029
ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request)
3029-
.getTableQueryAndMutate();
3030+
.getTableQueryAndMutate();
30303031
ObTableQuery tableQuery = tableQueryAndMutate.getTableQuery();
30313032
// fill a whole range if no range is added explicitly.
30323033
if (tableQuery.getKeyRanges().isEmpty()) {
@@ -3036,47 +3037,111 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
30363037
request.setTimeout(getOdpTable().getObTableOperationTimeout());
30373038
return getOdpTable().execute(request);
30383039
} else {
3040+
int maxRetries = getRuntimeRetryTimes(); // Define the maximum number of retries
3041+
int tryTimes = 0;
3042+
long startExecute = System.currentTimeMillis();
3043+
boolean needRefreshTableEntry = false;
30393044
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
3040-
for (ObNewRange rang : tableQuery.getKeyRanges()) {
3041-
ObRowKey startKey = rang.getStartKey();
3042-
int startKeySize = startKey.getObjs().size();
3043-
ObRowKey endKey = rang.getEndKey();
3044-
int endKeySize = endKey.getObjs().size();
3045-
Object[] start = new Object[startKeySize];
3046-
Object[] end = new Object[endKeySize];
3047-
for (int i = 0; i < startKeySize; i++) {
3048-
start[i] = startKey.getObj(i).getValue();
3045+
while (true) {
3046+
long currentExecute = System.currentTimeMillis();
3047+
long costMillis = currentExecute - startExecute;
3048+
if (costMillis > getRuntimeMaxWait()) {
3049+
logger.error(
3050+
"tablename:{} it has tried " + tryTimes
3051+
+ " times and it has waited " + costMillis
3052+
+ "/ms which exceeds response timeout "
3053+
+ getRuntimeMaxWait() + "/ms", request.getTableName());
3054+
throw new ObTableTimeoutExcetion("it has tried " + tryTimes
3055+
+ " times and it has waited " + costMillis
3056+
+ "/ms which exceeds response timeout "
3057+
+ getRuntimeMaxWait() + "/ms");
30493058
}
3059+
try {
3060+
// Recalculate partIdMapObTable
3061+
// Clear the map before recalculating
3062+
partIdMapObTable.clear();
3063+
for (ObNewRange rang : tableQuery.getKeyRanges()) {
3064+
ObRowKey startKey = rang.getStartKey();
3065+
int startKeySize = startKey.getObjs().size();
3066+
ObRowKey endKey = rang.getEndKey();
3067+
int endKeySize = endKey.getObjs().size();
3068+
Object[] start = new Object[startKeySize];
3069+
Object[] end = new Object[endKeySize];
3070+
for (int i = 0; i < startKeySize; i++) {
3071+
ObObj curStart = startKey.getObj(i);
3072+
if (curStart.isMinObj()) {
3073+
start[i] = curStart;
3074+
} else {
3075+
start[i] = curStart.getValue();
3076+
}
3077+
}
30503078

3051-
for (int i = 0; i < endKeySize; i++) {
3052-
end[i] = endKey.getObj(i).getValue();
3053-
}
3054-
ObBorderFlag borderFlag = rang.getBorderFlag();
3055-
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
3056-
tableQuery, start, borderFlag.isInclusiveStart(), end,
3057-
borderFlag.isInclusiveEnd(), false, false);
3058-
for (ObPair<Long, ObTableParam> pair : pairList) {
3059-
partIdMapObTable.put(pair.getLeft(), pair.getRight());
3060-
}
3061-
}
3062-
if (partIdMapObTable.size() > 1) {
3063-
throw new ObTablePartitionConsistentException(
3064-
"query and mutate must be a atomic operation");
3065-
}
3079+
for (int i = 0; i < endKeySize; i++) {
3080+
ObObj curEnd = endKey.getObj(i);
3081+
if (curEnd.isMaxObj()) {
3082+
end[i] = curEnd;
3083+
} else {
3084+
end[i] = curEnd.getValue();
3085+
}
3086+
}
3087+
ObBorderFlag borderFlag = rang.getBorderFlag();
3088+
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
3089+
tableQuery, start, borderFlag.isInclusiveStart(), end,
3090+
borderFlag.isInclusiveEnd(), needRefreshTableEntry, isTableEntryRefreshIntervalWait());
3091+
for (ObPair<Long, ObTableParam> pair : pairList) {
3092+
partIdMapObTable.put(pair.getLeft(), pair.getRight());
3093+
}
3094+
}
30663095

3067-
for (Long partId : partIdMapObTable.keySet()) {
3068-
ObTableParam tableParam = partIdMapObTable.get(partId);
3069-
request.setTableId(tableParam.getTableId());
3070-
request.setPartitionId(tableParam.getPartitionId());
3071-
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
3072-
ObTable obTable = tableParam.getObTable();
3073-
return executeWithRetry(obTable, request, request.getTableName());
3096+
// Check if partIdMapObTable size is greater than 1
3097+
if (partIdMapObTable.size() > 1) {
3098+
throw new ObTablePartitionConsistentException(
3099+
"query and mutate must be a atomic operation");
3100+
}
3101+
// Proceed with the operation
3102+
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
3103+
ObTableParam tableParam = entry.getValue();
3104+
request.setTableId(tableParam.getTableId());
3105+
request.setPartitionId(tableParam.getPartitionId());
3106+
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
3107+
ObTable obTable = tableParam.getObTable();
3108+
3109+
// Attempt to execute the operation
3110+
return executeWithRetry(obTable, request, request.getTableName());
3111+
} catch (Exception ex) {
3112+
tryTimes++;
3113+
if (ex instanceof ObTableException && ((ObTableException) ex).isNeedRefreshTableEntry()) {
3114+
needRefreshTableEntry = true;
3115+
logger.warn(
3116+
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
3117+
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), ex);
3118+
3119+
if (isRetryOnChangeMasterTimes() && tryTimes <= maxRetries) {
3120+
logger.warn(
3121+
"tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
3122+
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
3123+
tryTimes, ex);
3124+
3125+
if (ex instanceof ObTableNeedFetchAllException) {
3126+
// Refresh table info
3127+
getOrRefreshTableEntry(request.getTableName(), needRefreshTableEntry, isTableEntryRefreshIntervalWait(), true);
3128+
}
3129+
} else {
3130+
calculateContinuousFailure(request.getTableName(), ex.getMessage());
3131+
throw ex;
3132+
}
3133+
} else {
3134+
calculateContinuousFailure(request.getTableName(), ex.getMessage());
3135+
// Handle other exceptions or rethrow
3136+
throw ex;
3137+
}
3138+
}
30743139
}
30753140
}
30763141
}
30773142

30783143
throw new FeatureNotSupportedException("request type " + request.getClass().getSimpleName()
3079-
+ "is not supported. make sure the correct version");
3144+
+ "is not supported. make sure the correct version");
30803145
}
30813146

30823147
private ObTableQueryAndMutate buildObTableQueryAndMutate(ObTableQuery obTableQuery,

0 commit comments

Comments
 (0)