Skip to content

Commit 1cb9927

Browse files
maochongxinGroundWushenyunlongWeiXinChanJackShi148
authored
feat: HBase Compatibility Phase 3 - Add Meta Executor Support for HBase Table Operations (#353)
* MetaExecutor interface definition and demo (#337) * lsop encode opt (#339) * opt for lsop getPayLoadContentSize * add performa test for ObTableLsOperationRequest encode * use ObByteBuf to encode lsop * lsop request encode perf opt * add default bytes cache for obkvparam and ohtablefilter * Revert "add default bytes cache for obkvparam and ohtablefilter" This reverts commit 8a555ef. * add default bytes cache for obkvparam and ohtablefilter * fix bug for indexName encode * adapt for disable/enable table (#340) * support hot only (#341) Co-authored-by: WeiXinChan <chenwx6728@163.com> * Fix TableQuery cannot refresh tablet location (#345) * add debug log * refresh tablet location for atomic query * throw the same type original Exception when login (#346) * update netty to 4.1.118.Final to avoid security problem (#349) * Replace Fastjson with Jackson (#347) * change fastjson to jackson * use jackson 2.14.2, compile pass * succeed to print log * update jackson and maven-shade-plugin to the lateset version * revert OcpResponseData constructors * diable reduce pom dependencies * [OBKV] add hbase admin ddl errcode --------- Co-authored-by: GroundWu <1175416256@qq.com> Co-authored-by: Shen Yunlong <44497386+shenyunlong@users.noreply.github.com> Co-authored-by: WeiXinChan <chenwx6728@163.com> Co-authored-by: Ziyu Shi <57038180+JackShi148@users.noreply.github.com> Co-authored-by: shenyunlong.syl <shenyunlong.syl@antgroup.com>
1 parent 4697300 commit 1cb9927

37 files changed

+1788
-343
lines changed

pom.xml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
<dependency>
9898
<groupId>io.netty</groupId>
9999
<artifactId>netty-all</artifactId>
100-
<version>4.1.100.Final</version>
100+
<version>4.1.118.Final</version>
101101
</dependency>
102102

103103
<dependency>
@@ -112,9 +112,9 @@
112112
</dependency>
113113

114114
<dependency>
115-
<groupId>com.alibaba</groupId>
116-
<artifactId>fastjson</artifactId>
117-
<version>1.2.83</version>
115+
<groupId>com.fasterxml.jackson.core</groupId>
116+
<artifactId>jackson-databind</artifactId>
117+
<version>2.19.0</version>
118118
</dependency>
119119

120120
<dependency>
@@ -164,6 +164,10 @@
164164
<artifactId>visible-assertions</artifactId>
165165
<groupId>org.rnorth.visible-assertions</groupId>
166166
</exclusion>
167+
<exclusion>
168+
<artifactId>jackson-annotations</artifactId>
169+
<groupId>com.fasterxml.jackson.core</groupId>
170+
</exclusion>
167171
</exclusions>
168172
</dependency>
169173
<dependency>
@@ -388,14 +392,15 @@
388392
</plugin>
389393
<plugin>
390394
<artifactId>maven-shade-plugin</artifactId>
391-
<version>3.2.4</version>
395+
<version>3.6.0</version>
392396
<executions>
393397
<execution>
394398
<phase>package</phase>
395399
<goals>
396400
<goal>shade</goal>
397401
</goals>
398402
<configuration>
403+
<createDependencyReducedPom>false</createDependencyReducedPom>
399404
<shadedArtifactAttached>true</shadedArtifactAttached>
400405
<shadedClassifierName>shade</shadedClassifierName>
401406
<artifactSet>

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.alipay.remoting.util.StringUtils;
4949
import org.slf4j.Logger;
5050

51+
import java.lang.reflect.Array;
5152
import java.util.*;
5253
import java.util.concurrent.ConcurrentHashMap;
5354
import java.util.concurrent.TimeUnit;
@@ -143,7 +144,11 @@ public void init() throws Exception {
143144
} catch (Throwable t) {
144145
BOOT.warn("failed to init ObTableClient", t);
145146
RUNTIME.warn("failed to init ObTableClient", t);
146-
throw new RuntimeException(t);
147+
if (t instanceof ObTableException) {
148+
throw t;
149+
} else {
150+
throw new RuntimeException(t);
151+
}
147152
} finally {
148153
BOOT.info("init ObTableClient successfully");
149154
statusLock.unlock();
@@ -476,6 +481,7 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
476481
tableParam = new ObTableParam(odpTable);
477482
} else {
478483
if (tryTimes > 1 && needRefreshPartitionLocation) {
484+
needRefreshPartitionLocation = false;
479485
// refresh partition location
480486
TableEntry entry = tableRoute.getTableEntry(tableName);
481487
long partId = tableRoute.getPartId(entry, rowKey);
@@ -484,8 +490,8 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
484490
}
485491
tableParam = getTableParamWithRoute(tableName, rowKey, route);
486492
}
487-
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}",
488-
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
493+
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}",
494+
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId());
489495
T t = callback.execute(tableParam);
490496
resetExecuteContinuousFailureCount(tableName);
491497
return t;
@@ -686,6 +692,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
686692
} else {
687693
if (null != callback.getRowKey()) {
688694
if (tryTimes > 1 && needRefreshPartitionLocation) {
695+
needRefreshPartitionLocation = false;
689696
// refresh partition location
690697
TableEntry entry = tableRoute.getTableEntry(tableName);
691698
long partId = tableRoute.getPartId(entry, callback.getRowKey());
@@ -695,6 +702,11 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
695702
// using row key
696703
tableParam = tableRoute.getTableParamWithRoute(tableName, callback.getRowKey(), route);
697704
} else if (null != callback.getQuery()) {
705+
if (tryTimes > 1 && needRefreshPartitionLocation) {
706+
needRefreshPartitionLocation = false;
707+
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
708+
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
709+
}
698710
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
699711
// using scan range
700712
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
@@ -703,8 +715,8 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
703715
throw new ObTableException("RowKey or scan range is null");
704716
}
705717
}
706-
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}",
707-
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort());
718+
logger.debug("tableName: {}, tableParam obTable ip:port is {}:{}, ls_id: {}, tablet_id: {}",
719+
tableName, tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId(), tableParam.getTabletId());
708720
T t = callback.execute(tableParam);
709721
resetExecuteContinuousFailureCount(tableName);
710722
return t;
@@ -1112,6 +1124,17 @@ public ObTable getTable(ObTableApiMove moveResponse) throws Exception {
11121124
// If the node address does not exist, a new table is created
11131125
return addTable(addr);
11141126
}
1127+
1128+
public ObTable getRandomTable() {
1129+
ObTable anyTable;
1130+
if (odpMode) {
1131+
anyTable = tableRoute.getOdpTable();
1132+
} else {
1133+
ConcurrentHashMap<ObServerAddr, ObTable> tableRoster = tableRoute.getTableRoster().getTables();
1134+
anyTable = tableRoster.values().stream().findAny().orElse(null);
1135+
}
1136+
return anyTable;
1137+
}
11151138

