diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java index fa17103e..49076ded 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.protocol.payload; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -44,6 +45,7 @@ public abstract class AbstractPayload implements ObPayload { private long version = 1; protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong(); protected int groupId = 0; + protected long payLoadContentSize = -1; /* * Get pcode. @@ -176,11 +178,15 @@ public void setSequence(long sequence) { * encode unis header */ protected int encodeHeader(byte[] bytes, int idx) { - int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); - System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, - idx, headerLen); - idx += headerLen; + byte[] versionHeaderBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize()); + System.arraycopy(versionHeaderBytes, 0, bytes, + idx, versionHeaderBytes.length); + idx += versionHeaderBytes.length; return idx; } + protected void encodeHeader(ObByteBuf buf) { + encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize()); + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java index 7fa4e1b5..178fa43c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java @@ -22,16 +22,30 @@ import com.alipay.oceanbase.rpc.util.ObVString; import io.netty.buffer.ByteBuf; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + public class ObObj implements ObSimplePayload { private static ObObj MAX_OBJECT; private static ObObj MIN_OBJECT; + private static ObObj NULL_OBJECT; private static long MAX_OBJECT_VALUE = -2L; private static long MIN_OBJECT_VALUE = -3L; static { MAX_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MAX_OBJECT_VALUE); MIN_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MIN_OBJECT_VALUE); + NULL_OBJECT = new ObObj(ObObjType.ObNullType.getDefaultObjMeta(), null); + } + private static final int OBJ_POOL_SIZE = 100000; + private static final AtomicInteger CURRENT_INDEX = new AtomicInteger(0); + private static final ConcurrentLinkedQueue OBJ_POOL = new ConcurrentLinkedQueue<>(); + static { + // 初始化对象池 + for (int i = 0; i < OBJ_POOL_SIZE; i++) { + OBJ_POOL.offer(new ObObj()); + } } /* @@ -158,14 +172,33 @@ public void setValue(Object value) { * Get instance. */ public static ObObj getInstance(Object value) { - ObObjMeta meta = ObObjType.defaultObjMeta(value); + ObObjType type = ObObjType.valueOfType(value); + ObObjMeta meta = null; + if (type == ObObjType.ObVarcharType) { + meta = ObObjMetaPool.varchrObjMeta; + } else { + meta = ObObjType.defaultObjMeta(value); + } + ObObj obj = OBJ_POOL.poll(); + if (obj == null) { + // 如果池为空,创建新对象 + obj = new ObObj(); + } + // 初始化对象 + obj.setMeta(meta); if (value instanceof ObObj) { - return new ObObj(meta, ((ObObj) value).getValue()); + obj.setValue(((ObObj) value).getValue()); } else { - return new ObObj(meta, value); + obj.setValue(value); } + return obj; } + public static ObObj getNullObject() { + return NULL_OBJECT; + } + + /* * Get max. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java index 218f45f2..a88b8ddc 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java @@ -19,16 +19,22 @@ import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException; import com.alipay.oceanbase.rpc.protocol.payload.Constants; -import com.alipay.oceanbase.rpc.util.ObBytesString; -import com.alipay.oceanbase.rpc.util.ObVString; -import com.alipay.oceanbase.rpc.util.Serialization; -import com.alipay.oceanbase.rpc.util.TimeUtils; +import com.alipay.oceanbase.rpc.util.*; import io.netty.buffer.ByteBuf; import java.sql.Timestamp; import java.time.*; import java.util.*; +class ObObjMetaPool { + public static ObObjMeta varchrObjMeta = null; + static { + varchrObjMeta = new ObObjMeta(ObObjType.ObVarcharType, ObCollationLevel.CS_LEVEL_EXPLICIT, + ObCollationType.CS_TYPE_UTF8MB4_GENERAL_CI, (byte) 10); + } +} + + public enum ObObjType { ObNullType(0) { @@ -40,6 +46,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -90,6 +101,15 @@ public byte[] encode(Object obj) { } } + @Override + public void encode(ObByteBuf buf, Object obj) { + if (obj instanceof Boolean) { + buf.writeByte((Boolean) obj ? (byte) 1 : (byte) 0 ); + } else { + buf.writeByte((Byte) obj); + } + } + /* * Decode. */ @@ -141,6 +161,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + /* * Decode. */ @@ -195,7 +220,12 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } - /* + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + + /* * Decode. */ @Override @@ -239,6 +269,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + /* * Decode. */ @@ -291,6 +326,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(((Number) obj).longValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi64(buf, ((Number) obj).longValue()); + } + /* * Decode. */ @@ -336,6 +376,11 @@ public byte[] encode(Object obj) { return new byte[] { (Byte) obj }; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeByte((Byte) obj); + } + /* * Decode. */ @@ -388,6 +433,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + /* * Decode. */ @@ -442,6 +492,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + /* * Decode. */ @@ -496,6 +551,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32(((Number) obj).intValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, ((Number) obj).intValue()); + } + /* * Decode. */ @@ -550,6 +610,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(((Number) obj).longValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi64(buf, ((Number) obj).longValue()); + } + /* * Decode. */ @@ -596,6 +661,11 @@ public byte[] encode(Object obj) { return Serialization.encodeFloat(((Float) obj)); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeFloat(buf, ((Float) obj)); + } + /* * Decode. */ @@ -653,6 +723,11 @@ public byte[] encode(Object obj) { return Serialization.encodeDouble((Double) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeDouble(buf, (Double) obj); + } + /* * Decode. */ @@ -712,6 +787,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -757,6 +837,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -803,6 +888,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -848,6 +938,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -899,6 +994,13 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(targetTs * 1000L); } + @Override + public void encode(ObByteBuf buf, Object obj) { + long targetTs = ((Date) obj).getTime() + + OffsetDateTime.now().getOffset().getTotalSeconds() * 1000L; + Serialization.encodeVi64(buf,targetTs * 1000L); + } + /* * Decode. */ @@ -953,6 +1055,14 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(timeInMicroseconds); } + @Override + public void encode(ObByteBuf buf, Object obj) { + long timeInMicroseconds = ((Timestamp)obj).getTime() * 1_000; + int nanoSeconds = ((Timestamp)obj).getNanos() % 1_000_000; + timeInMicroseconds += nanoSeconds / 1_000; + Serialization.encodeVi64(buf, timeInMicroseconds); + } + /* * Decode. */ @@ -1005,6 +1115,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32((int) ((Date) obj).getTime()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, (int) ((Date) obj).getTime()); + } + /* * Decode. */ @@ -1067,6 +1182,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64((int) ((Date) obj).getTime()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi64(buf, (int) ((Date) obj).getTime()); + } + /* * Decode. */ @@ -1112,6 +1232,11 @@ public byte[] encode(Object obj) { return new byte[] { (Byte) obj }; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeByte((Byte) obj); + } + /* * Decode. */ @@ -1164,6 +1289,18 @@ public byte[] encode(Object obj) { } } + @Override + public void encode(ObByteBuf buf, Object obj) { + if (obj instanceof byte[]) { + ObBytesString bytesString = new ObBytesString((byte[]) obj); + Serialization.encodeBytesString(buf, bytesString); + } else if (obj instanceof ObVString) { + buf.writeBytes(((ObVString) obj).getEncodeBytes()); + } else { + Serialization.encodeVString(buf, (String) obj); + } + } + /* * Decode. */ @@ -1219,6 +1356,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVString((String) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVString(buf, (String) obj); + } + /* * Decode. */ @@ -1274,6 +1416,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -1321,6 +1468,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64((Long) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi64(buf, (Long) obj); + } + /* * Decode. */ @@ -1369,6 +1521,11 @@ public byte[] encode(Object obj) { return new byte[0]; } + @Override + public void encode(ObByteBuf buf, Object obj) { + buf.writeBytes(new byte[0]); + } + /* * Decode. */ @@ -1415,7 +1572,12 @@ public byte[] encode(Object obj) { return Serialization.encodeVString((String) obj); } - /* + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVString(buf, (String) obj); + } + + /* * Decode. */ @Override @@ -1470,6 +1632,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVString((String) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVString(buf, (String) obj); + } + /* * Decode. */ @@ -1524,6 +1691,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVString((String) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVString(buf, (String) obj); + } + /* * Decode. */ @@ -1576,6 +1748,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVString((String) obj); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVString(buf, (String) obj); + } + /* * Decode. */ @@ -1629,6 +1806,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(((Number) obj).longValue()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi64(buf, ((Number) obj).longValue()); + } + /* * Decode. */ @@ -1679,6 +1861,11 @@ public byte[] encode(Object obj) { return Serialization.encodeVi32((int) ((Date) obj).getTime()); } + @Override + public void encode(ObByteBuf buf, Object obj) { + Serialization.encodeVi32(buf, (int) ((Date) obj).getTime()); + } + /* * Decode. */ @@ -1744,6 +1931,15 @@ public byte[] encode(Object obj) { return Serialization.encodeVi64(targetTs * 1000L); } + @Override + public void encode(ObByteBuf buf, Object obj) { + // Date do not have timezone, when we use getTime, system will recognize it as our system timezone and transform it into UTC Time, which will changed the time. + // We should add back the lose part. + long targetTs = ((Date) obj).getTime() + + OffsetDateTime.now().getOffset().getTotalSeconds() * 1000L; + Serialization.encodeVi64(buf,targetTs * 1000L); + } + /* * Decode. */ @@ -1871,6 +2067,8 @@ public byte getByteValue() { public abstract byte[] encode(Object obj); + public abstract void encode(ObByteBuf buf, Object obj); + public abstract Object decode(ByteBuf buf, ObCollationType type); public abstract int getEncodedSize(Object obj); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java index 0ef94ab3..fda8d354 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.protocol.payload.impl; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -66,6 +67,11 @@ public byte[] encode(ObObj obj) { return encodeWithMeta(obj); } + @Override + public void encode(ObByteBuf buf, ObObj obj) { + encodeWithMeta(buf, obj); + } + @Override public void decode(ByteBuf buf, ObObj obj) { decodeWithMeta(buf, obj); @@ -83,6 +89,11 @@ public byte[] encode(ObObj obj) { return encodeWithMeta(obj); } + @Override + public void encode(ObByteBuf buf, ObObj obj) { + encodeWithMeta(buf, obj); + } + @Override public void decode(ByteBuf buf, ObObj obj) { decodeWithMeta(buf, obj); @@ -103,6 +114,10 @@ public byte[] encode(ObObj obj) { return bytes; } + public void encode(ObByteBuf buf, ObObj obj) { + Serialization.encodeI8(buf, this.getValue()); + } + public void decode(ByteBuf buf, ObObj obj) { ObObjType objType = getObjType(this); ObObjMeta objMeta = objType.getDefaultObjMeta(); @@ -124,6 +139,10 @@ public byte[] encode(ObObj obj) { return bytes; } + public void encode(ObByteBuf buf, ObObj obj) { + Serialization.encodeI8(buf, this.getValue()); + } + public void decode(ByteBuf buf, ObObj obj) { ObObjType objType = getObjType(this); ObObjMeta objMeta = objType.getDefaultObjMeta(); @@ -303,6 +322,13 @@ public byte[] encode(ObObj obj) { return bytes; } + public void encode(ObByteBuf buf, ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + Serialization.encodeI8(buf, this.getValue()); + + objType.encode(buf, obj.getValue()); + } + public void decode(ByteBuf buf, ObObj obj) { ObObjType objType = getObjType(this); ObObjMeta objMeta = objType.getDefaultObjMeta(); @@ -337,6 +363,20 @@ public byte[] encodeWithMeta(ObObj obj) { return bytes; } + public void encodeWithMeta(ObByteBuf buf, ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + + Serialization.encodeI8(buf, this.getValue()); + + Serialization.encodeI8(buf, obj.getMeta().getCsLevel().getByteValue()); + + Serialization.encodeI8(buf, obj.getMeta().getCsType().getByteValue()); + + Serialization.encodeI8(buf, obj.getMeta().getScale()); + + objType.encode(buf, obj.getValue()); + } + public void decodeWithMeta(ByteBuf buf, ObObj obj) { ObObjType objType = getObjType(this); ObObjMeta meta = obj.getMeta(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java index e32c5ffb..fe9684ab 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObBorderFlag; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -34,6 +35,10 @@ static public byte[] encode(ObObj obj) { return getTableObjType(obj).encode(obj); } + static public void encode(ObByteBuf buf, ObObj obj) { + getTableObjType(obj).encode(buf, obj); + } + static public void decode(ByteBuf buf, ObObj obj) { ObTableObjType tableObjType = decodeTableObjType(buf); tableObjType.decode(buf, obj); @@ -124,6 +129,35 @@ static public byte[] encode(ObNewRange range) { return bytes; } + static public void encode(ObByteBuf buf, ObNewRange range) { + long tableId = range.getTableId(); + Serialization.encodeVi64(buf, tableId); + + Serialization.encodeI8(buf, range.getBorderFlag().getValue()); + + ObRowKey startKey = range.getStartKey(); + long rowkeySize = startKey.getObjCount(); + Serialization.encodeVi64(buf, rowkeySize); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = startKey.getObj(i); + ObTableSerialUtil.encode(buf, obObj); + } + + ObRowKey endKey = range.getEndKey(); + rowkeySize = endKey.getObjCount(); + Serialization.encodeVi64(buf, rowkeySize); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = endKey.getObj(i); + ObTableSerialUtil.encode(buf, obObj); + } + + if (ObGlobal.obVsnMajor() >= 4) { + long flag = range.getFlag(); + Serialization.encodeVi64(buf, flag); + } + + } + static public void decode(ByteBuf buf, ObNewRange range) { range.setTableId(Serialization.decodeVi64(buf)); range.setBorderFlag(ObBorderFlag.valueOf(Serialization.decodeI8(buf.readByte()))); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java index ab196b76..9af12771 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.Credentialable; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -39,7 +40,6 @@ public class ObTableLSOpRequest extends AbstractPayload implements Credentialabl protected ObTableEntityType entityType = ObTableEntityType.KV; protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG; private ObTableLSOperation lsOperation = null; - /* * Get pcode. */ @@ -53,31 +53,27 @@ public int getPcode() { */ @Override public byte[] encode() { - byte[] bytes = new byte[(int) getPayloadSize()]; - int idx = 0; + ObByteBuf buf = new ObByteBuf((int) getPayloadSize()); - // 0. encode ObTableLSOpRequest header - idx = encodeHeader(bytes, idx); + // 0. encode ObTableLSOpRequest header + encodeHeader(buf); // 1. encode credential - byte[] strbytes = Serialization.encodeBytesString(credential); - System.arraycopy(strbytes, 0, bytes, idx, strbytes.length); - idx += strbytes.length; + Serialization.encodeBytesString(buf, credential); // 2. encode entity_type - System.arraycopy(Serialization.encodeI8(entityType.getByteValue()), 0, bytes, idx, 1); - idx++; + Serialization.encodeI8(buf, entityType.getByteValue()); - // 3. encode consistencyLevel level - System.arraycopy(Serialization.encodeI8(consistencyLevel.getByteValue()), 0, bytes, idx, 1); - idx++; + // 3. encode consistencyLevel + Serialization.encodeI8(buf, consistencyLevel.getByteValue()); // 4. encode lsOperation - int len = (int) lsOperation.getPayloadSize(); - System.arraycopy(lsOperation.encode(), 0, bytes, idx, len); - idx += len; - - return bytes; + lsOperation.encode(buf); + if (buf.pos != buf.bytes.length) { + throw new IllegalArgumentException("error in encode lsOperationRequest (" + + "pos:" + buf.pos + ", buf.capacity:" + buf.bytes.length + ")"); + } + return buf.bytes; } /* @@ -100,8 +96,11 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - return lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 // entityType - + 1; // consistencyLevel + if (payLoadContentSize == -1) { + payLoadContentSize = lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 // entityType + + 1; // consistencyLevel + } + return payLoadContentSize; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java index 86a52e61..f49c04d0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.Constants; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -46,6 +47,7 @@ public class ObTableLSOperation extends AbstractPayload { private static final int LS_ID_SIZE = 8; private static final long INVALID_LS_ID = -1; + private boolean isHbase = false; /* OB_UNIS_DEF_SERIALIZE(ObTableLSOp, @@ -70,53 +72,96 @@ public byte[] encode() { idx += 8; // 2. encode table name - int len = Serialization.getNeedBytes(tableName); - System.arraycopy(Serialization.encodeVString(tableName), 0, bytes, idx, len); - idx += len; + byte[] tableNameBytes = Serialization.encodeVString(tableName); + System.arraycopy(tableNameBytes, 0, bytes, idx, tableNameBytes.length); + idx += tableNameBytes.length; // 3. encode table id - len = Serialization.getNeedBytes(tableId); - System.arraycopy(Serialization.encodeVi64(tableId), 0, bytes, idx, len); - idx += len; + byte[] tableIdBytes = Serialization.encodeVi64(tableId); + System.arraycopy(tableIdBytes, 0, bytes, idx, tableIdBytes.length); + idx += tableIdBytes.length; // 4. encode rowKey names - len = Serialization.getNeedBytes(rowKeyNames.size()); - System.arraycopy(Serialization.encodeVi64(rowKeyNames.size()), 0, bytes, idx, len); - idx += len; + byte[] rowKeyNameLenBytes = Serialization.encodeVi64(rowKeyNames.size()); + System.arraycopy(rowKeyNameLenBytes, 0, bytes, idx, rowKeyNameLenBytes.length); + idx += rowKeyNameLenBytes.length; for (String rowKeyName : rowKeyNames) { - len = Serialization.getNeedBytes(rowKeyName); - System.arraycopy(Serialization.encodeVString(rowKeyName), 0, bytes, idx, len); - idx += len; + byte[] rowKeyNameBytes = Serialization.encodeVString(rowKeyName); + System.arraycopy(rowKeyNameBytes, 0, bytes, idx, rowKeyNameBytes.length); + idx += rowKeyNameBytes.length; } // 5. encode properties names - len = Serialization.getNeedBytes(propertiesNames.size()); - System.arraycopy(Serialization.encodeVi64(propertiesNames.size()), 0, bytes, idx, len); - idx += len; + byte[] propertiesNamesLenBytes = Serialization.encodeVi64(propertiesNames.size()); + System.arraycopy(propertiesNamesLenBytes, 0, bytes, idx, propertiesNamesLenBytes.length); + idx += propertiesNamesLenBytes.length; for (String propertyName : propertiesNames) { - len = Serialization.getNeedBytes(propertyName); - System.arraycopy(Serialization.encodeVString(propertyName), 0, bytes, idx, len); - idx += len; + byte[] propertyNameByte = Serialization.encodeVString(propertyName); + System.arraycopy(propertyNameByte, 0, bytes, idx, propertyNameByte.length); + idx += propertyNameByte.length; } // 6. encode option flag - len = Serialization.getNeedBytes(optionFlag.getValue()); - System.arraycopy(Serialization.encodeVi64(optionFlag.getValue()), 0, bytes, idx, len); - idx += len; + byte[] optionFlagBytes = Serialization.encodeVi64(optionFlag.getValue()); + System.arraycopy(optionFlagBytes, 0, bytes, idx, optionFlagBytes.length); + idx += optionFlagBytes.length; // 7. encode Operation - len = Serialization.getNeedBytes(tabletOperations.size()); - System.arraycopy(Serialization.encodeVi64(tabletOperations.size()), 0, bytes, idx, len); - idx += len; + byte[] tabletOperationsLenBytes = Serialization.encodeVi64(tabletOperations.size()); + System.arraycopy(tabletOperationsLenBytes, 0, bytes, idx, tabletOperationsLenBytes.length); + idx += tabletOperationsLenBytes.length; for (ObTableTabletOp tabletOperation : tabletOperations) { - len = (int) tabletOperation.getPayloadSize(); - System.arraycopy(tabletOperation.encode(), 0, bytes, idx, len); - idx += len; + byte[] tabletOperationBytes = tabletOperation.encode(); + System.arraycopy(tabletOperationBytes, 0, bytes, idx, tabletOperationBytes.length); + idx += tabletOperationBytes.length; } return bytes; } + public void encode(ObByteBuf buf) { + + // 0. encode header + encodeHeader(buf); + + // 1. encode ls id + Serialization.encodeI64(buf, lsId); + + // 2. encode table name + Serialization.encodeVString(buf, tableName); + + // 3. encode table id + Serialization.encodeVi64(buf, tableId); + if (isHbase) { + buf.writeBytes(StaticHbaseColumns.RowkeyColumSize); + buf.writeBytes(StaticHbaseColumns.getKQTBytes()); + buf.writeBytes(StaticHbaseColumns.VColumSize); + buf.writeBytes(StaticHbaseColumns.getVBytes()); + } else { + // 4. encode rowKey names + Serialization.encodeVi64(buf, rowKeyNames.size()); + for (String rowKeyName : rowKeyNames) { + Serialization.encodeVString(buf, rowKeyName); + } + + // 5. encode properties names + Serialization.encodeVi64(buf, propertiesNames.size()); + for (String propertyName : propertiesNames) { + Serialization.encodeVString(buf, propertyName); + } + } + + + // 6. encode option flag + Serialization.encodeVi64(buf, optionFlag.getValue()); + + // 7. encode Operation + Serialization.encodeVi64(buf, tabletOperations.size()); + for (ObTableTabletOp tabletOperation : tabletOperations) { + tabletOperation.encode(buf); + } + } + /* * Decode. */ @@ -169,24 +214,35 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long payloadContentSize = 0; - payloadContentSize += Serialization.getNeedBytes(tabletOperations.size()); - for (ObTableTabletOp operation : tabletOperations) { - payloadContentSize += operation.getPayloadSize(); - } + if (this.payLoadContentSize == -1) { + long payloadContentSize = 0; + payloadContentSize += Serialization.getNeedBytes(tabletOperations.size()); + for (ObTableTabletOp operation : tabletOperations) { + payloadContentSize += operation.getPayloadSize(); + } + if (isHbase) { + payloadContentSize += StaticHbaseColumns.RowkeyColumSize.length; + payloadContentSize += StaticHbaseColumns.getKQTBytes().length; + payloadContentSize += StaticHbaseColumns.VColumSize.length; + payloadContentSize += StaticHbaseColumns.getVBytes().length; + } else { + payloadContentSize += Serialization.getNeedBytes(rowKeyNames.size()); + for (String rowKeyName : rowKeyNames) { + payloadContentSize += Serialization.getNeedBytes(rowKeyName); + } - payloadContentSize += Serialization.getNeedBytes(rowKeyNames.size()); - for (String rowKeyName : rowKeyNames) { - payloadContentSize += Serialization.getNeedBytes(rowKeyName); - } + payloadContentSize += Serialization.getNeedBytes(propertiesNames.size()); + for (String propertyName : propertiesNames) { + payloadContentSize += Serialization.getNeedBytes(propertyName); + } + } - payloadContentSize += Serialization.getNeedBytes(propertiesNames.size()); - for (String propertyName : propertiesNames) { - payloadContentSize += Serialization.getNeedBytes(propertyName); + + this.payLoadContentSize = payloadContentSize + LS_ID_SIZE + Serialization.getNeedBytes(optionFlag.getValue()) + + Serialization.getNeedBytes(tableName) + Serialization.getNeedBytes(tableId); } + return this.payLoadContentSize; - return payloadContentSize + LS_ID_SIZE + Serialization.getNeedBytes(optionFlag.getValue()) - + Serialization.getNeedBytes(tableName) + Serialization.getNeedBytes(tableId); } /* @@ -327,10 +383,18 @@ public void prepareColumnNamesBitMap() { } public void prepare() { - this.collectColumnNamesIdxMap(); - this.beforeOption(); - this.prepareOption(); - this.prepareColumnNamesBitMap(); + if (isHbase) { + this.rowKeyNames.add("K"); + this.rowKeyNames.add("Q"); + this.rowKeyNames.add("T"); + this.propertiesNames.add("V"); + this.setIsSamePropertiesNames(true); + } else { + this.collectColumnNamesIdxMap(); + this.beforeOption(); + this.prepareOption(); + this.prepareColumnNamesBitMap(); + } } public long getLsId() { @@ -341,4 +405,12 @@ public void setTableName(String tableName) { this.tableName = tableName; } + public boolean isHbase() { + return isHbase; + } + + public void setHbase(boolean hbase) { + isHbase = hbase; + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java index 2140df88..15cf10d6 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java @@ -21,6 +21,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -52,30 +53,54 @@ public byte[] encode() { // 2. encode op flag long flag = singleOpFlag.getValue(); - int len = Serialization.getNeedBytes(flag); - System.arraycopy(Serialization.encodeVi64(flag), 0, bytes, idx, len); - idx += len; + byte[] singleOpFlagLenBytes = Serialization.encodeVi64(flag); + System.arraycopy(singleOpFlagLenBytes, 0, bytes, idx, singleOpFlagLenBytes.length); + idx += singleOpFlagLenBytes.length; // 3. encode single op query if (ObTableOperationType.needEncodeQuery(singleOpType)) { - len = (int) query.getPayloadSize(); - System.arraycopy(query.encode(), 0, bytes, idx, len); - idx += len; + byte[] queryBytes = query.encode(); + System.arraycopy(queryBytes, 0, bytes, idx, queryBytes.length); + idx += queryBytes.length; } // 4. encode entities - len = Serialization.getNeedBytes(entities.size()); - System.arraycopy(Serialization.encodeVi64(entities.size()), 0, bytes, idx, len); - idx += len; + byte[] entitiesSizeBytes = Serialization.encodeVi64(entities.size()); + System.arraycopy(entitiesSizeBytes, 0, bytes, idx, entitiesSizeBytes.length); + idx += entitiesSizeBytes.length; for (ObTableSingleOpEntity entity : entities) { - len = (int) entity.getPayloadSize(); - System.arraycopy(entity.encode(), 0, bytes, idx, len); - idx += len; + byte[] entityBytes = entity.encode(); + System.arraycopy(entityBytes, 0, bytes, idx, entityBytes.length); + idx += entityBytes.length; } return bytes; } + public void encode(ObByteBuf buf) { + // 0. encode header + encodeHeader(buf); + + // 1. encode op type + byte opTypeVal = singleOpType.getByteValue(); + Serialization.encodeI8(buf, opTypeVal); + + // 2. encode op flag + long flag = singleOpFlag.getValue(); + Serialization.encodeVi64(buf, flag); + + // 3. encode single op query + if (ObTableOperationType.needEncodeQuery(singleOpType)) { + query.encode(buf); + } + + // 4. encode entities + Serialization.encodeVi64(buf, entities.size()); + for (ObTableSingleOpEntity entity : entities) { + entity.encode(buf); + } + } + /* * Decode. */ @@ -103,16 +128,19 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long payloadContentSize = Serialization.getNeedBytes(singleOpType.getByteValue()); - payloadContentSize += Serialization.getNeedBytes(singleOpFlag.getValue()); - if (ObTableOperationType.needEncodeQuery(singleOpType)) { - payloadContentSize += query.getPayloadSize(); - } - payloadContentSize += Serialization.getNeedBytes(entities.size()); - for (ObTableSingleOpEntity entity : entities) { - payloadContentSize += entity.getPayloadSize(); + if (this.payLoadContentSize == -1) { + long payloadContentSize = Serialization.getNeedBytes(singleOpType.getByteValue()); + payloadContentSize += Serialization.getNeedBytes(singleOpFlag.getValue()); + if (ObTableOperationType.needEncodeQuery(singleOpType)) { + payloadContentSize += query.getPayloadSize(); + } + payloadContentSize += Serialization.getNeedBytes(entities.size()); + for (ObTableSingleOpEntity entity : entities) { + payloadContentSize += entity.getPayloadSize(); + } + this.payLoadContentSize = payloadContentSize; } - return payloadContentSize; + return this.payLoadContentSize; } public List getScanRange() { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java index 37e98e38..7cc31b82 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java @@ -22,6 +22,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjMeta; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -41,9 +42,17 @@ public class ObTableSingleOpEntity extends AbstractPayload { private List propertiesValues = new ArrayList<>(); private boolean ignoreEncodePropertiesColumnNames = false; - + private boolean isHbase = false; public ObTableSingleOpEntity() {} + public boolean isHbase() { + return isHbase; + } + + public void setHbase(boolean hbase) { + isHbase = hbase; + } + /* * Encode. */ @@ -56,33 +65,33 @@ public byte[] encode() { idx = encodeHeader(bytes, idx); // 1. encode rowKey bitmap - int len = Serialization.getNeedBytes(rowKeyBitLen); - System.arraycopy(Serialization.encodeVi64(rowKeyBitLen), 0, bytes, idx, len); - idx += len; + byte[] tmpBytes = Serialization.encodeVi64(rowKeyBitLen); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; for (byte b : rowKeyBitMap) { System.arraycopy(Serialization.encodeI8(b), 0, bytes, idx, 1); idx += 1; } // 2. encode rowkey - len = Serialization.getNeedBytes(rowkey.size()); - System.arraycopy(Serialization.encodeVi64(rowkey.size()), 0, bytes, idx, len); - idx += len; + tmpBytes = Serialization.encodeVi64(rowkey.size()); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; for (ObObj obj : rowkey) { - len = ObTableSerialUtil.getEncodedSize(obj); - System.arraycopy(ObTableSerialUtil.encode(obj), 0, bytes, idx, len); - idx += len; + tmpBytes = ObTableSerialUtil.encode(obj); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; } // 3. encode property bitmap if (ignoreEncodePropertiesColumnNames) { - len = Serialization.getNeedBytes(0L); - System.arraycopy(Serialization.encodeVi64(0L), 0, bytes, idx, len); - idx += len; + tmpBytes = Serialization.encodeVi64(0L); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; } else { - len = Serialization.getNeedBytes(propertiesBitLen); - System.arraycopy(Serialization.encodeVi64(propertiesBitLen), 0, bytes, idx, len); - idx += len; + tmpBytes = Serialization.encodeVi64(propertiesBitLen); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; for (byte b : propertiesBitMap) { System.arraycopy(Serialization.encodeI8(b), 0, bytes, idx, 1); idx += 1; @@ -90,18 +99,60 @@ public byte[] encode() { } // 4. encode properties values - len = Serialization.getNeedBytes(propertiesValues.size()); - System.arraycopy(Serialization.encodeVi64(propertiesValues.size()), 0, bytes, idx, len); - idx += len; + tmpBytes = Serialization.encodeVi64(propertiesValues.size()); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; for (ObObj obj : propertiesValues) { - len = ObTableSerialUtil.getEncodedSize(obj); - System.arraycopy(ObTableSerialUtil.encode(obj), 0, bytes, idx, len); - idx += len; + tmpBytes = ObTableSerialUtil.encode(obj); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; } return bytes; } + public void encode(ObByteBuf buf) { + // 0. encode header + encodeHeader(buf); + + // 1. encode rowKey bitmap + if (isHbase) { + Serialization.encodeVi64(buf, 3L); + Serialization.encodeI8(buf, (byte) 0b00000111); + } else { + Serialization.encodeVi64(buf, rowKeyBitLen); + for (byte b : rowKeyBitMap) { + Serialization.encodeI8(buf, b); + } + } + + + // 2. encode rowkey + Serialization.encodeVi64(buf, rowkey.size()); + for (ObObj obj : rowkey) { + ObTableSerialUtil.encode(buf, obj); + } + + // 3. encode property bitmap + if (ignoreEncodePropertiesColumnNames) { + Serialization.encodeVi64(buf,0L); + } else if (isHbase) { + Serialization.encodeVi64(buf, 1); + Serialization.encodeI8(buf, (byte) 0b00000001); + } else { + Serialization.encodeVi64(buf, propertiesBitLen); + for (byte b : propertiesBitMap) { + Serialization.encodeI8(buf, b); + } + } + + // 4. encode properties values + Serialization.encodeVi64(buf, propertiesValues.size()); + for (ObObj obj : propertiesValues) { + ObTableSerialUtil.encode(buf, obj); + } + } + private byte[] parseBitMap(long bitLen, List aggColumnNames, List columnNames, ByteBuf buf) { byte[] bitMap = new byte[(int) Math.ceil(bitLen / 8.0)]; if (bitLen == 0) { @@ -162,29 +213,39 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long payloadContentSize = 0; - - payloadContentSize += Serialization.getNeedBytes(rowKeyBitLen); - payloadContentSize += rowKeyBitMap.length; + if (this.payLoadContentSize == -1) { + long payloadContentSize = 0; + if (isHbase) { + payloadContentSize += Serialization.getNeedBytes(3L); + payloadContentSize += 1; + } else { + payloadContentSize += Serialization.getNeedBytes(rowKeyBitLen); + payloadContentSize += rowKeyBitMap.length; + } - payloadContentSize += Serialization.getNeedBytes(rowkey.size()); - for (ObObj obj : rowkey) { - payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); - } + payloadContentSize += Serialization.getNeedBytes(rowkey.size()); + for (ObObj obj : rowkey) { + payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); + } - if (ignoreEncodePropertiesColumnNames) { - payloadContentSize += Serialization.getNeedBytes(0L); - } else { - payloadContentSize += Serialization.getNeedBytes(propertiesBitLen); - payloadContentSize += propertiesBitMap.length; - } + if (ignoreEncodePropertiesColumnNames) { + payloadContentSize += Serialization.getNeedBytes(0L); + } else if (isHbase) { + payloadContentSize += Serialization.getNeedBytes(1L); + payloadContentSize += 1; + } else { + payloadContentSize += Serialization.getNeedBytes(propertiesBitLen); + payloadContentSize += propertiesBitMap.length; + } - payloadContentSize += Serialization.getNeedBytes(propertiesValues.size()); - for (ObObj obj : propertiesValues) { - payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); + payloadContentSize += Serialization.getNeedBytes(propertiesValues.size()); + for (ObObj obj : propertiesValues) { + payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); + } + this.payLoadContentSize = payloadContentSize; } - return payloadContentSize; + return this.payLoadContentSize; } public static boolean areArraysSameLengthOrBothNull(Object[] a, Object[] b) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java index 9a6215fc..7a7fe9dc 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java @@ -20,16 +20,104 @@ import com.alipay.oceanbase.rpc.ObGlobal; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*; +import com.alipay.oceanbase.rpc.table.ObHBaseParams; import com.alipay.oceanbase.rpc.table.ObKVParams; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; import java.util.*; +class StaticObHTableFilter { + public static ObHTableFilter hFilter = new ObHTableFilter(); + public static byte[] HFilterBytes = null; + static { + hFilter.setMaxVersions(1); + HFilterBytes = hFilter.encode(); + } + + public static byte[] getHfilterBytes() { + if (HFilterBytes == null) { + hFilter.setMaxVersions(1); + HFilterBytes = hFilter.encode(); + } + return HFilterBytes; + } +} + +class StaticObKvParams { + public static ObKVParams kvParams = new ObKVParams(); + public static ObHBaseParams hBaseParams = new ObHBaseParams(); + public static byte[] kvParamBytes = null; + static { + kvParams.setObParamsBase(hBaseParams); + kvParamBytes = kvParams.encode(); + } + public static byte[] getkvParamBytes() { + if (kvParamBytes == null) { + kvParams.setObParamsBase(hBaseParams); + kvParamBytes = kvParams.encode(); + } + return kvParamBytes; + } + +} + +class StaticHbaseColumns { + public static byte[] selectColumnSize = Serialization.encodeVi64(4L); + public static byte[] RowkeyColumSize = Serialization.encodeVi64(3L); + public static byte[] VColumSize = Serialization.encodeVi64(1L); + public static byte[] KQTVBytes = null; + public static byte[] KQTBytes = null; + public static byte[] VBytes = null; + public static byte[] getKQTVBytes() { + if (KQTVBytes == null) { + byte[] K = Serialization.encodeVString("K"); + byte[] Q = Serialization.encodeVString("Q"); + byte[] T = Serialization.encodeVString("T"); + byte[] V = Serialization.encodeVString("V"); + KQTVBytes = new byte[K.length + Q.length + T.length + V.length]; + ObByteBuf buf = new ObByteBuf(KQTVBytes); + buf.writeBytes(K); + buf.writeBytes(Q); + buf.writeBytes(T); + buf.writeBytes(V); + } + return KQTVBytes; + } + public static byte[] getKQTBytes() { + if (KQTBytes == null) { + byte[] K = Serialization.encodeVString("K"); + byte[] Q = Serialization.encodeVString("Q"); + byte[] T = Serialization.encodeVString("T"); + KQTBytes = new byte[K.length + Q.length + T.length]; + ObByteBuf buf = new ObByteBuf(KQTBytes); + buf.writeBytes(K); + buf.writeBytes(Q); + buf.writeBytes(T); + } + return KQTBytes; + } + public static byte[] getVBytes() { + if (VBytes == null) { + byte[] V = Serialization.encodeVString("V"); + VBytes = new byte[V.length]; + ObByteBuf buf = new ObByteBuf(VBytes); + buf.writeBytes(V); + } + return VBytes; + } +} + +class StaticIndexName { + public static byte[] indexName = Serialization.encodeVString("PRIMARY"); +} + +class StaticEmptyStr { + public static byte[] EmptyStr = Serialization.encodeVString(""); +} + public class ObTableSingleOpQuery extends ObTableQuery { private List scanRangeColumns = new ArrayList<>(); private byte[] scanRangeBitMap = null; @@ -48,82 +136,126 @@ public byte[] encode() { idx = encodeHeader(bytes, idx); // 1. encode index name - int len = Serialization.getNeedBytes(indexName); - System.arraycopy(Serialization.encodeVString(indexName), 0, bytes, idx, len); - idx += len; + byte[] indexNameBytes = Serialization.encodeVString(indexName); + System.arraycopy(indexNameBytes, 0, bytes, idx, indexNameBytes.length); + idx += indexNameBytes.length; // 2. encode scan ranges columns - len = Serialization.getNeedBytes(scanRangeBitLen); - System.arraycopy(Serialization.encodeVi64(scanRangeBitLen), 0, bytes, idx, len); - idx += len; + byte[] scanRangeBitLenBytes = Serialization.encodeVi64(scanRangeBitLen); + System.arraycopy(scanRangeBitLenBytes, 0, bytes, idx, scanRangeBitLenBytes.length); + idx += scanRangeBitLenBytes.length; for (byte b : scanRangeBitMap) { System.arraycopy(Serialization.encodeI8(b), 0, bytes, idx, 1); idx += 1; } // 3. encode scan ranges - len = Serialization.getNeedBytes(keyRanges.size()); - System.arraycopy(Serialization.encodeVi64(keyRanges.size()), 0, bytes, idx, len); - idx += len; + byte[] keyRangesBytes = Serialization.encodeVi64(keyRanges.size()); + System.arraycopy(keyRangesBytes, 0, bytes, idx, keyRangesBytes.length); + idx += keyRangesBytes.length; for (ObNewRange range : keyRanges) { - len = ObTableSerialUtil.getEncodedSize(range); - System.arraycopy(ObTableSerialUtil.encode(range), 0, bytes, idx, len); - idx += len; + byte[] rangeBytes = ObTableSerialUtil.encode(range); + System.arraycopy(rangeBytes, 0, bytes, idx, rangeBytes.length); + idx += rangeBytes.length; } // 4. encode filter string - len = Serialization.getNeedBytes(filterString); - System.arraycopy(Serialization.encodeVString(filterString), 0, bytes, idx, len); - idx += len; + byte[] filterStringBytes = Serialization.encodeVString(filterString); + System.arraycopy(filterStringBytes, 0, bytes, idx, filterStringBytes.length); + idx += filterStringBytes.length; // encode HBase Batch Get required if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { - len = Serialization.getNeedBytes(selectColumns.size()); - System.arraycopy(Serialization.encodeVi64(selectColumns.size()), 0, bytes, idx, len); - idx += len; + byte[] selectColumnsLenByets = Serialization.encodeVi64(selectColumns.size()); + System.arraycopy(selectColumnsLenByets, 0, bytes, idx, selectColumnsLenByets.length); + idx += selectColumnsLenByets.length; for (String selectColumn : selectColumns) { - len = Serialization.getNeedBytes(selectColumn); - System.arraycopy(Serialization.encodeVString(selectColumn), 0, bytes, idx, len); - idx += len; + byte[] selectColumnLenBytes = Serialization.encodeVString(selectColumn); + System.arraycopy(selectColumnLenBytes, 0, bytes, idx, selectColumnLenBytes.length); + idx += selectColumnLenBytes.length; } System.arraycopy(Serialization.encodeI8(scanOrder.getByteValue()), 0, bytes, idx, 1); idx += 1; - len = (int) hTableFilter.getPayloadSize(); - System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len); - idx += len; + byte[] hTableFilterLenBytes = hTableFilter.encode(); + System.arraycopy(hTableFilterLenBytes, 0, bytes, idx, hTableFilterLenBytes.length); + idx += hTableFilterLenBytes.length; if (obKVParams != null) { - len = (int) obKVParams.getPayloadSize(); - System.arraycopy(obKVParams.encode(), 0, bytes, idx, len); - idx += len; + byte[] obKVParamsBytes = obKVParams.encode(); + System.arraycopy(obKVParamsBytes, 0, bytes, idx, obKVParamsBytes.length); } else { - len = HTABLE_DUMMY_BYTES.length; - System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len); - idx += len; + System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, HTABLE_DUMMY_BYTES.length); } } return bytes; } + public void encode(ObByteBuf buf) { + // 0. encode header + encodeHeader(buf); + + // 1. encode index name +// Serialization.encodeVString(buf, indexName); + buf.writeBytes(StaticIndexName.indexName); + + // 2. encode scan ranges columns + if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { + Serialization.encodeVi64(buf, 3L); + Serialization.encodeI8(buf, (byte) 0); + } else { + Serialization.encodeVi64(buf, scanRangeBitLen); + for (byte b : scanRangeBitMap) { + Serialization.encodeI8(buf, b); + } + } + + + // 3. encode scan ranges + Serialization.encodeVi64(buf, keyRanges.size()); + for (ObNewRange range : keyRanges) { + ObTableSerialUtil.encode(buf, range); + } + + // 4. encode filter string +// Serialization.encodeVString(buf, filterString); + buf.writeBytes(StaticEmptyStr.EmptyStr); + + // encode HBase Batch Get required + if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { + buf.writeBytes(StaticHbaseColumns.selectColumnSize); + buf.writeBytes(StaticHbaseColumns.getKQTVBytes()); + Serialization.encodeI8(buf, scanOrder.getByteValue()); + + buf.writeBytes(StaticObHTableFilter.getHfilterBytes()); + + if (obKVParams != null) { +// obKVParams.encode(buf); + buf.writeBytes(StaticObKvParams.getkvParamBytes()); + } else { + buf.writeBytes(HTABLE_DUMMY_BYTES); + } + } + } + /* * Decode. */ @Override public Object decode(ByteBuf buf) { // 0. decode header - super.decode(buf); - + Serialization.decodeVi64(buf); + Serialization.decodeVi64(buf); // 1. decode tablet id this.indexName = Serialization.decodeVString(buf); // 2. decode scan ranges columns scanRangeBitLen = Serialization.decodeVi64(buf); - scanRangeBitMap = new byte[(int) Math.ceil(scanRangeBitLen / 8.0)]; + scanRangeBitMap = new byte[(int)(scanRangeBitLen / 8.0) + 1]; for (int i = 0; i < scanRangeBitMap.length; i++) { scanRangeBitMap[i] = Serialization.decodeI8(buf); - for (int j = 0; j < 8; i++) { + for (int j = 0; j < 8; j++) { if ((scanRangeBitMap[i] & (1 << j)) != 0) { if (i * 8 + j < aggColumnNames.size()) { scanRangeColumns.add(aggColumnNames.get(i * 8 + j)); @@ -151,40 +283,47 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long payloadContentSize = 0; + if (this.payLoadContentSize == -1) { + long payloadContentSize = 0; + if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { + payloadContentSize += Serialization.getNeedBytes(3L); + payloadContentSize += 1; + } else { + payloadContentSize += Serialization.getNeedBytes(scanRangeBitLen); + payloadContentSize += scanRangeBitMap.length; + } - payloadContentSize += Serialization.getNeedBytes(scanRangeBitLen); - payloadContentSize += scanRangeBitMap.length; - payloadContentSize += Serialization.getNeedBytes(keyRanges.size()); - for (ObNewRange range : keyRanges) { - payloadContentSize += ObTableSerialUtil.getEncodedSize(range); - } + payloadContentSize += Serialization.getNeedBytes(keyRanges.size()); + for (ObNewRange range : keyRanges) { + payloadContentSize += ObTableSerialUtil.getEncodedSize(range); + } - payloadContentSize += Serialization.getNeedBytes(indexName); - payloadContentSize += Serialization.getNeedBytes(filterString); + payloadContentSize += StaticIndexName.indexName.length; + payloadContentSize += StaticEmptyStr.EmptyStr.length; - // calculate part required by HBase Batch Get - if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { - payloadContentSize += Serialization.getNeedBytes(selectColumns.size()); - for (String selectColumn : selectColumns) { - payloadContentSize += Serialization.getNeedBytes(selectColumn); - } - payloadContentSize += 1; // scanOrder + // calculate part required by HBase Batch Get + if (isHbaseQuery && ObGlobal.isHBaseBatchGetSupport()) { + payloadContentSize += StaticHbaseColumns.selectColumnSize.length; + payloadContentSize += StaticHbaseColumns.getKQTVBytes().length; + payloadContentSize += 1; // scanOrder - if (isHbaseQuery) { - payloadContentSize += hTableFilter.getPayloadSize(); - } else { - payloadContentSize += HTABLE_DUMMY_BYTES.length; - } - if (isHbaseQuery && obKVParams != null) { - payloadContentSize += obKVParams.getPayloadSize(); - } else { - payloadContentSize += HTABLE_DUMMY_BYTES.length; + if (isHbaseQuery) { + payloadContentSize += StaticObHTableFilter.getHfilterBytes().length; + } else { + payloadContentSize += HTABLE_DUMMY_BYTES.length; + } + if (isHbaseQuery && obKVParams != null) { +// payloadContentSize += obKVParams.getPayloadSize(); + payloadContentSize += StaticObKvParams.getkvParamBytes().length; + } else { + payloadContentSize += HTABLE_DUMMY_BYTES.length; + } } + this.payLoadContentSize = payloadContentSize; } - return payloadContentSize; + return this.payLoadContentSize; } // Support class, which is used for column name sorted diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java index 0cf7848a..78229099 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.Constants; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -57,23 +58,40 @@ public byte[] encode() { idx += 8; // 2. encode option flag - int len = Serialization.getNeedBytes(optionFlag.getValue()); - System.arraycopy(Serialization.encodeVi64(optionFlag.getValue()), 0, bytes, idx, len); - idx += len; - - // 4. encode Operation - len = Serialization.getNeedBytes(singleOperations.size()); - System.arraycopy(Serialization.encodeVi64(singleOperations.size()), 0, bytes, idx, len); - idx += len; + byte[] optionFlagLenBytes = Serialization.encodeVi64(optionFlag.getValue()); + System.arraycopy(optionFlagLenBytes, 0, bytes, idx, optionFlagLenBytes.length); + idx += optionFlagLenBytes.length; + + // 3. encode Operation + byte[] singleOperationsLenBytes = Serialization.encodeVi64(singleOperations.size()); + System.arraycopy(singleOperationsLenBytes, 0, bytes, idx, singleOperationsLenBytes.length); + idx += singleOperationsLenBytes.length; for (ObTableSingleOp singleOperation : singleOperations) { - len = (int) singleOperation.getPayloadSize(); - System.arraycopy(singleOperation.encode(), 0, bytes, idx, len); - idx += len; + byte[] singleOperationLenBytes = singleOperation.encode(); + System.arraycopy(singleOperationLenBytes, 0, bytes, idx, singleOperationLenBytes.length); + idx += singleOperationLenBytes.length; } return bytes; } + public void encode(ObByteBuf buf) { + encodeHeader(buf); + + // 1. encode tablet id + Serialization.encodeI64(buf, tabletId); + + // 2. encode option flag + Serialization.encodeVi64(buf, optionFlag.getValue()); + + // 3. encode Operation + Serialization.encodeVi64(buf, singleOperations.size()); + for (ObTableSingleOp singleOperation : singleOperations) { + singleOperation.encode(buf); + } + } + + /* * Decode. */ @@ -105,13 +123,16 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long payloadContentSize = 0; - payloadContentSize += Serialization.getNeedBytes(singleOperations.size()); - for (ObTableSingleOp operation : singleOperations) { - payloadContentSize += operation.getPayloadSize(); - } + if (this.payLoadContentSize == -1) { + long payloadContentSize = 0; + payloadContentSize += Serialization.getNeedBytes(singleOperations.size()); + for (ObTableSingleOp operation : singleOperations) { + payloadContentSize += operation.getPayloadSize(); + } - return payloadContentSize + tabletIdSize + Serialization.getNeedBytes(optionFlag.getValue()); + this.payLoadContentSize = payloadContentSize + tabletIdSize + Serialization.getNeedBytes(optionFlag.getValue()); + } + return this.payLoadContentSize; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObHTableFilter.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObHTableFilter.java index a4ace55c..48075d79 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObHTableFilter.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObHTableFilter.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query; import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.ObBytesString; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -61,46 +62,71 @@ public byte[] encode() { int idx = 0; // 0. encode header - int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); - System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, - idx, headerLen); - idx += headerLen; + byte[] headerBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize()); + System.arraycopy(headerBytes, 0, bytes, + idx, headerBytes.length); + idx += headerBytes.length; // 1. encode System.arraycopy(Serialization.encodeI8(isValid ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); idx++; - int len = Serialization.getNeedBytes(selectColumnQualifier.size()); + byte[] selectColumnQualifierBytes = Serialization.encodeVi64(selectColumnQualifier.size()); System - .arraycopy(Serialization.encodeVi64(selectColumnQualifier.size()), 0, bytes, idx, len); - idx += len; + .arraycopy(selectColumnQualifierBytes, 0, bytes, idx, selectColumnQualifierBytes.length); + idx += selectColumnQualifierBytes.length; for (ObBytesString q : selectColumnQualifier) { - len = Serialization.getNeedBytes(q); - System.arraycopy(Serialization.encodeBytesString(q), 0, bytes, idx, len); - idx += len; + byte[] QualifierBytes = Serialization.encodeBytesString(q); + System.arraycopy(QualifierBytes, 0, bytes, idx, QualifierBytes.length); + idx += QualifierBytes.length; } - len = Serialization.getNeedBytes(minStamp); - System.arraycopy(Serialization.encodeVi64(minStamp), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(maxStamp); - System.arraycopy(Serialization.encodeVi64(maxStamp), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(maxVersions); - System.arraycopy(Serialization.encodeVi32(maxVersions), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(limitPerRowPerCf); - System.arraycopy(Serialization.encodeVi32(limitPerRowPerCf), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(offsetPerRowPerCf); - System.arraycopy(Serialization.encodeVi32(offsetPerRowPerCf), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(filterString); - System.arraycopy(Serialization.encodeBytesString(filterString), 0, bytes, idx, len); - idx += len; + byte[] minStampBytes = Serialization.encodeVi64(minStamp); + System.arraycopy(minStampBytes, 0, bytes, idx, minStampBytes.length); + idx += minStampBytes.length; + byte[] maxStampBytes = Serialization.encodeVi64(maxStamp); + System.arraycopy(maxStampBytes, 0, bytes, idx, maxStampBytes.length); + idx += maxStampBytes.length; + byte[] maxVersionsBytes = Serialization.encodeVi32(maxVersions); + System.arraycopy(maxVersionsBytes, 0, bytes, idx, maxVersionsBytes.length); + idx += maxVersionsBytes.length; + byte[] limitPerRowPerCfBytes = Serialization.encodeVi32(limitPerRowPerCf); + System.arraycopy(Serialization.encodeVi32(limitPerRowPerCf), 0, bytes, idx, limitPerRowPerCfBytes.length); + idx += limitPerRowPerCfBytes.length; + byte[] offsetPerRowPerCfBytes = Serialization.encodeVi32(offsetPerRowPerCf); + System.arraycopy(offsetPerRowPerCfBytes, 0, bytes, idx, offsetPerRowPerCfBytes.length); + idx += offsetPerRowPerCfBytes.length; + byte[] filterStringBytes = Serialization.encodeBytesString(filterString); + System.arraycopy(filterStringBytes, 0, bytes, idx, filterStringBytes.length); + idx += filterStringBytes.length; return bytes; } + public void encode(ObByteBuf buf) { + // 0. encode header + encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize()); + if (!isValid) { + Serialization.encodeI8(buf, (byte) 0); + byte[] dummy = new byte[20]; + buf.writeBytes(dummy); + } else { + // 1. encode + Serialization.encodeI8(buf, isValid ? (byte) 1 : (byte) 0); + Serialization.encodeVi64(buf, selectColumnQualifier.size()); + + for (ObBytesString q : selectColumnQualifier) { + Serialization.encodeBytesString(buf, q); + } + + Serialization.encodeVi64(buf, minStamp); + Serialization.encodeVi64(buf, maxStamp); + Serialization.encodeVi32(buf, maxVersions); + Serialization.encodeVi32(buf, limitPerRowPerCf); + Serialization.encodeVi32(buf, offsetPerRowPerCf); + Serialization.encodeBytesString(buf, filterString); + } + } + /* * Decode. */ @@ -130,21 +156,26 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long contentSize = 0; - contentSize += 1; // isValid - - contentSize += Serialization.getNeedBytes(selectColumnQualifier.size()); - for (ObBytesString q : selectColumnQualifier) { - contentSize += Serialization.getNeedBytes(q); + if (this.payLoadContentSize == -1) { + long contentSize = 0; + contentSize += 1; // isValid + if (!isValid) { + contentSize += 20; // dummy + } else { + contentSize += Serialization.getNeedBytes(selectColumnQualifier.size()); // 1 + for (ObBytesString q : selectColumnQualifier) { + contentSize += Serialization.getNeedBytes(q); + } + contentSize += Serialization.getNeedBytes(minStamp); // 1 + contentSize += Serialization.getNeedBytes(maxStamp); // 9 + contentSize += Serialization.getNeedBytes(maxVersions); // 1 + contentSize += Serialization.getNeedBytes(limitPerRowPerCf); // 5 + contentSize += Serialization.getNeedBytes(offsetPerRowPerCf); // 1 + contentSize += Serialization.getNeedBytes(filterString); // 2 + } + this.payLoadContentSize = contentSize; } - - contentSize += Serialization.getNeedBytes(minStamp); - contentSize += Serialization.getNeedBytes(maxStamp); - contentSize += Serialization.getNeedBytes(maxVersions); - contentSize += Serialization.getNeedBytes(limitPerRowPerCf); - contentSize += Serialization.getNeedBytes(offsetPerRowPerCf); - contentSize += Serialization.getNeedBytes(filterString); - return contentSize; + return this.payLoadContentSize; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java index 190fa736..1ef9f8c0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java @@ -153,90 +153,89 @@ public byte[] encode() { int idx = 0; // 0. encode header - int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); - System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, - idx, headerLen); - idx += headerLen; + byte[] headerBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize()); + System.arraycopy(headerBytes, 0, bytes, + idx, headerBytes.length); + idx += headerBytes.length; // 1. encode - int len = Serialization.getNeedBytes(keyRanges.size()); - System.arraycopy(Serialization.encodeVi64(keyRanges.size()), 0, bytes, idx, len); - idx += len; + byte[] keyRangesByte = Serialization.encodeVi64(keyRanges.size()); + System.arraycopy(keyRangesByte, 0, bytes, idx, keyRangesByte.length); + idx += keyRangesByte.length; for (ObNewRange keyRange : keyRanges) { - len = keyRange.getEncodedSize(); - System.arraycopy(keyRange.encode(), 0, bytes, idx, len); - idx += len; + byte[] keyRangeBytes = keyRange.encode(); + System.arraycopy(keyRangeBytes, 0, bytes, idx, keyRangeBytes.length); + idx += keyRangeBytes.length; } - len = Serialization.getNeedBytes(selectColumns.size()); - System.arraycopy(Serialization.encodeVi64(selectColumns.size()), 0, bytes, idx, len); - idx += len; + byte[] selectColumnsBytes = Serialization.encodeVi64(selectColumns.size()); + System.arraycopy(selectColumnsBytes, 0, bytes, idx, selectColumnsBytes.length); + idx += selectColumnsBytes.length; for (String selectColumn : selectColumns) { - len = Serialization.getNeedBytes(selectColumn); - System.arraycopy(Serialization.encodeVString(selectColumn), 0, bytes, idx, len); - idx += len; + byte[] selectColumnBytes = Serialization.encodeVString(selectColumn); + System.arraycopy(selectColumnBytes, 0, bytes, idx, selectColumnBytes.length); + idx += selectColumnBytes.length; } - len = Serialization.getNeedBytes(filterString); - System.arraycopy(Serialization.encodeVString(filterString), 0, bytes, idx, len); - idx += len; + byte[] filterStringBytes = Serialization.encodeVString(filterString); + System.arraycopy(filterStringBytes, 0, bytes, idx, filterStringBytes.length); + idx += filterStringBytes.length; - len = Serialization.getNeedBytes(limit); - System.arraycopy(Serialization.encodeVi32(limit), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(offset); - System.arraycopy(Serialization.encodeVi32(offset), 0, bytes, idx, len); - idx += len; + byte[] limitBytes = Serialization.encodeVi32(limit); + System.arraycopy(limitBytes, 0, bytes, idx, limitBytes.length); + idx += limitBytes.length; + byte[] offsetBytes = Serialization.encodeVi32(offset); + System.arraycopy(offsetBytes, 0, bytes, idx, offsetBytes.length); + idx += offsetBytes.length; System.arraycopy(Serialization.encodeI8(scanOrder.getByteValue()), 0, bytes, idx, 1); idx += 1; - len = Serialization.getNeedBytes(indexName); - System.arraycopy(Serialization.encodeVString(indexName), 0, bytes, idx, len); - idx += len; + byte[] indexNameBytes = Serialization.encodeVString(indexName); + System.arraycopy(indexNameBytes, 0, bytes, idx, indexNameBytes.length); + idx += indexNameBytes.length; - len = Serialization.getNeedBytes(batchSize); - System.arraycopy(Serialization.encodeVi32(batchSize), 0, bytes, idx, len); - idx += len; - len = Serialization.getNeedBytes(maxResultSize); - System.arraycopy(Serialization.encodeVi64(maxResultSize), 0, bytes, idx, len); - idx += len; + byte[] batchSizeBytes = Serialization.encodeVi32(batchSize); + System.arraycopy(batchSizeBytes, 0, bytes, idx, batchSizeBytes.length); + idx += batchSizeBytes.length; + byte[] maxResultSizeBytes = Serialization.encodeVi64(maxResultSize); + System.arraycopy(maxResultSizeBytes, 0, bytes, idx, maxResultSizeBytes.length); + idx += maxResultSizeBytes.length; if (isHbaseQuery) { - len = (int) hTableFilter.getPayloadSize(); - System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len); + byte[] hTableFilterBytes = hTableFilter.encode(); + System.arraycopy(hTableFilterBytes, 0, bytes, idx, hTableFilterBytes.length); + idx += hTableFilterBytes.length; } else { - len = HTABLE_DUMMY_BYTES.length; - System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len); + System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, HTABLE_DUMMY_BYTES.length); + idx += HTABLE_DUMMY_BYTES.length; } - idx += len; - len = Serialization.getNeedBytes(scanRangeColumns.size()); - System.arraycopy(Serialization.encodeVi64(scanRangeColumns.size()), 0, bytes, idx, len); - idx += len; + byte[] scanRangeColumnsBytes = Serialization.encodeVi64(scanRangeColumns.size()); + System.arraycopy(scanRangeColumnsBytes, 0, bytes, idx, scanRangeColumnsBytes.length); + idx += scanRangeColumnsBytes.length; for (String keyRangeColumn : scanRangeColumns) { - len = Serialization.getNeedBytes(keyRangeColumn); - System.arraycopy(Serialization.encodeVString(keyRangeColumn), 0, bytes, idx, len); - idx += len; + byte[] keyRangeColumnBytes = Serialization.encodeVString(keyRangeColumn); + System.arraycopy(keyRangeColumnBytes, 0, bytes, idx, keyRangeColumnBytes.length); + idx += keyRangeColumnBytes.length; } //Aggregation - len = Serialization.getNeedBytes(aggregations.size()); - System.arraycopy(Serialization.encodeVi64(aggregations.size()), 0, bytes, idx, len); - idx += len; + byte[] aggregationsSizeBytes = Serialization.encodeVi64(aggregations.size()); + System.arraycopy(aggregationsSizeBytes, 0, bytes, idx, aggregationsSizeBytes.length); + idx += aggregationsSizeBytes.length; for (ObTableAggregationSingle obTableAggregationSingle : aggregations) { - len = (int) obTableAggregationSingle.getPayloadSize(); - System.arraycopy(obTableAggregationSingle.encode(), 0, bytes, idx, len); - idx += len; + byte[] obTableAggregationSingleBytes = obTableAggregationSingle.encode(); + System.arraycopy(obTableAggregationSingleBytes, 0, bytes, idx, obTableAggregationSingleBytes.length); + idx += obTableAggregationSingleBytes.length; } if (obKVParams != null) { // hbaseQuery or FTSQuery will use obKVParams - len = (int) obKVParams.getPayloadSize(); - System.arraycopy(obKVParams.encode(), 0, bytes, idx, len); - idx += len; + byte[] obKVParamsBytes = obKVParams.encode(); + System.arraycopy(obKVParamsBytes, 0, bytes, idx, obKVParamsBytes.length); + idx += obKVParamsBytes.length; } else { - len = HTABLE_DUMMY_BYTES.length; - System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len); - idx += len; + System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, HTABLE_DUMMY_BYTES.length); + idx += HTABLE_DUMMY_BYTES.length; } return bytes; @@ -312,44 +311,47 @@ public Object decode(ByteBuf buf) { */ @Override public long getPayloadContentSize() { - long contentSize = 0; - contentSize += Serialization.getNeedBytes(keyRanges.size()); - for (ObNewRange obNewRange : keyRanges) { - contentSize += obNewRange.getEncodedSize(); - } - contentSize += Serialization.getNeedBytes(selectColumns.size()); - for (String selectColumn : selectColumns) { - contentSize += Serialization.getNeedBytes(selectColumn); - } - contentSize += Serialization.getNeedBytes(filterString); - contentSize += Serialization.getNeedBytes(limit); - contentSize += Serialization.getNeedBytes(offset); - contentSize += 1; // scanOrder - contentSize += Serialization.getNeedBytes(indexName); - - contentSize += Serialization.getNeedBytes(batchSize); - contentSize += Serialization.getNeedBytes(maxResultSize); - - if (isHbaseQuery) { - contentSize += hTableFilter.getPayloadSize(); - } else { - contentSize += HTABLE_DUMMY_BYTES.length; - } - if (obKVParams != null) { - contentSize += obKVParams.getPayloadSize(); - } else { - contentSize += HTABLE_DUMMY_BYTES.length; - } - contentSize += Serialization.getNeedBytes(scanRangeColumns.size()); - for (String scanRangeColumn : scanRangeColumns) { - contentSize += Serialization.getNeedBytes(scanRangeColumn); - } + if (this.payLoadContentSize == -1) { + long contentSize = 0; + contentSize += Serialization.getNeedBytes(keyRanges.size()); + for (ObNewRange obNewRange : keyRanges) { + contentSize += obNewRange.getEncodedSize(); + } + contentSize += Serialization.getNeedBytes(selectColumns.size()); + for (String selectColumn : selectColumns) { + contentSize += Serialization.getNeedBytes(selectColumn); + } + contentSize += Serialization.getNeedBytes(filterString); + contentSize += Serialization.getNeedBytes(limit); + contentSize += Serialization.getNeedBytes(offset); + contentSize += 1; // scanOrder + contentSize += Serialization.getNeedBytes(indexName); + + contentSize += Serialization.getNeedBytes(batchSize); + contentSize += Serialization.getNeedBytes(maxResultSize); + + if (isHbaseQuery) { + contentSize += hTableFilter.getPayloadSize(); + } else { + contentSize += HTABLE_DUMMY_BYTES.length; + } + if (obKVParams != null) { + contentSize += obKVParams.getPayloadSize(); + } else { + contentSize += HTABLE_DUMMY_BYTES.length; + } + contentSize += Serialization.getNeedBytes(scanRangeColumns.size()); + for (String scanRangeColumn : scanRangeColumns) { + contentSize += Serialization.getNeedBytes(scanRangeColumn); + } - contentSize += Serialization.getNeedBytes(aggregations.size()); - for (ObTableAggregationSingle obTableAggregationSingle : aggregations) { - contentSize += obTableAggregationSingle.getPayloadSize(); + contentSize += Serialization.getNeedBytes(aggregations.size()); + for (ObTableAggregationSingle obTableAggregationSingle : aggregations) { + contentSize += obTableAggregationSingle.getPayloadSize(); + } + this.payLoadContentSize = contentSize; } - return contentSize; + return this.payLoadContentSize; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObFTSParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObFTSParams.java index 854074e3..5f380f31 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObFTSParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObFTSParams.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.table; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -47,6 +48,11 @@ public byte[] encode() { return bytes; } + public void encode(ObByteBuf buf) { + buf.writeByte((byte)pType.ordinal()); + Serialization.encodeVString(buf, searchText); + } + public Object decode(ByteBuf buf) { // pType is read by ObKVParams this.searchText = Serialization.decodeVString(buf); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java index 22361eb7..e105aad4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java @@ -17,6 +17,7 @@ package com.alipay.oceanbase.rpc.table; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -111,21 +112,35 @@ public byte[] encode() { byte[] b = new byte[] { (byte) pType.ordinal() }; System.arraycopy(b, 0, bytes, idx, 1); idx += 1; - System.arraycopy(Serialization.encodeVi32(caching), 0, bytes, idx, - Serialization.getNeedBytes(caching)); - idx += Serialization.getNeedBytes(caching); - System.arraycopy(Serialization.encodeVi32(callTimeout), 0, bytes, idx, - Serialization.getNeedBytes(callTimeout)); - idx += Serialization.getNeedBytes(callTimeout); + byte[] tmpBytes = Serialization.encodeVi32(caching); + System.arraycopy(tmpBytes, 0, bytes, idx, + tmpBytes.length); + idx += tmpBytes.length; + tmpBytes = Serialization.encodeVi32(callTimeout); + System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length); + idx += tmpBytes.length; System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1); idx += 1; - System.arraycopy(Serialization.encodeVString(hbaseVersion), 0, bytes, idx, - Serialization.getNeedBytes(hbaseVersion)); - idx += Serialization.getNeedBytes(hbaseVersion); + tmpBytes = Serialization.encodeVString(hbaseVersion); + System.arraycopy(tmpBytes, 0, bytes, idx, + tmpBytes.length); + idx += tmpBytes.length; return bytes; } + public void encode(ObByteBuf buf) { + buf.writeByte((byte) pType.ordinal()); + + Serialization.encodeVi32(buf, caching); + + Serialization.encodeVi32(buf, callTimeout); + + buf.writeBytes(booleansToByteArray()); + + Serialization.encodeVString(buf, hbaseVersion); + } + public void byteArrayToBooleans(ByteBuf bytes) { byte b = bytes.readByte(); allowPartialResults = (b & FLAG_ALLOW_PARTIAL_RESULTS) != 0; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java index f5964a41..2ece8868 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.rpc.table; import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import io.netty.buffer.ByteBuf; import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; @@ -54,17 +55,24 @@ public byte[] encode() { int idx = 0; // 0. encode header - int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); - System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, - idx, headerLen); - idx += headerLen; + byte[] headerBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize()); + System.arraycopy(headerBytes, 0, bytes, + idx, headerBytes.length); + idx += headerBytes.length; - int len = (int) obKVParamsBase.getPayloadContentSize(); - System.arraycopy(obKVParamsBase.encode(), 0, bytes, idx, len); + byte[] obKVParamsBaseBytes = obKVParamsBase.encode(); + System.arraycopy(obKVParamsBaseBytes, 0, bytes, idx, obKVParamsBaseBytes.length); return bytes; } + public void encode(ObByteBuf buf) { + // 0. encode header + encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize()); + + obKVParamsBase.encode(buf); + } + public Object decode(ByteBuf buf) { super.decode(buf); byte b = buf.readByte(); @@ -76,6 +84,9 @@ public Object decode(ByteBuf buf) { @Override public long getPayloadContentSize() { + if (this.payLoadContentSize == -1) { + this.payLoadContentSize = obKVParamsBase.getPayloadContentSize(); + } return obKVParamsBase.getPayloadContentSize(); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java index 453a2939..cbf856e4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.rpc.table; import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus; +import com.alipay.oceanbase.rpc.util.ObByteBuf; import io.netty.buffer.ByteBuf; import java.util.HashMap; @@ -55,6 +56,8 @@ public byte[] encode() { return null; } + public void encode(ObByteBuf buf) { } + public Object decode(ByteBuf buf) { return null; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index e7a588e4..cd275e0a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -194,8 +194,7 @@ public void addOperation(TableQuery query) throws Exception { } Object[] rowKey = query.getRowKey().getValues(); String[] propertiesNames = query.getSelectColumns().toArray(new String[0]); - ObTableSingleOpEntity entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKey, - propertiesNames, null); + ObTableSingleOpEntity entity = null; if (propertiesNames.length == 0) { needAllProp = true; } @@ -209,7 +208,14 @@ public void addOperation(TableQuery query) throws Exception { obTableQuery.getObKVParams(), obTableQuery.getFilterString()); singleOp.setQuery(singleOpQuery); singleOp.setSingleOpType(ObTableOperationType.SCAN); + entity = new ObTableSingleOpEntity(); + entity.setHbase(true); + entity.addRowKeyValue("K", obTableQuery.getKeyRanges().get(0).getStartKey().getObj(0)); + entity.addRowKeyValue("Q", ObObj.getNullObject()); + entity.addRowKeyValue("T", ObObj.getNullObject()); } else { + entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKey, + propertiesNames, null); singleOp.setSingleOpType(ObTableOperationType.GET); } singleOp.addEntity(entity); @@ -286,6 +292,7 @@ public void addOperation(Mutation mutation) throws Exception { ObTableSingleOpEntity entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKeyValues, propertiesNames, propertiesValues); + entity.setHbase(entityType == ObTableEntityType.HKV); ObTableSingleOp singleOp = new ObTableSingleOp(); singleOp.setSingleOpType(type); singleOp.addEntity(entity); @@ -435,6 +442,7 @@ public void partitionExecute(ObTableSingleOpResult[] results, ObTableLSOpRequest tableLsOpRequest = new ObTableLSOpRequest(); ObTableLSOperation tableLsOp = new ObTableLSOperation(); + tableLsOp.setHbase(entityType == ObTableEntityType.HKV); tableLsOp.setLsId(lsId); tableLsOp.setReturnOneResult(returnOneResult); tableLsOp.setNeedAllProp(needAllProp); diff --git a/src/main/java/com/alipay/oceanbase/rpc/util/Serialization.java b/src/main/java/com/alipay/oceanbase/rpc/util/Serialization.java index df9ff66e..5e9ef5d4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/util/Serialization.java +++ b/src/main/java/com/alipay/oceanbase/rpc/util/Serialization.java @@ -441,15 +441,14 @@ public static void encodeFloat(ObByteBuf buf, float f) { * @return bytes need for serialize long data */ public static int getNeedBytes(long l) { - if (l < 0) + if (l < 0) { return 10; - int needBytes = 0; - for (long max : OB_MAX) { - needBytes++; - if (l <= max) - break; } - return needBytes; + if (l == 0 || l == 1 /* opt for version header */) { + return 1; + } + // 计算有效位数,然后除以7向上取整 + return (Long.SIZE - Long.numberOfLeadingZeros(l) + 6) / 7; } /** @@ -458,15 +457,14 @@ public static int getNeedBytes(long l) { * @return bytes need for serialize int data */ public static int getNeedBytes(int l) { - if (l < 0) + if (l < 0) { return 5; - int needBytes = 0; - for (long max : OB_MAX) { - needBytes++; - if (l <= max) - break; } - return needBytes; + if (l == 0 || l == 1 /* opt for version header */) { + return 1; + } + // 计算有效位数,然后除以7向上取整 + return (Integer.SIZE - Integer.numberOfLeadingZeros(l) + 6) / 7; } /** @@ -571,6 +569,14 @@ public static byte[] encodeVi32(int i) { return ret; } + public static void encodeVi32(int i, ByteBuf buf) { + while (i < 0 || i > OB_MAX_V1B) { + buf.writeByte((byte) (i | 0x80)); + i >>>= 7; + } + buf.writeByte((byte) (i & 0x7f)); + } + /** * Encode vi32. * @param buf encoded buf @@ -706,6 +712,9 @@ public static byte[] encodeVString(String str) { * @param str input data */ public static void encodeVString(ObByteBuf buf, String str) { + if (str == null) { + str = ""; + } encodeVString(buf, str, StandardCharsets.UTF_8); } @@ -850,15 +859,13 @@ public static byte[] encodeVString(String str, Charset charset) { str = ""; byte[] data = str.getBytes(charset); int dataLen = data.length; - int strLen = getNeedBytes(dataLen); - byte[] ret = new byte[strLen + dataLen + 1]; + byte[] dataLenBytes = encodeVi32(dataLen); + byte[] ret = new byte[dataLenBytes.length + dataLen + 1]; int index = 0; - for (byte b : encodeVi32(dataLen)) { - ret[index++] = b; - } - for (byte b : data) { - ret[index++] = b; - } + System.arraycopy(dataLenBytes, 0, ret, index, dataLenBytes.length); + index += dataLenBytes.length; + System.arraycopy(data, 0, ret, index, dataLen); + index += dataLen; ret[index] = 0; return ret; } @@ -1010,14 +1017,15 @@ public static long getObUniVersionHeaderLength(long version, long payloadLen) { * @return output data buffer */ public static byte[] encodeObUniVersionHeader(long version, long payloadLen) { - byte[] bytes = new byte[(int) getObUniVersionHeaderLength(version, payloadLen)]; + byte[] versionBytes = Serialization.encodeVi64(version); + byte[] payloadBytes = Serialization.encodeVi64(payloadLen); + byte[] bytes = new byte[versionBytes.length + payloadBytes.length]; int idx = 0; - - int len = Serialization.getNeedBytes(version); - System.arraycopy(Serialization.encodeVi64(version), 0, bytes, idx, len); + int len = versionBytes.length; + System.arraycopy(versionBytes, 0, bytes, idx, len); idx += len; - len = Serialization.getNeedBytes(payloadLen); - System.arraycopy(Serialization.encodeVi64(payloadLen), 0, bytes, idx, len); + len = payloadBytes.length; + System.arraycopy(payloadBytes, 0, bytes, idx, len); return bytes; } diff --git a/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLsOperationRequestTest.java b/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLsOperationRequestTest.java new file mode 100644 index 00000000..87b0af6d --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLsOperationRequestTest.java @@ -0,0 +1,201 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder; +import com.alipay.oceanbase.rpc.table.ObKVParams; +import com.alipay.oceanbase.rpc.util.ObBytesString; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.SCAN; +import static com.alipay.oceanbase.rpc.table.ObKVParamsBase.paramType.HBase; + +public class ObTableLsOperationRequestTest { + private int lsOpReqSize = 200; + private int tabletOpSize = 10; + private int singleOpSize = 100; + private static final Random random = new Random(); + + @Test + public void testEncodePerformance() { + ObTableLSOpRequest[] lsReq = new ObTableLSOpRequest[lsOpReqSize]; + for (int i = 0; i < lsOpReqSize; i++) { + lsReq[i] = buildLsOp(); + } + long startTime = System.nanoTime(); + for (int i = 0; i < lsOpReqSize; i++) { + lsReq[i].encode(); + } + long endTime = System.nanoTime(); + long duration = endTime - startTime; + System.out.println("each request encode took: " + (duration / lsOpReqSize) +" nanoseconds in average"); + } + private ObTableLSOpRequest buildLsOp() { + ObTableLSOpRequest lsOpReq = new ObTableLSOpRequest(); + lsOpReq.setCredential(new ObBytesString(generateRandomString(100).getBytes())); + lsOpReq.setConsistencyLevel(ObTableConsistencyLevel.EVENTUAL); + ObTableLSOperation lsOp = new ObTableLSOperation(); + lsOp.setLsId(1001); + lsOp.setTableName(generateRandomString(10)); + lsOpReq.setLsOperation(lsOp); + lsOpReq.setTableId(50001); + lsOpReq.setEntityType(ObTableEntityType.HKV); + lsOpReq.setTimeout(10000); + lsOp.setNeedAllProp(true); + lsOp.setReturnOneResult(false); + for (int i = 0; i < tabletOpSize; i++) { + lsOp.addTabletOperation(buildTabletOp()); + } + lsOp.prepare(); + return lsOpReq; + } + + private ObTableTabletOp buildTabletOp() { + ObTableTabletOp tabletOp = new ObTableTabletOp(); + tabletOp.setTabletId(random.nextLong()); + List singleOperations = new ArrayList<>(); + for (int i = 0; i < singleOpSize; i++) { + singleOperations.add(budilSingleOp()); + } + tabletOp.setSingleOperations(singleOperations); + tabletOp.setIsSameType(random.nextBoolean()); + tabletOp.setIsSamePropertiesNames(random.nextBoolean()); + tabletOp.setIsReadOnly(random.nextBoolean()); + return tabletOp; + } + + private ObTableSingleOp budilSingleOp() { + ObTableSingleOp singleOp = new ObTableSingleOp(); + singleOp.setSingleOpType(SCAN); + ObTableSingleOpQuery query = buildSingleOpQuery(); + singleOp.setQuery(query); + List entities = new ArrayList<>(); + entities.add(buildSingleOpEntity()); + singleOp.setEntities(entities); + singleOp.setIsCheckNoExists(random.nextBoolean()); + singleOp.setIsRollbackWhenCheckFailed(random.nextBoolean()); + return singleOp; + } + + private ObTableSingleOpQuery buildSingleOpQuery() { + String indexName = generateRandomString(10); + + List keyRanges = new ArrayList<>(); + keyRanges.add(buildRandomRange()); + + List selectColumns = new ArrayList<>(); + selectColumns.add("K"); + selectColumns.add("Q"); + selectColumns.add("T"); + selectColumns.add("V"); + + ObScanOrder scanOrder = ObScanOrder.Forward; + + boolean isHbaseQuery = random.nextBoolean(); + + ObHTableFilter obHTableFilter = new ObHTableFilter(); + ObKVParams obKVParams = new ObKVParams(); + obKVParams.setObParamsBase(obKVParams.getObParams(HBase)); + + String filterString = generateRandomString(20); + + return ObTableSingleOpQuery.getInstance( + indexName, + keyRanges, + selectColumns, + scanOrder, + isHbaseQuery, + obHTableFilter, + obKVParams, + filterString + ); + } + + private ObTableSingleOpEntity buildSingleOpEntity() { + String[] rowKeyNames = {"K", "Q", "T"}; + Object[] rowKey = { + generateRandomString(10), + generateRandomString(10), + generateRandomString(10) + }; + + String[] propertiesNames = {"V"}; + Object[] propertiesValues = { generateRandomString(20) }; + + return ObTableSingleOpEntity.getInstance( + rowKeyNames, + rowKey, + propertiesNames, + propertiesValues + ); + } + + public static String generateRandomString(int length) { + Random random = new Random(); + StringBuilder sb = new StringBuilder(length); + + for (int i = 0; i < length; i++) { + // 生成一个随机字符(ASCII 码范围:32-126,包含字母、数字和符号) + char randomChar = (char) (random.nextInt(95) + 32); + sb.append(randomChar); + } + + return sb.toString(); + } + + private static ObNewRange buildRandomRange() { + ObNewRange range = new ObNewRange(); + + List startKey = new ArrayList<>(); + startKey.add(ObObj.getInstance(generateRandomString(10))); + startKey.add(ObObj.getInstance(generateRandomString(10))); + startKey.add(ObObj.getInstance(generateRandomString(10))); + + List endKey = new ArrayList<>(); + endKey.add(ObObj.getInstance(generateRandomString(10))); + endKey.add(ObObj.getInstance(generateRandomString(10))); + endKey.add(ObObj.getInstance(generateRandomString(10))); + + ObRowKey startRk = new ObRowKey(); + startRk.setObjs(startKey); + range.setStartKey(startRk); + ObRowKey endRk = new ObRowKey(); + endRk.setObjs(endKey); + range.setEndKey(endRk); + + return range; + } + +// @Test +// public void testOHTableFilter() { +// ObHTableFilter hTableFilter = new ObHTableFilter(); +// hTableFilter.setMaxVersions(1); +// byte[] htableBytes = hTableFilter.encode(); +// for (int i = 0; i < htableBytes.length; i++) { +// System.out.println(htableBytes[i]); +// } +// System.out.println(htableBytes.length); +// } +}