diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 5da49bbc..d2d4ead9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -271,6 +271,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, return result; } + /* + * RenewLease. + */ + public void renewLease() throws Exception { + throw new IllegalStateException("renew only support stream query"); + } + /* * Next. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java index 0f384754..b329e3ef 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java @@ -21,7 +21,7 @@ import java.util.Map; public enum ObQueryOperationType { - QUERY_START(0), QUERY_NEXT(1), QUERY_END(2); + QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3); private int value; private static Map map = new HashMap(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index cae7f39c..a526c47e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -181,6 +181,32 @@ protected Map> refreshPartition(ObTableQuery ta return buildPartitions(client, tableQuery, tableName); } + // This function is designed for HBase-type requests. + // It is used to extend the session duration of a scan + @Override + public void renewLease() throws Exception { + if (!isEnd() && !expectant.isEmpty()) { + Iterator>> it = expectant.entrySet() + .iterator(); + Map.Entry> lastEntry = it.next(); + ObPair partIdWithObTable = lastEntry.getValue(); + // try access new partition, async will not remove useless expectant + ObTableParam obTableParam = partIdWithObTable.getRight(); + ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); + + // refresh request info + queryRequest.setPartitionId(obTableParam.getPartitionId()); + queryRequest.setTableId(obTableParam.getTableId()); + + // refresh async query request + asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW); + asyncRequest.setQuerySessionId(sessionId); + executeAsync(partIdWithObTable, asyncRequest); + } else { + throw new ObTableException("query end or expectant is null"); + } + } + @Override public boolean next() throws Exception { checkStatus(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index 5777c948..bb409328 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -17,9 +17,14 @@ package com.alipay.oceanbase.rpc.table; +import com.alipay.oceanbase.rpc.location.model.partition.ObPair; +import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; + import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; public class ObHBaseParams extends ObKVParamsBase { @@ -32,11 +37,17 @@ public class ObHBaseParams extends ObKVParamsBase { private static final int FLAG_ALLOW_PARTIAL_RESULTS = 1 << 0; private static final int FLAG_IS_CACHE_BLOCK = 1 << 1; private static final int FLAG_CHECK_EXISTENCE_ONLY = 1 << 2; + List>> timeRangeMap = new ArrayList<>(); + public ObHBaseParams() { pType = paramType.HBase; } + public void addFamilyTimeRange(ObBytesString family, long min, long max) { + timeRangeMap.add(new ObPair<>(family, new ObPair<>(min, max))); + } + public ObKVParamsBase.paramType getType() { return pType; } @@ -111,6 +122,21 @@ public byte[] encode() { System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1); idx += 1; + int len = Serialization.getNeedBytes(timeRangeMap.size()); + System.arraycopy(Serialization.encodeVi64(timeRangeMap.size()), 0, bytes, idx, len); + idx += len; + for (ObPair> timeRange : timeRangeMap) { + len = Serialization.getNeedBytes(timeRange.getLeft()); + System.arraycopy(Serialization.encodeBytesString(timeRange.getLeft()), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(timeRange.getRight().getLeft()); + System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getLeft()), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(timeRange.getRight().getRight()); + System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getRight()), 0, bytes, idx, len); + idx += len; + } + return bytes; } @@ -125,19 +151,31 @@ public Object decode(ByteBuf buf) { caching = Serialization.decodeVi32(buf); callTimeout = Serialization.decodeVi32(buf); byteArrayToBooleans(buf); + long size = Serialization.decodeVi64(buf); + this.timeRangeMap = new ArrayList<>((int) size); + for (int i = 0; i < size; i++) { + this.timeRangeMap.add(new ObPair<>(Serialization.decodeBytesString(buf), new ObPair<>(Serialization.decodeVi64(buf), Serialization.decodeVi64(buf)))); + } return this; } public long getPayloadContentSize() { - return 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) - + 1; // all boolean to one byte + long contentSize = 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) + + 1 // all boolean to one byte + + Serialization.getNeedBytes(timeRangeMap.size()); + for (ObPair> timeRange : timeRangeMap) { + contentSize += Serialization.getNeedBytes(timeRange.getLeft()); + contentSize += Serialization.getNeedBytes(timeRange.getRight().getLeft()); + contentSize += Serialization.getNeedBytes(timeRange.getRight().getRight()); + } + return contentSize; } public String toString() { return "ObParams: {\n pType = " + pType + ", \n caching = " + caching + ", \n callTimeout = " + callTimeout + ", \n allowPartialResult = " + allowPartialResults + ", \n isCacheBlock = " + isCacheBlock - + ", \n checkExistenceOnly = " + checkExistenceOnly + "\n}\n"; + + ", \n checkExistenceOnly = " + checkExistenceOnly + ", \n timeRangeMap = " + timeRangeMap + "\n}\n"; } }