Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.protocol.payload;

import com.alipay.oceanbase.rpc.util.ObByteBuf;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -44,6 +45,7 @@ public abstract class AbstractPayload implements ObPayload {
private long version = 1;
protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong();
protected int groupId = 0;
protected long payLoadContentSize = -1;

/*
* Get pcode.
Expand Down Expand Up @@ -176,11 +178,15 @@ public void setSequence(long sequence) {
* encode unis header
*/
protected int encodeHeader(byte[] bytes, int idx) {
int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize());
System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes,
idx, headerLen);
idx += headerLen;
byte[] versionHeaderBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize());
System.arraycopy(versionHeaderBytes, 0, bytes,
idx, versionHeaderBytes.length);
idx += versionHeaderBytes.length;
return idx;
}

protected void encodeHeader(ObByteBuf buf) {
encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,30 @@
import com.alipay.oceanbase.rpc.util.ObVString;
import io.netty.buffer.ByteBuf;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ObObj implements ObSimplePayload {

private static ObObj MAX_OBJECT;
private static ObObj MIN_OBJECT;
private static ObObj NULL_OBJECT;
private static long MAX_OBJECT_VALUE = -2L;
private static long MIN_OBJECT_VALUE = -3L;

static {
MAX_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MAX_OBJECT_VALUE);
MIN_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MIN_OBJECT_VALUE);
NULL_OBJECT = new ObObj(ObObjType.ObNullType.getDefaultObjMeta(), null);
}
private static final int OBJ_POOL_SIZE = 100000;
private static final AtomicInteger CURRENT_INDEX = new AtomicInteger(0);
private static final ConcurrentLinkedQueue<ObObj> OBJ_POOL = new ConcurrentLinkedQueue<>();
static {
// 初始化对象池
for (int i = 0; i < OBJ_POOL_SIZE; i++) {
OBJ_POOL.offer(new ObObj());
}
}

/*
Expand Down Expand Up @@ -158,14 +172,33 @@ public void setValue(Object value) {
* Get instance.
*/
public static ObObj getInstance(Object value) {
ObObjMeta meta = ObObjType.defaultObjMeta(value);
ObObjType type = ObObjType.valueOfType(value);
ObObjMeta meta = null;
if (type == ObObjType.ObVarcharType) {
meta = ObObjMetaPool.varchrObjMeta;
} else {
meta = ObObjType.defaultObjMeta(value);
}
ObObj obj = OBJ_POOL.poll();
if (obj == null) {
// 如果池为空,创建新对象
obj = new ObObj();
}
// 初始化对象
obj.setMeta(meta);
if (value instanceof ObObj) {
return new ObObj(meta, ((ObObj) value).getValue());
obj.setValue(((ObObj) value).getValue());
} else {
return new ObObj(meta, value);
obj.setValue(value);
}
return obj;
}

public static ObObj getNullObject() {
return NULL_OBJECT;
}


/*
* Get max.
*/
Expand Down
Loading