Skip to content

Commit 5e6889b

Browse files
stuBirdFlymiyuan-ljr
authored andcommitted
support hbase scan renewLease (#211)
1 parent 14677e5 commit 5e6889b

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
284284
return result;
285285
}
286286

287+
/*
288+
* RenewLease.
289+
*/
290+
public void renewLease() throws Exception {
291+
throw new IllegalStateException("renew only support stream query");
292+
}
293+
287294
/*
288295
* Next.
289296
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Map;
2222

2323
public enum ObQueryOperationType {
24-
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2);
24+
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3);
2525

2626
private int value;
2727
private static Map<Integer, ObQueryOperationType> map = new HashMap<Integer, ObQueryOperationType>();

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,32 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
181181
return buildPartitions(client, tableQuery, tableName);
182182
}
183183

184+
// This function is designed for HBase-type requests.
185+
// It is used to extend the session duration of a scan
186+
@Override
187+
public void renewLease() throws Exception {
188+
if (!isEnd() && !expectant.isEmpty()) {
189+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
190+
.iterator();
191+
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
192+
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
193+
// try access new partition, async will not remove useless expectant
194+
ObTableParam obTableParam = partIdWithObTable.getRight();
195+
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
196+
197+
// refresh request info
198+
queryRequest.setPartitionId(obTableParam.getPartitionId());
199+
queryRequest.setTableId(obTableParam.getTableId());
200+
201+
// refresh async query request
202+
asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW);
203+
asyncRequest.setQuerySessionId(sessionId);
204+
executeAsync(partIdWithObTable, asyncRequest);
205+
} else {
206+
throw new ObTableException("query end or expectant is null");
207+
}
208+
}
209+
184210
@Override
185211
public boolean next() throws Exception {
186212
checkStatus();

0 commit comments

Comments
 (0)