From 1381fa5508840789070ef7c0867120bc57e27525 Mon Sep 17 00:00:00 2001 From: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Date: Wed, 6 Nov 2024 11:05:01 +0800 Subject: [PATCH 1/5] support hbase scan renewLease (#211) --- .../query/AbstractQueryStreamResult.java | 7 +++++ .../syncquery/ObQueryOperationType.java | 2 +- .../ObTableClientQueryAsyncStreamResult.java | 26 +++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 5da49bbc..d2d4ead9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -271,6 +271,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, return result; } + /* + * RenewLease. + */ + public void renewLease() throws Exception { + throw new IllegalStateException("renew only support stream query"); + } + /* * Next. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java index 0f384754..b329e3ef 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java @@ -21,7 +21,7 @@ import java.util.Map; public enum ObQueryOperationType { - QUERY_START(0), QUERY_NEXT(1), QUERY_END(2); + QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3); private int value; private static Map map = new HashMap(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index cae7f39c..a526c47e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -181,6 +181,32 @@ protected Map> refreshPartition(ObTableQuery ta return buildPartitions(client, tableQuery, tableName); } + // This function is designed for HBase-type requests. + // It is used to extend the session duration of a scan + @Override + public void renewLease() throws Exception { + if (!isEnd() && !expectant.isEmpty()) { + Iterator>> it = expectant.entrySet() + .iterator(); + Map.Entry> lastEntry = it.next(); + ObPair partIdWithObTable = lastEntry.getValue(); + // try access new partition, async will not remove useless expectant + ObTableParam obTableParam = partIdWithObTable.getRight(); + ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); + + // refresh request info + queryRequest.setPartitionId(obTableParam.getPartitionId()); + queryRequest.setTableId(obTableParam.getTableId()); + + // refresh async query request + asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW); + asyncRequest.setQuerySessionId(sessionId); + executeAsync(partIdWithObTable, asyncRequest); + } else { + throw new ObTableException("query end or expectant is null"); + } + } + @Override public boolean next() throws Exception { checkStatus(); From a473887dc125edc4611bd43223a635a6e1d73d46 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 14 Nov 2024 15:14:14 +0800 Subject: [PATCH 2/5] add time range map --- .../oceanbase/rpc/table/ObHBaseParams.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index 5777c948..3ea7b2da 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -17,8 +17,13 @@ package com.alipay.oceanbase.rpc.table; +import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; +import jdk.internal.net.http.common.Pair; + +import java.util.ArrayList; +import java.util.List; import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; @@ -32,6 +37,8 @@ public class ObHBaseParams extends ObKVParamsBase { private static final int FLAG_ALLOW_PARTIAL_RESULTS = 1 << 0; private static final int FLAG_IS_CACHE_BLOCK = 1 << 1; private static final int FLAG_CHECK_EXISTENCE_ONLY = 1 << 2; + List>> timeRangeMap = new ArrayList<>(); + public ObHBaseParams() { pType = paramType.HBase; @@ -111,6 +118,21 @@ public byte[] encode() { System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1); idx += 1; + int len = Serialization.getNeedBytes(timeRangeMap.size()); + System.arraycopy(Serialization.encodeVi64(timeRangeMap.size()), 0, bytes, idx, len); + idx += len; + for (Pair> timeRange : timeRangeMap) { + len = Serialization.getNeedBytes(timeRange.first); + System.arraycopy(Serialization.encodeBytesString(timeRange.first), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(timeRange.second.first); + System.arraycopy(Serialization.encodeVi64(timeRange.second.first), 0, bytes, idx, len); + idx += len; + len = Serialization.getNeedBytes(timeRange.second.second); + System.arraycopy(Serialization.encodeVi64(timeRange.second.second), 0, bytes, idx, len); + idx += len; + } + return bytes; } @@ -125,6 +147,11 @@ public Object decode(ByteBuf buf) { caching = Serialization.decodeVi32(buf); callTimeout = Serialization.decodeVi32(buf); byteArrayToBooleans(buf); + long size = Serialization.decodeVi64(buf); + this.timeRangeMap = new ArrayList<>((int) size); + for (int i = 0; i < size; i++) { + this.timeRangeMap.add(new Pair<>(Serialization.decodeBytesString(buf), new Pair<>(Serialization.decodeVi64(buf), Serialization.decodeVi64(buf)))); + } return this; } @@ -137,7 +164,7 @@ public String toString() { return "ObParams: {\n pType = " + pType + ", \n caching = " + caching + ", \n callTimeout = " + callTimeout + ", \n allowPartialResult = " + allowPartialResults + ", \n isCacheBlock = " + isCacheBlock - + ", \n checkExistenceOnly = " + checkExistenceOnly + "\n}\n"; + + ", \n checkExistenceOnly = " + checkExistenceOnly + ", \n timeRangeMap = " + timeRangeMap + "\n}\n"; } } From 1550d6a9b7241e1f72dd8f06d417779a228e2a40 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 14 Nov 2024 15:26:32 +0800 Subject: [PATCH 3/5] fix --- .../oceanbase/rpc/table/ObHBaseParams.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index 3ea7b2da..295b8022 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -17,10 +17,10 @@ package com.alipay.oceanbase.rpc.table; +import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; -import jdk.internal.net.http.common.Pair; import java.util.ArrayList; import java.util.List; @@ -37,7 +37,7 @@ public class ObHBaseParams extends ObKVParamsBase { private static final int FLAG_ALLOW_PARTIAL_RESULTS = 1 << 0; private static final int FLAG_IS_CACHE_BLOCK = 1 << 1; private static final int FLAG_CHECK_EXISTENCE_ONLY = 1 << 2; - List>> timeRangeMap = new ArrayList<>(); + List>> timeRangeMap = new ArrayList<>(); public ObHBaseParams() { @@ -121,15 +121,15 @@ public byte[] encode() { int len = Serialization.getNeedBytes(timeRangeMap.size()); System.arraycopy(Serialization.encodeVi64(timeRangeMap.size()), 0, bytes, idx, len); idx += len; - for (Pair> timeRange : timeRangeMap) { - len = Serialization.getNeedBytes(timeRange.first); - System.arraycopy(Serialization.encodeBytesString(timeRange.first), 0, bytes, idx, len); + for (ObPair> timeRange : timeRangeMap) { + len = Serialization.getNeedBytes(timeRange.getLeft()); + System.arraycopy(Serialization.encodeBytesString(timeRange.getLeft()), 0, bytes, idx, len); idx += len; - len = Serialization.getNeedBytes(timeRange.second.first); - System.arraycopy(Serialization.encodeVi64(timeRange.second.first), 0, bytes, idx, len); + len = Serialization.getNeedBytes(timeRange.getRight().getLeft()); + System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getLeft()), 0, bytes, idx, len); idx += len; - len = Serialization.getNeedBytes(timeRange.second.second); - System.arraycopy(Serialization.encodeVi64(timeRange.second.second), 0, bytes, idx, len); + len = Serialization.getNeedBytes(timeRange.getRight().getRight()); + System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getRight()), 0, bytes, idx, len); idx += len; } @@ -150,7 +150,7 @@ public Object decode(ByteBuf buf) { long size = Serialization.decodeVi64(buf); this.timeRangeMap = new ArrayList<>((int) size); for (int i = 0; i < size; i++) { - this.timeRangeMap.add(new Pair<>(Serialization.decodeBytesString(buf), new Pair<>(Serialization.decodeVi64(buf), Serialization.decodeVi64(buf)))); + this.timeRangeMap.add(new ObPair<>(Serialization.decodeBytesString(buf), new ObPair<>(Serialization.decodeVi64(buf), Serialization.decodeVi64(buf)))); } return this; } From 60457d5c1348ef84ce3cb89f10beb061c3acc386 Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 14 Nov 2024 15:31:43 +0800 Subject: [PATCH 4/5] refine --- .../java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index 295b8022..f2fcea07 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -44,6 +44,10 @@ public ObHBaseParams() { pType = paramType.HBase; } + public void addFamilyTimeRange(ObBytesString family, long min, long max) { + timeRangeMap.add(new ObPair<>(family, new ObPair<>(min, max))); + } + public ObKVParamsBase.paramType getType() { return pType; } From 0bfe37d496f384734b0fecfde23afe70315f7d8f Mon Sep 17 00:00:00 2001 From: miyuan-ljr Date: Thu, 14 Nov 2024 18:12:33 +0800 Subject: [PATCH 5/5] fix --- .../com/alipay/oceanbase/rpc/table/ObHBaseParams.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index f2fcea07..bb409328 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -160,8 +160,15 @@ public Object decode(ByteBuf buf) { } public long getPayloadContentSize() { - return 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) - + 1; // all boolean to one byte + long contentSize = 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) + + 1 // all boolean to one byte + + Serialization.getNeedBytes(timeRangeMap.size()); + for (ObPair> timeRange : timeRangeMap) { + contentSize += Serialization.getNeedBytes(timeRange.getLeft()); + contentSize += Serialization.getNeedBytes(timeRange.getRight().getLeft()); + contentSize += Serialization.getNeedBytes(timeRange.getRight().getRight()); + } + return contentSize; } public String toString() {