11161139
public ObTable addTable(ObServerAddr addr){
11171140

@@ -2248,8 +2271,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22482271
return getOdpTable().execute(request);
22492272
} else {
22502273
int tryTimes = 0;
2274+
boolean needRefreshTabletLocation = false;
22512275
long startExecute = System.currentTimeMillis();
2252-
Map<Long, ObTableParam> partIdMapObTable = new HashMap<Long, ObTableParam>();
22532276
while (true) {
22542277
long currentExecute = System.currentTimeMillis();
22552278
long costMillis = currentExecute - startExecute;
@@ -2266,40 +2289,14 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
22662289
}
22672290
try {
22682291
// Recalculate partIdMapObTable
2269-
// Clear the map before recalculating
2270-
partIdMapObTable.clear();
2271-
for (ObNewRange rang : tableQuery.getKeyRanges()) {
2272-
ObRowKey startKey = rang.getStartKey();
2273-
int startKeySize = startKey.getObjs().size();
2274-
ObRowKey endKey = rang.getEndKey();
2275-
int endKeySize = endKey.getObjs().size();
2276-
Object[] start = new Object[startKeySize];
2277-
Object[] end = new Object[endKeySize];
2278-
for (int i = 0; i < startKeySize; i++) {
2279-
ObObj curStart = startKey.getObj(i);
2280-
if (curStart.isMinObj()) {
2281-
start[i] = curStart;
2282-
} else {
2283-
start[i] = curStart.getValue();
2284-
}
2285-
}
2286-
2287-
for (int i = 0; i < endKeySize; i++) {
2288-
ObObj curEnd = endKey.getObj(i);
2289-
if (curEnd.isMaxObj()) {
2290-
end[i] = curEnd;
2291-
} else {
2292-
end[i] = curEnd.getValue();
2293-
}
2294-
}
2295-
ObBorderFlag borderFlag = rang.getBorderFlag();
2296-
List<ObTableParam> params = getTableParams(request.getTableName(),
2297-
tableQuery, start, borderFlag.isInclusiveStart(), end,
2298-
borderFlag.isInclusiveEnd());
2299-
for (ObTableParam param : params) {
2300-
partIdMapObTable.put(param.getPartId(), param);
2301-
}
2292+
if (needRefreshTabletLocation) {
2293+
needRefreshTabletLocation = false;
2294+
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
2295+
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
23022296
}
2297+
Map<Long, ObTableParam> partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
2298+
request.getTableName(), tableQuery.getScanRangeColumns(),
2299+
tableQuery.getKeyRanges());
23032300

23042301
// Check if partIdMapObTable size is greater than 1
23052302
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
@@ -2332,9 +2329,12 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23322329
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
23332330
tryTimes, ex);
23342331

2335-
if (ex instanceof ObTableNeedFetchMetaException) {
2336-
// Refresh table info
2337-
refreshMeta(request.getTableName());
2332+
if (((ObTableException) ex).isNeedRefreshTableEntry()) {
2333+
needRefreshTabletLocation = true;
2334+
if (ex instanceof ObTableNeedFetchMetaException) {
2335+
// Refresh table info
2336+
refreshMeta(request.getTableName());
2337+
}
23382338
}
23392339
} else {
23402340
calculateContinuousFailure(request.getTableName(), ex.getMessage());

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.alipay.oceanbase.rpc.bolt.protocol;
1919

2020
import com.alipay.oceanbase.rpc.exception.ObTableRoutingWrongException;
21+
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
22+
import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
2123
import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader;
2224
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2325
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
@@ -33,6 +35,8 @@
3335
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
3436
import com.alipay.remoting.CommandCode;
3537

38+
import static com.alipay.oceanbase.rpc.protocol.payload.Pcodes.OB_TABLE_API_META_INFO_EXECUTE;
39+
3640
public enum ObTablePacketCode implements CommandCode {
3741

3842
OB_TABLE_API_LOGIN(Pcodes.OB_TABLE_API_LOGIN) {
@@ -133,6 +137,12 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
133137
public ObPayload newPayload(ObRpcPacketHeader header) {
134138
throw new IllegalArgumentException("OB_ERROR_PACKET has no payload implementation");
135139
}
140+
},
141+
OB_TABLE_META_INFO_EXECUTE(Pcodes.OB_TABLE_API_META_INFO_EXECUTE) {
142+
@Override
143+
public ObPayload newPayload(ObRpcPacketHeader header) {
144+
return new ObTableMetaResponse();
145+
}
136146
};
137147

138148
private short value;
@@ -175,6 +185,8 @@ public static ObTablePacketCode valueOf(short value) {
175185
return OB_TABLE_API_MOVE;
176186
case Pcodes.OB_ERROR_PACKET:
177187
return OB_ERROR_PACKET;
188+
case OB_TABLE_API_META_INFO_EXECUTE:
189+
return OB_TABLE_META_INFO_EXECUTE;
178190
}
179191
throw new IllegalArgumentException("Unknown Rpc command code value ," + value);
180192
}

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.transport;
1919

20-
import com.alibaba.fastjson.JSONObject;
2120
import com.alipay.oceanbase.rpc.ObGlobal;
2221
import com.alipay.oceanbase.rpc.exception.*;
2322
import com.alipay.oceanbase.rpc.location.LocationUtil;
@@ -26,6 +25,7 @@
2625
import com.alipay.oceanbase.rpc.table.ObTable;
2726
import com.alipay.oceanbase.rpc.util.*;
2827
import com.alipay.remoting.Connection;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
2929
import org.slf4j.Logger;
3030

3131
import java.net.ConnectException;
@@ -40,6 +40,7 @@ public class ObTableConnection {
4040

4141
private static final Logger LOGGER = TableClientLoggerFactory
4242
.getLogger(ObTableConnection.class);
43+
private static ObjectMapper objectMapper = new ObjectMapper();
4344
private ObBytesString credential;
4445
private long tenantId = 1; //默认值切勿不要随意改动
4546
private Connection connection;
@@ -154,8 +155,8 @@ private void login() throws Exception {
154155
// When the caller doesn't provide any parameters, configsMap is empty.
155156
// In this case, we don't generate any JSON to avoid creating an empty object.
156157
if (loginWithConfigs && !obTable.getConfigs().isEmpty()) {
157-
JSONObject json = new JSONObject(obTable.getConfigs());
158-
request.setConfigsStr(json.toJSONString());
158+
String configStr = objectMapper.writeValueAsString(obTable.getConfigs());
159+
request.setConfigsStr(configStr);
159160
loginWithConfigs = false;
160161
}
161162
generatePassSecret(request);

0 commit comments

Comments
 (0)