From 575aa00e01ce3fb6d204a84f3ef453faecccb6ba Mon Sep 17 00:00:00 2001 From: hexueyu Date: Wed, 21 Aug 2024 17:48:50 +0800 Subject: [PATCH] add ObParams --- .../impl/execute/query/ObTableQuery.java | 26 ++++ .../oceanbase/rpc/table/ObHBaseParams.java | 144 ++++++++++++++++++ .../oceanbase/rpc/table/ObKVParams.java | 78 ++++++++++ .../oceanbase/rpc/table/ObKVParamsBase.java | 54 +++++++ 4 files changed, 302 insertions(+) create mode 100644 src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java create mode 100644 src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java create mode 100644 src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java index 1dfd2a5c..a4f91992 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query; +import com.alipay.oceanbase.rpc.table.ObKVParams; import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType; @@ -64,6 +65,8 @@ public class ObTableQuery extends AbstractPayload { private List aggregations = new LinkedList<>(); + private ObKVParams obKVParams; + /* * Check filter. */ @@ -173,6 +176,15 @@ public byte[] encode() { idx += len; } + if (isHbaseQuery) { + len = (int) obKVParams.getPayloadSize(); + System.arraycopy(obKVParams.encode(), 0, bytes, idx, len); + idx += len; + } else { + len = HTABLE_FILTER_DUMMY_BYTES.length; + System.arraycopy(HTABLE_FILTER_DUMMY_BYTES, 0, bytes, idx, len); + } + return bytes; } @@ -230,6 +242,10 @@ public Object decode(ByteBuf buf) { String agg_column = Serialization.decodeVString(buf); this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column)); } + if (isHbaseQuery) { + obKVParams = new ObKVParams(); + this.obKVParams.decode(buf); + } return this; } @@ -258,6 +274,7 @@ public long getPayloadContentSize() { if (isHbaseQuery) { contentSize += hTableFilter.getPayloadSize(); + contentSize += obKVParams.getPayloadSize(); } else { contentSize += HTABLE_FILTER_DUMMY_BYTES.length; } @@ -466,4 +483,13 @@ public void setScanRangeColumns(String... scanRangeColumns) { public void setScanRangeColumns(List scanRangeColumns) { this.scanRangeColumns = scanRangeColumns; } + + public void setObKVParams(ObKVParams obKVParams) { + this.isHbaseQuery = true; + this.obKVParams = obKVParams; + } + + public ObKVParams getObKVParams() { + return obKVParams; + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java new file mode 100644 index 00000000..3a6aec0d --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -0,0 +1,144 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.table; + +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; + +public class ObHBaseParams extends ObKVParamsBase { + int caching = -1; // 限制scan返回的行的数量 + int callTimeout = -1; // scannerLeasePeriodTimeout,代表客户端scan的单个rpc超时时间以及服务端的scan的超时时间的一部分 + boolean allowPartialResults = true; // 是否允许行内部分返回 + boolean isCacheBlock = false; // 是否启用缓存 + boolean checkExistenceOnly = false; // 查看是否存在不返回数据 + + public ObHBaseParams() { + pType = paramType.HBase; + } + + public ObKVParamsBase.paramType getType() { + return pType; + } + + public void setCaching(int caching) { + this.caching = caching; + } + + public void setCallTimeout(int callTimeout) { + this.callTimeout = callTimeout; + } + + public void setAllowPartialResults(boolean allowPartialResults) { + this.allowPartialResults = allowPartialResults; + } + + public void setCacheBlock(boolean isCacheBlock) { + this.isCacheBlock = isCacheBlock; + } + + public void setCheckExistenceOnly(boolean checkExistenceOnly) { + this.checkExistenceOnly = checkExistenceOnly; + } + + private int getContentSize() { + return 4 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) + + 1; + } + + public int getCaching() { + return caching; + } + + public int getCallTimeout() { + return callTimeout; + } + + public boolean getAllowPartialResults() { + return allowPartialResults; + } + + public boolean getCacheBlock() { + return isCacheBlock; + } + + public boolean isCheckExistenceOnly() { + return checkExistenceOnly; + } + + // encode all boolean type to one byte + public byte[] booleansToByteArray() { + byte[] bytes = new byte[1]; // 1 byte for 4 booleans + + if (allowPartialResults) + bytes[0] |= 0x01; // 00000010 + if (isCacheBlock) + bytes[0] |= 0x02; // 00000100 + if (checkExistenceOnly) + bytes[0] |= 0x04; // 00001000 + + return bytes; + } + + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadContentSize()]; + int idx = 0; + + byte[] b = new byte[] { (byte) pType.ordinal() }; + System.arraycopy(b, 0, bytes, idx, 1); + idx += 1; + System.arraycopy(Serialization.encodeVi32(caching), 0, bytes, idx, + Serialization.getNeedBytes(caching)); + idx += Serialization.getNeedBytes(caching); + System.arraycopy(Serialization.encodeVi32(callTimeout), 0, bytes, idx, + Serialization.getNeedBytes(callTimeout)); + idx += Serialization.getNeedBytes(callTimeout); + System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1); + idx += 1; + + return bytes; + } + + public void byteArrayToBooleans(ByteBuf bytes) { + byte b = bytes.readByte(); + allowPartialResults = (b & 0x01) != 0; + isCacheBlock = (b & 0x02) != 0; + checkExistenceOnly = (b & 0x04) != 0; + } + + public Object decode(ByteBuf buf) { + caching = Serialization.decodeVi32(buf); + callTimeout = Serialization.decodeVi32(buf); + byteArrayToBooleans(buf); + return this; + } + + public long getPayloadContentSize() { + return 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout) + + 1; // all boolean to one byte + } + + public String toString() { + return "ObParams: {\n pType = " + pType + ", \n caching = " + caching + + ", \n callTimeout = " + callTimeout + ", \n allowPartialResult = " + + allowPartialResults + ", \n isCacheBlock = " + isCacheBlock + + ", \n checkExistenceOnly = " + checkExistenceOnly + "\n}\n"; + } + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java new file mode 100644 index 00000000..3b0a0bb8 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java @@ -0,0 +1,78 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.table; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObKVParams extends AbstractPayload { + + public ObKVParamsBase obKVParamsBase; + + public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) { + switch (pType) { + case HBase: + return new ObHBaseParams(); + case Redis: + default: + throw new RuntimeException("Currently does not support other types except HBase"); + } + } + + public void setObParamsBase(ObKVParamsBase obKVParamsBase) { + this.obKVParamsBase = obKVParamsBase; + } + + public ObKVParamsBase getObParamsBase() { + return obKVParamsBase; + } + + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); + System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, + idx, headerLen); + idx += headerLen; + + int len = (int) obKVParamsBase.getPayloadContentSize(); + System.arraycopy(obKVParamsBase.encode(), 0, bytes, idx, len); + + return bytes; + } + + public Object decode(ByteBuf buf) { + super.decode(buf); + byte b = buf.readByte(); + ObKVParamsBase.paramType pType = ObKVParamsBase.paramType.values()[b]; + obKVParamsBase = getObParams(pType); + obKVParamsBase.decode(buf); + return this; + } + + @Override + public long getPayloadContentSize() { + return obKVParamsBase.getPayloadContentSize(); + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java new file mode 100644 index 00000000..796e4cf9 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java @@ -0,0 +1,54 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.table; + +import io.netty.buffer.ByteBuf; + +public abstract class ObKVParamsBase { + public enum paramType { + HBase((byte) 0), Redis((byte) 1); + private final byte value; + + paramType(byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } + } + + public int byteSize; + public paramType pType; + + public paramType getType() { + return pType; + } + + public byte[] encode() { + return null; + } + + public Object decode(ByteBuf buf) { + return null; + } + + public long getPayloadContentSize() { + return 0; + } +} \ No newline at end of file