From 3d13dfc8dbc5214bcec93058a89623e952870e7f Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 11:36:34 +0800 Subject: [PATCH 1/8] add pool parameter to set in batch operation --- .../impl/execute/ObTableBatchOperationRequest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java index 25eeca53..1d12f0b4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java @@ -22,6 +22,8 @@ import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; +import java.util.concurrent.ExecutorService; + /* * OB_SERIALIZE_MEMBER(ObTableBatchOperationRequest, @@ -43,6 +45,7 @@ public class ObTableBatchOperationRequest extends ObTableAbstractOperationReques private ObTableBatchOperation batchOperation; private boolean batchOperationAsAtomic = false; + private ExecutorService pool = null; /* * Get pcode. @@ -169,4 +172,12 @@ public void setBatchOpReturnOneResult(boolean returnOneResult) { this.option_flag.setReturnOneResult(false); } } + + public void setPool(ExecutorService pool) { + this.pool = pool; + } + + public ExecutorService getPool() { + return this.pool; + } } From 89b8e848396c1fc16c87436ac7d12882011ecfe6 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 13 Sep 2024 11:53:06 +0800 Subject: [PATCH 2/8] ObTableClient getPool when execute Hbase batch request --- src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 50137e6f..a365716f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -2995,7 +2995,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.getTableName(), ((ObTableBatchOperationRequest) request).getBatchOperation(), this); batchOps.setEntityType(request.getEntityType()); - return new ObClusterTableBatchOps(batchOps).executeInternal(); + return new ObClusterTableBatchOps(((ObTableBatchOperationRequest) request).getPool(), batchOps).executeInternal(); } else if (request instanceof ObTableQueryAndMutateRequest) { ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request) .getTableQueryAndMutate(); From c39acf9e0c06557482c5ea97dd6f2ad2fc1af2c8 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 18 Sep 2024 11:41:57 +0800 Subject: [PATCH 3/8] rebase from obkv_params2 --- .../oceanbase/rpc/ObClusterTableBatchOps.java | 9 +- .../com/alipay/oceanbase/rpc/ObGlobal.java | 3 +- .../alipay/oceanbase/rpc/ObTableClient.java | 2 +- .../rpc/bolt/transport/ObTableConnection.java | 17 +-- .../rpc/bolt/transport/ObTableRemoting.java | 45 +++--- .../rpc/mutation/BatchOperation.java | 9 +- .../rpc/protocol/payload/ResultCodes.java | 3 +- .../execute/ObTableBatchOperationRequest.java | 9 -- .../query/AbstractQueryStreamResult.java | 4 +- .../impl/login/ObTableLoginRequest.java | 2 +- .../oceanbase/rpc/table/AbstractObTable.java | 6 +- .../rpc/table/AbstractObTableClient.java | 3 +- .../rpc/table/ObTableClientBatchOpsImpl.java | 3 +- .../rpc/ObAtomicBatchOperationTest.java | 10 +- .../oceanbase/rpc/ObTableAuditTest.java | 78 ++++------ .../rpc/ObTableClientAutoIncTest.java | 10 +- .../oceanbase/rpc/ObTableClientInfoTest.java | 44 +++--- .../rpc/ObTableClientPartitionHashTest.java | 16 +-- .../rpc/ObTableClientPartitionKeyTest.java | 7 +- .../oceanbase/rpc/ObTableClientTest.java | 46 +++--- .../oceanbase/rpc/ObTableGroupCommitTest.java | 28 ++-- .../oceanbase/rpc/ObTableLsBatchTest.java | 12 +- .../alipay/oceanbase/rpc/ObTableModeTest.java | 75 +++++----- .../alipay/oceanbase/rpc/ObTableP99Test.java | 136 ++++++++---------- .../alipay/oceanbase/rpc/ObTableTTLTest.java | 8 +- 25 files changed, 283 insertions(+), 302 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java index 2a258b9c..ab1c048d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java @@ -193,11 +193,12 @@ void preCheck() { ObTableOperationType lastType = operations.get(0).getOperationType(); if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) { throw new FeatureNotSupportedException( - "returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]"); + "returnOneResult is not supported in this Observer version [" + + ObGlobal.obVsnString() + "]"); } else if (returnOneResult - && !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT - || lastType == ObTableOperationType.PUT - || lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) { + && !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT + || lastType == ObTableOperationType.PUT + || lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) { throw new IllegalArgumentException( "returnOneResult only support multi-insert/put/replace/del"); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java b/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java index d0877821..59b46e4b 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java @@ -87,7 +87,8 @@ public static boolean isLsOpSupport() { } public static boolean isReturnOneResultSupport() { - return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0 || OB_VERSION >= OB_VERSION_4_3_4_0; + return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0 + || OB_VERSION >= OB_VERSION_4_3_4_0; } public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0); diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index a365716f..e83f919c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -2995,7 +2995,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.getTableName(), ((ObTableBatchOperationRequest) request).getBatchOperation(), this); batchOps.setEntityType(request.getEntityType()); - return new ObClusterTableBatchOps(((ObTableBatchOperationRequest) request).getPool(), batchOps).executeInternal(); + return new ObClusterTableBatchOps(runtimeBatchExecutor, batchOps).executeInternal(); } else if (request instanceof ObTableQueryAndMutateRequest) { ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request) .getTableQueryAndMutate(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java index ded4ba90..22630c51 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java @@ -38,18 +38,19 @@ public class ObTableConnection { - private static final Logger LOGGER = TableClientLoggerFactory - .getLogger(ObTableConnection.class); + private static final Logger LOGGER = TableClientLoggerFactory + .getLogger(ObTableConnection.class); private ObBytesString credential; - private long tenantId = 1; //默认值切勿不要随意改动 + private long tenantId = 1; //默认值切勿不要随意改动 private Connection connection; private final ObTable obTable; - private long uniqueId; // as trace0 in rpc header - private AtomicLong sequence; // as trace1 in rpc header - private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not - private AtomicBoolean isExpired = new AtomicBoolean(false); + private long uniqueId; // as trace0 in rpc header + private AtomicLong sequence; // as trace1 in rpc header + private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not + private AtomicBoolean isExpired = new AtomicBoolean(false); private LocalDateTime lastConnectionTime; private boolean loginWithConfigs = false; + public static long ipToLong(String strIp) { String[] ip = strIp.split("\\."); return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16) @@ -69,10 +70,10 @@ public void setExpired(boolean expired) { isExpired.set(expired); } - public void enableLoginWithConfigs() { loginWithConfigs = true; } + /* * Ob table connection. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index 89ee4bcb..e3d26f0d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -124,7 +124,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques // If response indicates the request is routed to wrong server, we should refresh the routing meta. if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) { String errMessage = TraceUtil.formatTraceMessage(conn, request, - "routed to the wrong server: " + response.getMessage()); + "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) { throw new ObTableNeedFetchAllException(errMessage); @@ -141,16 +141,16 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques } if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) { String errMessage = TraceUtil.formatTraceMessage(conn, request, - "routed to the wrong server: " + response.getMessage()); + "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) { throw new ObTableNeedFetchAllException(errMessage); } else if (needFetchPartial(resultCode.getRcode())) { throw new ObTableRoutingWrongException(errMessage); } else { - ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn.getObTable() - .getPort(), response.getHeader().getTraceId1(), response.getHeader() - .getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg()); + ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn + .getObTable().getPort(), response.getHeader().getTraceId1(), response + .getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg()); } } @@ -190,25 +190,26 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque // schema changed private boolean needFetchAll(int errorCode, int pcode) { return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode - || errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode - || errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode - || errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode - || (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode); + || errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + || errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode + || errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode + || (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode); } + private boolean needFetchPartial(int errorCode) { return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode - || errorCode == ResultCodes.OB_NOT_MASTER.errorCode - || errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode - || errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode - || errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode - || errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode - || errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode - || errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode - || errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode - || errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode - || errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode - || errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode - || errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode - || errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode; + || errorCode == ResultCodes.OB_NOT_MASTER.errorCode + || errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode + || errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode + || errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode + || errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode + || errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode + || errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode + || errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode + || errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode + || errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode + || errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode + || errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode + || errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode; } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index eae7b60c..e519d7c9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -160,11 +160,12 @@ public BatchOperation setReturnOneResult(boolean returnOneResult) { public BatchOperationResult execute() throws Exception { if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) { throw new FeatureNotSupportedException( - "returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]"); + "returnOneResult is not supported in this Observer version [" + + ObGlobal.obVsnString() + "]"); } else if (returnOneResult - && !(isSameType && (lastType == ObTableOperationType.INSERT - || lastType == ObTableOperationType.PUT - || lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) { + && !(isSameType && (lastType == ObTableOperationType.INSERT + || lastType == ObTableOperationType.PUT + || lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) { throw new IllegalArgumentException( "returnOneResult only support multi-insert/put/replace/del"); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java index e7fa9555..fbb79d79 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java @@ -358,8 +358,7 @@ public enum ResultCodes { OB_CLUSTER_NO_MATCH(-4666), // OB_CHECK_ZONE_MERGE_ORDER(-4667), // OB_ERR_ZONE_NOT_EMPTY(-4668), // - OB_USE_DUP_FOLLOW_AFTER_DML(-4686), - OB_LS_NOT_EXIST(-4719), // + OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), // OB_TABLET_NOT_EXIST(-4725), // OB_ERR_PARSER_INIT(-5000), // OB_ERR_PARSE_SQL(-5001), // diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java index 1d12f0b4..1cfde2be 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java @@ -45,7 +45,6 @@ public class ObTableBatchOperationRequest extends ObTableAbstractOperationReques private ObTableBatchOperation batchOperation; private boolean batchOperationAsAtomic = false; - private ExecutorService pool = null; /* * Get pcode. @@ -172,12 +171,4 @@ public void setBatchOpReturnOneResult(boolean returnOneResult) { this.option_flag.setReturnOneResult(false); } } - - public void setPool(ExecutorService pool) { - this.pool = pool; - } - - public ExecutorService getPool() { - return this.pool; - } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 1a378051..7cd75788 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -139,8 +139,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, } subObTable = client .getTableWithPartId(indexTableName, partIdWithIndex.getLeft(), - needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), false, - route).getRight().getObTable(); + needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), + false, route).getRight().getObTable(); } } if (client.isOdpMode()) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java index e8c93c9f..0bdb4b85 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java @@ -373,7 +373,7 @@ public long getTtlUs() { public void setTtlUs(long ttlUs) { this.ttlUs = ttlUs; } - + public String getConfigsStr() { return configsStr; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java index d663f001..4b009c9e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java @@ -41,7 +41,7 @@ public abstract class AbstractObTable extends AbstractTable { protected int nettyBlockingWaitInterval = NETTY_BLOCKING_WAIT_INTERVAL.getDefaultInt(); - protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong(); + protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong(); /* * Get ob table connect try times. @@ -165,5 +165,7 @@ public int getNettyBlockingWaitInterval() { /* * Get connection max expired time */ - public long getConnMaxExpiredTime() { return maxConnExpiredTime; } + public long getConnMaxExpiredTime() { + return maxConnExpiredTime; + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java index 10878ddf..49168ebe 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java @@ -94,7 +94,8 @@ public abstract class AbstractObTableClient extends AbstractTable { .getDefaultInt(); protected long slowQueryMonitorThreshold = SLOW_QUERY_MONITOR_THRESHOLD .getDefaultLong(); - protected Long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong(); + protected Long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME + .getDefaultLong(); @Deprecated /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index e585797c..857c3d5e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -336,7 +336,8 @@ public void partitionExecute(ObTableOperationResult[] results, } ObTableParam newParam = obTableClient.getTableWithPartId(tableName, originPartId, needRefreshTableEntry, - obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo, route).getRight(); + obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo, + route).getRight(); subObTable = newParam.getObTable(); subRequest.setPartitionId(newParam.getPartitionId()); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObAtomicBatchOperationTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObAtomicBatchOperationTest.java index 8c16bcd9..91b49e38 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObAtomicBatchOperationTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObAtomicBatchOperationTest.java @@ -209,9 +209,9 @@ public void testBatchOperation() { } @Test - public void testReturnOneRes() { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); TableBatchOps batchOps = obTableClient.batch("test_varchar_table"); // no atomic ReturnOneRes batch operation try { @@ -297,7 +297,8 @@ public void testReturnOneRes() { @Test public void testReturnOneResPartition() throws Exception { - Assume.assumeTrue("Skiping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skiping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); BatchOperation batchOperation = obTableClient.batchOperation("test_mutation"); Object values[][] = { { 1L, "c2_val", "c3_val", 100L }, { 200L, "c2_val", "c3_val", 100L }, { 401L, "c2_val", "c3_val", 100L }, { 2000L, "c2_val", "c3_val", 100L }, @@ -328,7 +329,8 @@ public void testReturnOneResPartition() throws Exception { @Test public void testBatchGet() throws Exception { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); try { { // insert diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableAuditTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableAuditTest.java index 0f7c717f..2b95748e 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableAuditTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableAuditTest.java @@ -113,19 +113,15 @@ private List get_sql_id(String keyWord) throws SQLException { public void testSingleOperation() throws Exception { try { String prefix = generateRandomString(10); - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", prefix)).execute(); List sqlIds = get_sql_id(prefix); Assert.assertEquals(1, sqlIds.size()); // same operation and columns generate same sql_id sqlIds.clear(); - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", prefix)).execute(); sqlIds = get_sql_id(prefix); Assert.assertEquals(2, sqlIds.size()); for (String sqlId : sqlIds) { @@ -134,10 +130,8 @@ public void testSingleOperation() throws Exception { // different operation generate different sql_id sqlIds.clear(); - client.update(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)) - .execute(); + client.update(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", prefix)).execute(); sqlIds = get_sql_id(prefix); Assert.assertEquals(3, sqlIds.size()); Assert.assertEquals(sqlIds.get(0), sqlIds.get(1)); @@ -146,10 +140,8 @@ public void testSingleOperation() throws Exception { // different columns generate different sql_id // write c3 sqlIds.clear(); - client.update(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c3", prefix)) - .execute(); + client.update(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c3", prefix)).execute(); sqlIds = get_sql_id(prefix); Assert.assertEquals(4, sqlIds.size()); Assert.assertEquals(sqlIds.get(0), sqlIds.get(1)); @@ -175,9 +167,9 @@ public void testMultiOperation() throws Exception { String prefix = generateRandomString(10); BatchOperation batchOps = client.batchOperation(tableName); Insert ins1 = client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); Insert ins2 = client.insert(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); batchOps.addOperation(ins1, ins2).execute(); List sqlIds = get_sql_id(prefix); Assert.assertEquals(1, sqlIds.size()); @@ -186,9 +178,9 @@ public void testMultiOperation() throws Exception { sqlIds.clear(); batchOps = client.batchOperation(tableName); ins1 = client.insert(tableName).setRowKey(colVal("c1", 3L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); ins2 = client.insert(tableName).setRowKey(colVal("c1", 4L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); batchOps.addOperation(ins1, ins2).execute(); sqlIds = get_sql_id(prefix); Assert.assertEquals(2, sqlIds.size()); @@ -217,9 +209,9 @@ public void testMixedBatchOperation1() throws Exception { // mixed op has multi sql_id BatchOperation batchOps = client.batchOperation(tableName); Insert ins = client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); InsertOrUpdate insUp = client.insertOrUpdate(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); batchOps.addOperation(ins, insUp).execute(); List sqlIds = get_sql_id(prefix); Assert.assertEquals(2, sqlIds.size()); @@ -245,11 +237,11 @@ public void testMixedBatchOperation2() throws Exception { // mixed op has multi sql_id BatchOperation batchOps = client.batchOperation(tableName); Insert ins = client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); InsertOrUpdate insUp = client.insertOrUpdate(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); Append appn = client.append(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", prefix)); + .addMutateColVal(colVal("c2", prefix)); batchOps.addOperation(ins, insUp, appn).execute(); List sqlIds = get_sql_id(prefix); Assert.assertEquals(3, sqlIds.size()); @@ -273,10 +265,8 @@ public void testSyncQuery() throws Exception { // query $table_name $column_0, $column_1, ..., $column_n range:$column_0, $column_1, ..., $column_n index:$index_name $filter try { // insert - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", "c2_val")) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", "c2_val")).execute(); // query int limit = generateRandomNumber(); @@ -382,7 +372,7 @@ public void testSyncQueryFilter1() throws Exception { List sqlIds = get_sql_id("limit:" + limit); Assert.assertEquals(2, sqlIds.size()); Assert.assertNotEquals(sqlIds.get(0), sqlIds.get(1)); - } catch (Exception e){ + } catch (Exception e) { throw new RuntimeException(e); } } @@ -410,7 +400,7 @@ public void testSyncQueryFilter2() throws Exception { List sqlIds = get_sql_id("limit:" + limit); Assert.assertEquals(2, sqlIds.size()); Assert.assertNotEquals(sqlIds.get(0), sqlIds.get(1)); - } catch (Exception e){ + } catch (Exception e) { throw new RuntimeException(e); } } @@ -429,10 +419,8 @@ public void testSyncQueryFilter3() throws Exception { // query $table_name $column_0, $column_1, ..., $column_n range:$column_0, $column_1, ..., $column_n index:$index_name $filter try { // insert - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", "c2_val")) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", "c2_val")).execute(); // query int limit = generateRandomNumber(); @@ -477,10 +465,8 @@ public void testAsyncQuery() throws Exception { // query $table_name $column_0, $column_1, ..., $column_n range:$column_0, $column_1, ..., $column_n index:$index_name $filter try { // insert - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", "c2_val")) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", "c2_val")).execute(); // query int limit = generateRandomNumber(); @@ -554,10 +540,8 @@ public void testAsyncQueryFilter() throws Exception { // query $table_name $column_0, $column_1, ..., $column_n range:$column_0, $column_1, ..., $column_n index:$index_name $filter try { // insert - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", "c2_val")) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", "c2_val")).execute(); // query int limit = generateRandomNumber(); @@ -601,10 +585,8 @@ public void testAsyncQueryFilter() throws Exception { public void testQueryAndMutate() throws Exception { try { // insert - client.insertOrUpdate(tableName) - .setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", "c2_val")) - .execute(); + client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) + .addMutateColVal(colVal("c2", "c2_val")).execute(); int limit = generateRandomNumber(); TableQuery tableQuery = client.query(tableName); @@ -613,7 +595,7 @@ public void testQueryAndMutate() throws Exception { tableQuery.select("c1"); tableQuery.limit(limit); ObTableQueryAndMutateRequest req = client.obTableQueryAndAppend(tableQuery, - new String[] { "c2"}, new Object[] {"_append0" }, false); + new String[] { "c2" }, new Object[] { "_append0" }, false); client.execute(req); List sqlIds = get_sql_id("limit:" + String.valueOf(limit)); Assert.assertEquals(3, sqlIds.size()); // query twice diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java index 486c1559..14162178 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java @@ -591,7 +591,8 @@ public void test_autoinc_in_batch() throws Exception { test_autoinc_in_batch_inner("test_auto_increment_batch_ttl", true, true); } - public void test_autoinc_in_batch_inner(String tableName, boolean useAutoinc, boolean useReplace) throws Exception { + public void test_autoinc_in_batch_inner(String tableName, boolean useAutoinc, boolean useReplace) + throws Exception { try { int batchSize = 10; BatchOperation batchOperation = client.batchOperation(tableName); @@ -619,7 +620,8 @@ public void test_autoinc_in_batch_inner(String tableName, boolean useAutoinc, bo // check result TableQuery tableQuery = client.query(tableName); tableQuery.select("c1", "c2", "c3"); - tableQuery.addScanRange(new Object[]{100L}, new Object[]{Long.valueOf(100 + batchSize - 1)}); + tableQuery.addScanRange(new Object[] { 100L }, + new Object[] { Long.valueOf(100 + batchSize - 1) }); QueryResultSet result = tableQuery.execute(); int i = 0; Long c2_val = -1L; @@ -627,8 +629,8 @@ public void test_autoinc_in_batch_inner(String tableName, boolean useAutoinc, bo Row row = result.getResultRow(); Assert.assertEquals(Long.valueOf(i + 100), row.get("c1")); if (useAutoinc) { - Assert.assertTrue(c2_val < (Long)row.get("c2")); - c2_val = (Long)row.get("c2"); + Assert.assertTrue(c2_val < (Long) row.get("c2")); + c2_val = (Long) row.get("c2"); } else { Assert.assertEquals(Long.valueOf(i + 1000), row.get("c2")); } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientInfoTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientInfoTest.java index d51c7c30..39cc0dab 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientInfoTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientInfoTest.java @@ -37,10 +37,10 @@ public class ObTableClientInfoTest { public ObTableClient[] clients; - private int connCnt = 10; - private int clientCnt = 10; - private Long connMaxExpiredTime = 1L; - String tableName = "test_varchar_table"; + private int connCnt = 10; + private int clientCnt = 10; + private Long connMaxExpiredTime = 1L; + String tableName = "test_varchar_table"; /** CREATE TABLE `test_varchar_table` ( @@ -57,15 +57,14 @@ public void setup() throws Exception { for (int i = 0; i < clientCnt; i++) { clients[i] = ObTableClientTestUtil.newTestClient(); clients[i].addProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), - Integer.toString(connCnt)); - clients[i].addProperty(Property.RUNTIME_RETRY_TIMES.getKey(), - Integer.toString(i + 3)); - clients[i].addProperty(Property.MAX_CONN_EXPIRED_TIME.getKey(), Long.toString(connMaxExpiredTime)); + Integer.toString(connCnt)); + clients[i].addProperty(Property.RUNTIME_RETRY_TIMES.getKey(), Integer.toString(i + 3)); + clients[i].addProperty(Property.MAX_CONN_EXPIRED_TIME.getKey(), + Long.toString(connMaxExpiredTime)); clients[i].init(); } } - @Test public void test_conncetion() throws Exception { String tenantName = ObTableClientTestUtil.getTenantName(); // mysql @@ -98,21 +97,22 @@ private String genClientIdStr() { return sb.toString(); } - private void checkGvClientInfo(Connection conn, String tenantName, String userName, boolean is_update) throws Exception { + private void checkGvClientInfo(Connection conn, String tenantName, String userName, + boolean is_update) throws Exception { if (conn == null) { throw new NullPointerException(); } Statement statement = conn.createStatement(); - String SQL = "select a.client_id AS CLIENT_ID, b.tenant_name as TENANT_NAME, a.user_name AS USER_NAME, a.client_info AS CLIENT_INFO, " + - "a.first_login_ts AS FIRST_LOGIN_TS, a.last_login_ts AS LAST_LOGIN_TS " + - "from oceanbase.GV$OB_KV_CLIENT_INFO a inner join oceanbase.DBA_OB_TENANTS b on a.tenant_id = b.tenant_id " + - "where a.client_id in "+ genClientIdStr(); + String SQL = "select a.client_id AS CLIENT_ID, b.tenant_name as TENANT_NAME, a.user_name AS USER_NAME, a.client_info AS CLIENT_INFO, " + + "a.first_login_ts AS FIRST_LOGIN_TS, a.last_login_ts AS LAST_LOGIN_TS " + + "from oceanbase.GV$OB_KV_CLIENT_INFO a inner join oceanbase.DBA_OB_TENANTS b on a.tenant_id = b.tenant_id " + + "where a.client_id in " + genClientIdStr(); statement.execute(SQL); ResultSet resultSet = statement.getResultSet(); int resCount = 0; Map resultMap = new HashMap(); - while(resultSet.next()) { + while (resultSet.next()) { resCount++; resultMap.put(resultSet.getLong("CLIENT_ID"), resultSet.getString("CLIENT_INFO")); Assert.assertEquals(userName, resultSet.getString("USER_NAME")); @@ -132,14 +132,16 @@ private void checkGvClientInfo(Connection conn, String tenantName, String userNa String json_config_str = resultMap.get(clients[i].getClientId()); Assert.assertTrue(json_config_str != null); Map config_map = JSON.parseObject(json_config_str); - Long srcClientId = (Long)clients[i].getTableConfigs().get("client_id"); - Long dstClientId = (Long)config_map.get("client_id"); + Long srcClientId = (Long) clients[i].getTableConfigs().get("client_id"); + Long dstClientId = (Long) config_map.get("client_id"); Assert.assertEquals(srcClientId, dstClientId); // sample check another object result : RUNTIME_RETRY_TIMES - Map srcRouteMap = (Map)clients[i].getTableConfigs().get("runtime"); - Map dstRouteMap = (Map)config_map.get("runtime"); - Assert.assertEquals(srcRouteMap.get(RUNTIME_RETRY_TIMES.getKey()), dstRouteMap.get(RUNTIME_RETRY_TIMES.getKey())); + Map srcRouteMap = (Map) clients[i].getTableConfigs() + .get("runtime"); + Map dstRouteMap = (Map) config_map.get("runtime"); + Assert.assertEquals(srcRouteMap.get(RUNTIME_RETRY_TIMES.getKey()), + dstRouteMap.get(RUNTIME_RETRY_TIMES.getKey())); } } @@ -147,7 +149,7 @@ private void doGet(ObTableClient client) { try { client.get(tableName, new String[] { "k1" }, new String[] { "c1" }); } catch (Exception e) { - Assert.assertTrue(true); // table is not exist + Assert.assertTrue(true); // table is not exist } } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionHashTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionHashTest.java index d265e7d8..1cb0bc9d 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionHashTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionHashTest.java @@ -201,9 +201,8 @@ public void testQuery() throws Exception { // query with multiply partitions tableQuery = obTableClient.query("testHash"); - tableQuery.addScanRange( - new Object[] { timeStamp, "partition".getBytes(), timeStamp }, new Object[] { - timeStamp + 10, "partition".getBytes(), timeStamp }); + tableQuery.addScanRange(new Object[] { timeStamp, "partition".getBytes(), timeStamp }, + new Object[] { timeStamp + 10, "partition".getBytes(), timeStamp }); tableQuery.select("K", "Q", "T"); result = tableQuery.execute(); Assert.assertEquals(5, result.cacheSize()); @@ -211,8 +210,7 @@ public void testQuery() throws Exception { // query with multiply partitions using prefix K tableQuery = obTableClient.query("testHash"); tableQuery.setScanRangeColumns("K"); - tableQuery - .addScanRange(new Object[] { timeStamp }, new Object[] { timeStamp + 10 }); + tableQuery.addScanRange(new Object[] { timeStamp }, new Object[] { timeStamp + 10 }); tableQuery.select("Q", "V"); result = tableQuery.execute(); Assert.assertEquals(5, result.cacheSize()); @@ -291,9 +289,8 @@ public void testAsyncQuery() throws Exception { // query with multiply partitions tableQuery = obTableClient.query("testHash"); - tableQuery.addScanRange( - new Object[] { timeStamp, "partition".getBytes(), timeStamp }, new Object[] { - timeStamp + 10, "partition".getBytes(), timeStamp }); + tableQuery.addScanRange(new Object[] { timeStamp, "partition".getBytes(), timeStamp }, + new Object[] { timeStamp + 10, "partition".getBytes(), timeStamp }); tableQuery.select("K", "Q", "T"); tableQuery.setBatchSize(2); result = tableQuery.asyncExecute(); @@ -301,8 +298,7 @@ public void testAsyncQuery() throws Exception { // query with multiply partitions using prefix K tableQuery = obTableClient.query("testHash"); tableQuery.setScanRangeColumns("K"); - tableQuery - .addScanRange(new Object[] { timeStamp }, new Object[] { timeStamp + 10 }); + tableQuery.addScanRange(new Object[] { timeStamp }, new Object[] { timeStamp + 10 }); tableQuery.select("Q", "V"); tableQuery.setBatchSize(1); result = tableQuery.asyncExecute(); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionKeyTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionKeyTest.java index dfee15d2..0f000c1c 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionKeyTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientPartitionKeyTest.java @@ -267,7 +267,8 @@ public void testQuery() throws Exception { Assert.assertEquals("value1", new String((byte[]) row.get("V"))); tableQuery = obTableClient.query(TEST_TABLE); - tableQuery.addScanRange(new Object[] {"key1_1".getBytes()}, new Object[] {"key1_8".getBytes()}); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, + new Object[] { "key1_8".getBytes() }); tableQuery.setScanRangeColumns("K"); tableQuery.select("Q", "T", "K", "V"); result = tableQuery.execute(); @@ -422,8 +423,8 @@ public void testAsyncQuery() throws Exception { tableQuery = obTableClient.query(TEST_TABLE); tableQuery.setScanRangeColumns("K"); - tableQuery.addScanRange( - new Object[] {"key1_1".getBytes()}, new Object[] {"key1_8".getBytes()}); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, + new Object[] { "key1_8".getBytes() }); tableQuery.select("Q", "T", "K", "V"); result = tableQuery.asyncExecute(); Assert.assertTrue(result.cacheSize() >= 2); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java index 1d4e2b43..a09d0b0f 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java @@ -2610,83 +2610,89 @@ public void test_prefix_scan() throws Exception { cleanTable(tableName); timeStamp = System.currentTimeMillis(); try { - client.insert(tableName, - new Object[] { "key1_1".getBytes(), "partition".getBytes(), timeStamp }, - new String[] { "V" }, new Object[] { "value1".getBytes() }); - client.insert(tableName, - new Object[] { "key1_1".getBytes(), "partition".getBytes(), timeStamp + 1 }, - new String[] { "V" }, new Object[] { "value2".getBytes() }); - client.insert(tableName, - new Object[] { "key1_2".getBytes(), "partition".getBytes(), timeStamp }, - new String[] { "V" }, new Object[] { "value3".getBytes() }); + client.insert(tableName, new Object[] { "key1_1".getBytes(), "partition".getBytes(), + timeStamp }, new String[] { "V" }, new Object[] { "value1".getBytes() }); + client.insert(tableName, new Object[] { "key1_1".getBytes(), "partition".getBytes(), + timeStamp + 1 }, new String[] { "V" }, new Object[] { "value2".getBytes() }); + client.insert(tableName, new Object[] { "key1_2".getBytes(), "partition".getBytes(), + timeStamp }, new String[] { "V" }, new Object[] { "value3".getBytes() }); TableQuery tableQuery = client.query(tableName); QueryResultSet result = tableQuery.execute(); Assert.assertEquals(3, result.cacheSize()); tableQuery = client.query(tableName); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes(), "partition".getBytes()}, - new Object[] { "key1_1".getBytes(), "partition".getBytes()}); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes(), "partition".getBytes() }, + new Object[] { "key1_1".getBytes(), "partition".getBytes() }); tableQuery.setScanRangeColumns("K", "Q"); result = tableQuery.execute(); Assert.assertEquals(2, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, new Object[] { "key1_2".getBytes() } ); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, + new Object[] { "key1_2".getBytes() }); result = tableQuery.execute(); Assert.assertEquals(3, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, true, new Object[] { "key1_2".getBytes() }, false ); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, true, + new Object[] { "key1_2".getBytes() }, false); result = tableQuery.execute(); Assert.assertEquals(2, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, new Object[] { "key1_2".getBytes() }, true ); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, + new Object[] { "key1_2".getBytes() }, true); result = tableQuery.execute(); Assert.assertEquals(1, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, new Object[] { "key1_2".getBytes() }, false ); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, + new Object[] { "key1_2".getBytes() }, false); result = tableQuery.execute(); Assert.assertEquals(0, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); tableQuery.indexName("idx_k_v"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, new Object[] { "key1_2".getBytes() }); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, + new Object[] { "key1_2".getBytes() }); result = tableQuery.execute(); Assert.assertEquals(3, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); tableQuery.indexName("idx_k_v"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, new Object[] { "key1_1".getBytes() }); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, + new Object[] { "key1_1".getBytes() }); result = tableQuery.execute(); Assert.assertEquals(2, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); tableQuery.indexName("idx_k_v"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, new Object[] { "key1_2".getBytes() }, true); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, + new Object[] { "key1_2".getBytes() }, true); result = tableQuery.execute(); Assert.assertEquals(1, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); tableQuery.indexName("idx_k_v"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, true, new Object[] { "key1_2".getBytes() }, false); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, true, + new Object[] { "key1_2".getBytes() }, false); result = tableQuery.execute(); Assert.assertEquals(2, result.cacheSize()); tableQuery = client.query(tableName); tableQuery.setScanRangeColumns("K"); tableQuery.indexName("idx_k_v"); - tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, new Object[] { "key1_2".getBytes() }, false); + tableQuery.addScanRange(new Object[] { "key1_1".getBytes() }, false, + new Object[] { "key1_2".getBytes() }, false); result = tableQuery.execute(); Assert.assertEquals(0, result.cacheSize()); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableGroupCommitTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableGroupCommitTest.java index d54622a9..292af0b7 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableGroupCommitTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableGroupCommitTest.java @@ -90,9 +90,10 @@ private void checkSystemView() throws SQLException { // mysql tenant Connection mysql_conn = ObTableClientTestUtil.getConnection(); Statement statement = mysql_conn.createStatement(); - statement.execute("select b.tenant_id as tenant_id, b.tenant_name as tenant_name, a.group_type as group_type, a.batch_size as batch_size " + - " from oceanbase.GV$OB_KV_GROUP_COMMIT_STATUS a inner join " + - "oceanbase.DBA_OB_TENANTS b on a.tenant_id = b.tenant_id group by a.group_type"); + statement + .execute("select b.tenant_id as tenant_id, b.tenant_name as tenant_name, a.group_type as group_type, a.batch_size as batch_size " + + " from oceanbase.GV$OB_KV_GROUP_COMMIT_STATUS a inner join " + + "oceanbase.DBA_OB_TENANTS b on a.tenant_id = b.tenant_id group by a.group_type"); ResultSet resultSet = statement.getResultSet(); int resCount = 0; System.out.println("visit by mysql tenant:"); @@ -101,7 +102,8 @@ private void checkSystemView() throws SQLException { String tenant_name = resultSet.getString("tenant_name"); String group_type = resultSet.getString("group_type"); long batch_size = resultSet.getLong("batch_size"); - System.out.println("tenant_id:" + tenant_id+", tenant_name: "+ tenant_name +", group_type: "+group_type+", batch_size: "+batch_size); + System.out.println("tenant_id:" + tenant_id + ", tenant_name: " + tenant_name + + ", group_type: " + group_type + ", batch_size: " + batch_size); resCount++; } Assert.assertTrue(resCount >= 3); @@ -110,9 +112,12 @@ private void checkSystemView() throws SQLException { // sys tenant Connection sys_conn = ObTableClientTestUtil.getSysConnection(); Statement statement2 = sys_conn.createStatement(); - statement2.execute("select b.tenant_id as tenant_id, b.tenant_name as tenant_name, a.group_type as group_type, a.batch_size as batch_size " + - " from oceanbase.GV$OB_KV_GROUP_COMMIT_STATUS a inner join " + - "oceanbase.__all_tenant b on a.tenant_id = b.tenant_id where b.tenant_name in ('sys', '"+ObTableClientTestUtil.getTenantName()+"') group by b.tenant_name, a.group_type;"); + statement2 + .execute("select b.tenant_id as tenant_id, b.tenant_name as tenant_name, a.group_type as group_type, a.batch_size as batch_size " + + " from oceanbase.GV$OB_KV_GROUP_COMMIT_STATUS a inner join " + + "oceanbase.__all_tenant b on a.tenant_id = b.tenant_id where b.tenant_name in ('sys', '" + + ObTableClientTestUtil.getTenantName() + + "') group by b.tenant_name, a.group_type;"); resultSet = statement2.getResultSet(); resCount = 0; System.out.println("visit by sys tenant:"); @@ -121,7 +126,8 @@ private void checkSystemView() throws SQLException { String tenant_name = resultSet.getString("tenant_name"); String group_type = resultSet.getString("group_type"); long batch_size = resultSet.getLong("batch_size"); - System.out.println("tenant_id:" + tenant_id+", tenant_name: "+ tenant_name +", group_type: "+group_type+", batch_size: "+batch_size); + System.out.println("tenant_id:" + tenant_id + ", tenant_name: " + tenant_name + + ", group_type: " + group_type + ", batch_size: " + batch_size); resCount++; } Assert.assertTrue(resCount >= 4); @@ -149,7 +155,7 @@ public void run() { String c1 = String.format("rk_%d_%d", id, counter); String c2 = String.format("col_%d_%d", id, counter); obTableClient.insert(tableName).setRowKey(row(colVal("c1", c1))) - .addMutateRow(row(colVal("c2",c2))).execute(); + .addMutateRow(row(colVal("c2", c2))).execute(); counter++; } catch (Exception e) { e.printStackTrace(); @@ -192,12 +198,12 @@ private void switchGroupCommit(boolean is_enable) throws SQLException { int batch_size = is_enable ? 10 : 1; Connection mysql_conn = ObTableClientTestUtil.getConnection(); Statement statement = mysql_conn.createStatement(); - statement.execute("alter system set kv_group_commit_batch_size = "+ batch_size); + statement.execute("alter system set kv_group_commit_batch_size = " + batch_size); } private void deleteTable(String tableName) throws SQLException { Connection mysql_conn = ObTableClientTestUtil.getConnection(); Statement statement = mysql_conn.createStatement(); - statement.execute("delete from "+ tableName); + statement.execute("delete from " + tableName); } } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableLsBatchTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableLsBatchTest.java index ec04c791..fe1bd778 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableLsBatchTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableLsBatchTest.java @@ -310,7 +310,8 @@ public void testGetAllObjType() throws Exception { @Test public void testBatchInsert() throws Exception { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); BatchOperation batchOperation = client.batchOperation(TABLE_NAME); Object values[][] = { { 1L, "c2_val", "c3_val", 100L }, { 400L, "c2_val", "c3_val", 100L }, { 401L, "c2_val", "c3_val", 100L }, { 1000L, "c2_val", "c3_val", 100L }, @@ -398,7 +399,8 @@ public void testBatchAppend() throws Exception { @Test public void testBatchDel() throws Exception { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); Object values[][] = { { 1L, "c2_val", "c3_val", 100L }, { 400L, "c2_val", "c3_val", 100L }, { 401L, "c2_val", "c3_val", 100L }, { 1000L, "c2_val", "c3_val", 100L }, { 1001L, "c2_val", "c3_val", 100L }, { 1002L, "c2_val", "c3_val", 100L }, }; @@ -499,7 +501,8 @@ public void testBatchIncrement() throws Exception { @Test public void testBatchReplace() throws Exception { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); Object values[][] = { { 1L, "c2_val", "c3_val", 100L }, { 400L, "c2_val", "c3_val", 100L }, { 401L, "c2_val", "c3_val", 100L }, { 1000L, "c2_val", "c3_val", 100L }, { 1001L, "c2_val", "c3_val", 100L }, { 1002L, "c2_val", "c3_val", 100L }, }; @@ -716,7 +719,8 @@ public void testHybridBatch() throws Exception { @Test public void testPut() throws Exception { - Assume.assumeTrue("Skipping returnOneResult when ob version not support", ObGlobal.isReturnOneResultSupport()); + Assume.assumeTrue("Skipping returnOneResult when ob version not support", + ObGlobal.isReturnOneResultSupport()); // put operation should set binlog_row_image minimal Connection connection = ObTableClientTestUtil.getConnection(); Statement statement = connection.createStatement(); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableModeTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableModeTest.java index cfe9642e..7e6ef9c2 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableModeTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableModeTest.java @@ -47,31 +47,31 @@ public class ObTableModeTest { ObTableClient client; - public static String tableName = "test_varchar_table"; - public static String mysqlCompatMode = "mysql"; + public static String tableName = "test_varchar_table"; + public static String mysqlCompatMode = "mysql"; public static String oracleCompatMode = "oracle"; - public static String allKVMode = "ALL"; - public static String tableKVMode = "TABLEAPI"; - public static String hbaseKVMode = "HBASE"; - public static String RedisKVMode = "REDIS"; - public static String noneKVMode = "NONE"; + public static String allKVMode = "ALL"; + public static String tableKVMode = "TABLEAPI"; + public static String hbaseKVMode = "HBASE"; + public static String RedisKVMode = "REDIS"; + public static String noneKVMode = "NONE"; - public static String tpUnit = "tpUnit"; - public static String tpPool = "tpPool"; - public static String tpTenant = "tpTenant"; + public static String tpUnit = "tpUnit"; + public static String tpPool = "tpPool"; + public static String tpTenant = "tpTenant"; - public static String hbaseUnit = "hbaseUnit"; - public static String hbasePool = "hbasePool"; - public static String hbaseTenant = "hbaseTenant"; + public static String hbaseUnit = "hbaseUnit"; + public static String hbasePool = "hbasePool"; + public static String hbaseTenant = "hbaseTenant"; - public static String tableUnit = "tableUnit"; - public static String tablePool = "tablePool"; - public static String tableTenant = "tableTenant"; + public static String tableUnit = "tableUnit"; + public static String tablePool = "tablePool"; + public static String tableTenant = "tableTenant"; - public static String redisUnit = "redisUnit"; - public static String redisPool = "redisPool"; - public static String redisTenant = "redisTenant"; + public static String redisUnit = "redisUnit"; + public static String redisPool = "redisPool"; + public static String redisTenant = "redisTenant"; @Before public void setup() throws Exception { @@ -97,21 +97,17 @@ public static String extractClusterName(String input) { public void createTable(String userName, String tenantName) throws Exception { String user = userName + "@" + tenantName; - String url = "jdbc:mysql://" + JDBC_IP + ":" + JDBC_PORT + "/ " + "test" + "?" + - "rewriteBatchedStatements=TRUE&" + - "allowMultiQueries=TRUE&" + - "useLocalSessionState=TRUE&" + - "useUnicode=TRUE&" + - "characterEncoding=utf-8&" + - "socketTimeout=3000000&" + - "connectTimeout=60000"; + String url = "jdbc:mysql://" + JDBC_IP + ":" + JDBC_PORT + "/ " + "test" + "?" + + "rewriteBatchedStatements=TRUE&" + "allowMultiQueries=TRUE&" + + "useLocalSessionState=TRUE&" + "useUnicode=TRUE&" + + "characterEncoding=utf-8&" + "socketTimeout=3000000&" + + "connectTimeout=60000"; Connection conn = DriverManager.getConnection(url, user, PASSWORD); Statement statement = conn.createStatement(); - statement.execute("CREATE TABLE IF NOT EXISTS `test_varchar_table` (" + - " `c1` varchar(20) NOT NULL," + - " `c2` varchar(20) DEFAULT NULL," + - " PRIMARY KEY (`c1`)" + - " );"); + statement.execute("CREATE TABLE IF NOT EXISTS `test_varchar_table` (" + + " `c1` varchar(20) NOT NULL," + + " `c2` varchar(20) DEFAULT NULL," + " PRIMARY KEY (`c1`)" + + " );"); } public void createResourceUnit(String unitName) throws Exception { @@ -124,15 +120,19 @@ public void createResourcePool(String unitName, String poolName) throws Exceptio createResourceUnit(unitName); Connection conn = ObTableClientTestUtil.getSysConnection(); Statement statement = conn.createStatement(); - statement.execute("create resource pool " + poolName + " unit = '" + unitName + "', unit_num = 1;"); + statement.execute("create resource pool " + poolName + " unit = '" + unitName + + "', unit_num = 1;"); } - public void createTenant(String unitName, String poolName, String tenantName, String compatMode, String kvMode) throws Exception { + public void createTenant(String unitName, String poolName, String tenantName, + String compatMode, String kvMode) throws Exception { createResourcePool(unitName, poolName); Connection conn = ObTableClientTestUtil.getSysConnection(); Statement statement = conn.createStatement(); - statement.execute("create tenant " + tenantName + " replica_num = 1, resource_pool_list=('"+ poolName +"') " + - "set ob_tcp_invited_nodes='%', ob_compatibility_mode='" + compatMode + "', ob_kv_mode='" + kvMode + "';"); + statement.execute("create tenant " + tenantName + " replica_num = 1, resource_pool_list=('" + + poolName + "') " + + "set ob_tcp_invited_nodes='%', ob_compatibility_mode='" + compatMode + + "', ob_kv_mode='" + kvMode + "';"); } public void dropResourceUnit(String unitName) throws Exception { @@ -263,8 +263,7 @@ public void testTableTenant() throws Exception { client.addRowKeyElement(tableName, new String[] { "c1" }); client.insert(tableName).setRowKey(colVal("c1", "a")) - .addMutateColVal(colVal("c2", "a")) - .execute(); + .addMutateColVal(colVal("c2", "a")).execute(); } finally { dropTenant(tableTenant); dropResourcePool(tablePool); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableP99Test.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableP99Test.java index 07f75d97..403c8eaf 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableP99Test.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableP99Test.java @@ -44,16 +44,16 @@ PRIMARY KEY (`c1`) ); **/ public class ObTableP99Test { - ObTableClient client; - private static Connection conn = null; - public static String tableName = "test_p99"; - public static String insertSqlType = "TABLEAPI INSERT"; - public static String selectSqlType = "TABLEAPI SELECT"; - public static String deleteSqlType = "TABLEAPI DELETE"; - public static String updateSqlType = "TABLEAPI UPDATE"; - public static String replaceSqlType = "TABLEAPI REPLACE"; - public static String queryAndMutateSqlType = "TABLEAPI QUERY AND MUTATE"; - public static String otherSqlType = "TABLEAPI OTHER"; + ObTableClient client; + private static Connection conn = null; + public static String tableName = "test_p99"; + public static String insertSqlType = "TABLEAPI INSERT"; + public static String selectSqlType = "TABLEAPI SELECT"; + public static String deleteSqlType = "TABLEAPI DELETE"; + public static String updateSqlType = "TABLEAPI UPDATE"; + public static String replaceSqlType = "TABLEAPI REPLACE"; + public static String queryAndMutateSqlType = "TABLEAPI QUERY AND MUTATE"; + public static String otherSqlType = "TABLEAPI OTHER"; @Before public void setup() throws Exception { @@ -65,8 +65,9 @@ public void setup() throws Exception { } private static long getResultCount(String sqlType) throws Exception { - PreparedStatement ps = conn.prepareStatement("select * from oceanbase.gv$ob_query_response_time_histogram " + - "where sql_type=" + "\"" +sqlType +"\""); + PreparedStatement ps = conn + .prepareStatement("select * from oceanbase.gv$ob_query_response_time_histogram " + + "where sql_type=" + "\"" + sqlType + "\""); ResultSet rs = ps.executeQuery(); long totalCnt = 0L; while (rs.next()) { @@ -85,25 +86,21 @@ private static void flushHistogram() throws Exception { public void testInsert() throws Exception { try { // single insert - client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(1, getResultCount(insertSqlType)); - client.insert(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(2, getResultCount(insertSqlType)); // single insertOrUpdate flushHistogram(); Thread.sleep(100); client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + .addMutateColVal(colVal("c2", 1L)).execute(); assertEquals(1, getResultCount(insertSqlType)); client.insertOrUpdate(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + .addMutateColVal(colVal("c2", 1L)).execute(); assertEquals(2, getResultCount(insertSqlType)); // multi insert @@ -111,7 +108,7 @@ public void testInsert() throws Exception { Thread.sleep(100); BatchOperation batch1 = client.batchOperation(tableName); Insert ins_0 = client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch1.addOperation(ins_0).execute(); assertEquals(1, getResultCount(insertSqlType)); batch1.addOperation(ins_0).execute(); @@ -122,7 +119,7 @@ public void testInsert() throws Exception { Thread.sleep(100); BatchOperation batch2 = client.batchOperation(tableName); InsertOrUpdate insUp_0 = client.insertOrUpdate(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch2.addOperation(insUp_0).execute(); assertEquals(1, getResultCount(insertSqlType)); batch2.addOperation(insUp_0).execute(); @@ -138,24 +135,23 @@ public void testInsert() throws Exception { @Test public void testSelect() throws Exception { try { - client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(1, getResultCount(insertSqlType)); // single get flushHistogram(); Thread.sleep(100); - client.get(tableName, 1L, new String[]{"c1"}); + client.get(tableName, 1L, new String[] { "c1" }); assertEquals(1, getResultCount(selectSqlType)); - client.get(tableName, 1L, new String[]{"c1"}); + client.get(tableName, 1L, new String[] { "c1" }); assertEquals(2, getResultCount(selectSqlType)); // multi get flushHistogram(); Thread.sleep(100); TableBatchOps batchOps = client.batch(tableName); - batchOps.get(1L, new String[]{"c1"}); + batchOps.get(1L, new String[] { "c1" }); batchOps.execute(); assertEquals(1, getResultCount(selectSqlType)); batchOps.execute(); @@ -165,7 +161,7 @@ public void testSelect() throws Exception { flushHistogram(); Thread.sleep(100); TableQuery tableQuery = client.query(tableName); - tableQuery.addScanRange(new Object[] { 0L,}, new Object[] { 1L,}); + tableQuery.addScanRange(new Object[] { 0L, }, new Object[] { 1L, }); tableQuery.select("c1"); tableQuery.execute(); assertEquals(1, getResultCount(selectSqlType)); @@ -188,12 +184,10 @@ public void testSelect() throws Exception { @Test public void testDelete() throws Exception { try { - client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); - client.insert(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 1L)) + .execute(); + client.insert(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(2, getResultCount(insertSqlType)); // single delete @@ -223,48 +217,40 @@ public void testDelete() throws Exception { @Test public void testUpdate() throws Exception { try { - client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); - client.insert(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 1L)) + .execute(); + client.insert(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(2, getResultCount(insertSqlType)); // single update flushHistogram(); Thread.sleep(100); - client.update(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + client.update(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 2L)) + .execute(); assertEquals(1, getResultCount(updateSqlType)); - client.update(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + client.update(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 2L)) + .execute(); assertEquals(2, getResultCount(updateSqlType)); // single increment flushHistogram(); Thread.sleep(100); client.increment(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + .addMutateColVal(colVal("c2", 2L)).execute(); assertEquals(1, getResultCount(updateSqlType)); client.increment(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + .addMutateColVal(colVal("c2", 2L)).execute(); assertEquals(2, getResultCount(updateSqlType)); // single append flushHistogram(); Thread.sleep(100); client.append(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c3", "world")) - .execute(); + .addMutateColVal(colVal("c3", "world")).execute(); assertEquals(1, getResultCount(updateSqlType)); client.append(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c3", "world")) - .execute(); + .addMutateColVal(colVal("c3", "world")).execute(); assertEquals(2, getResultCount(updateSqlType)); // multi update @@ -272,7 +258,7 @@ public void testUpdate() throws Exception { Thread.sleep(100); BatchOperation batch1 = client.batchOperation(tableName); Update ins_0 = client.update(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch1.addOperation(ins_0).execute(); assertEquals(1, getResultCount(updateSqlType)); batch1.addOperation(ins_0).execute(); @@ -283,7 +269,7 @@ public void testUpdate() throws Exception { Thread.sleep(100); BatchOperation batch2 = client.batchOperation(tableName); Increment inc_0 = client.increment(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch2.addOperation(inc_0).execute(); assertEquals(1, getResultCount(updateSqlType)); batch2.addOperation(inc_0).execute(); @@ -294,7 +280,7 @@ public void testUpdate() throws Exception { Thread.sleep(100); BatchOperation batch3 = client.batchOperation(tableName); Append app_0 = client.append(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c3", "llo")); + .addMutateColVal(colVal("c3", "llo")); batch3.addOperation(app_0).execute(); assertEquals(1, getResultCount(updateSqlType)); batch3.addOperation(app_0).execute(); @@ -309,22 +295,18 @@ public void testUpdate() throws Exception { @Test public void testReplace() throws Exception { try { - client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); - client.insert(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)) - .execute(); + client.insert(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 1L)) + .execute(); + client.insert(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 1L)) + .execute(); assertEquals(2, getResultCount(insertSqlType)); flushHistogram(); Thread.sleep(100); - client.replace(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + client.replace(tableName).setRowKey(colVal("c1", 1L)).addMutateColVal(colVal("c2", 2L)) + .execute(); assertEquals(1, getResultCount(replaceSqlType)); - client.replace(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 2L)) - .execute(); + client.replace(tableName).setRowKey(colVal("c1", 2L)).addMutateColVal(colVal("c2", 2L)) + .execute(); assertEquals(2, getResultCount(replaceSqlType)); // multi replace @@ -332,7 +314,7 @@ public void testReplace() throws Exception { Thread.sleep(100); BatchOperation batch = client.batchOperation(tableName); Replace rep_0 = client.replace(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch.addOperation(rep_0).execute(); assertEquals(1, getResultCount(replaceSqlType)); batch.addOperation(rep_0).execute(); @@ -350,10 +332,10 @@ public void testQueryAndMutate() throws Exception { flushHistogram(); Thread.sleep(100); TableQuery tableQuery = client.query(tableName); - tableQuery.addScanRange(new Object[] { 0L,}, new Object[] { 1L,}); + tableQuery.addScanRange(new Object[] { 0L, }, new Object[] { 1L, }); tableQuery.select("c1"); ObTableQueryAndMutateRequest request_0 = client.obTableQueryAndAppend(tableQuery, - new String[] { "c3" }, new Object[] {"_append0" }, true); + new String[] { "c3" }, new Object[] { "_append0" }, true); client.execute(request_0); assertEquals(1, getResultCount(queryAndMutateSqlType)); } finally { @@ -369,9 +351,9 @@ public void testOther() throws Exception { Thread.sleep(100); BatchOperation batch = client.batchOperation(tableName); Insert ins_0 = client.insert(tableName).setRowKey(colVal("c1", 1L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); Replace rep_0 = client.replace(tableName).setRowKey(colVal("c1", 2L)) - .addMutateColVal(colVal("c2", 1L)); + .addMutateColVal(colVal("c2", 1L)); batch.addOperation(ins_0, rep_0).execute(); assertEquals(1, getResultCount(otherSqlType)); batch.addOperation(ins_0, rep_0).execute(); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableTTLTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableTTLTest.java index 86c865cc..2c073f93 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableTTLTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableTTLTest.java @@ -406,7 +406,7 @@ PRIMARY KEY (`adiu`, `pk`), // local index: query with select all columns res = client .query(TABLE_NAME) - .setScanRangeColumns(new String[] {"adiu", "pk", "name"}) + .setScanRangeColumns(new String[] { "adiu", "pk", "name" }) .addScanRange(new Object[] { "adiu_0", "pk_0", "0_name" }, new Object[] { "adiu_9", "pk_9", "9_name" }).indexName("idx_adiu").execute(); count = 0; @@ -419,7 +419,7 @@ PRIMARY KEY (`adiu`, `pk`), // local index: query with select partial columns, without ttl columns res = client .query(TABLE_NAME) - .setScanRangeColumns(new String[] {"adiu", "pk", "name"}) + .setScanRangeColumns(new String[] { "adiu", "pk", "name" }) .addScanRange(new Object[] { "adiu_0", "pk_0", "0_name" }, new Object[] { "adiu_9", "pk_9", "9_name" }).indexName("idx_adiu") .select("adiu", "pk").execute(); @@ -433,8 +433,8 @@ PRIMARY KEY (`adiu`, `pk`), // local index: query with select partial columns, without ttl columns res = client .query(TABLE_NAME) - .setScanRangeColumns("adiu", "pk", "name") - .addScanRange(new Object[] { "adiu_0", "pk_0", "0_name" }, + .setScanRangeColumns("adiu", "pk", "name") + .addScanRange(new Object[] { "adiu_0", "pk_0", "0_name" }, new Object[] { "adiu_9", "pk_9", "9_name" }).indexName("idx_adiu") .select("adiu", "gmt_create").execute(); count = 0; From efbe565c4d5deaf59d16a150f35a8ff05f21f78b Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 18 Sep 2024 17:26:05 +0800 Subject: [PATCH 4/8] remove useless dependency --- .../payload/impl/execute/ObTableBatchOperationRequest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java index 1cfde2be..25eeca53 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperationRequest.java @@ -22,8 +22,6 @@ import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; -import java.util.concurrent.ExecutorService; - /* * OB_SERIALIZE_MEMBER(ObTableBatchOperationRequest, From dd606ab0e8b67374c1e0e45e5eab58ace03c85ac Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Fri, 20 Sep 2024 10:08:42 +0800 Subject: [PATCH 5/8] revert format of ResultCodes --- .../com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java index fbb79d79..e7fa9555 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java @@ -358,7 +358,8 @@ public enum ResultCodes { OB_CLUSTER_NO_MATCH(-4666), // OB_CHECK_ZONE_MERGE_ORDER(-4667), // OB_ERR_ZONE_NOT_EMPTY(-4668), // - OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), // + OB_USE_DUP_FOLLOW_AFTER_DML(-4686), + OB_LS_NOT_EXIST(-4719), // OB_TABLET_NOT_EXIST(-4725), // OB_ERR_PARSER_INIT(-5000), // OB_ERR_PARSE_SQL(-5001), // From bdaa09d17c4c506ab55a74a81c027d9af76c2a1e Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 23 Sep 2024 16:39:02 +0800 Subject: [PATCH 6/8] use default database when users do not set database --- src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index e83f919c..db4be27d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -3257,6 +3257,10 @@ public void setParamURL(String paramURL) throws IllegalArgumentException { } } } + // set db as "default" if the user does not assign any database + if (db == null) { + db = "default"; + } if (StringUtils.isBlank(db)) { throw new IllegalArgumentException(String.format( From cf4c9729512e289e5c704968d2532516600b313d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 24 Sep 2024 17:00:37 +0800 Subject: [PATCH 7/8] correct the exception in HBase mode using asyncQueryRequest --- .../rpc/bolt/transport/ObTableRemoting.java | 5 +-- .../query/AbstractQueryStreamResult.java | 31 ++++++++++++++----- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index e3d26f0d..8a6914a9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques ObRpcResultCode resultCode = new ObRpcResultCode(); resultCode.decode(buf); // If response indicates the request is routed to wrong server, we should refresh the routing meta. - if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) { + if (!conn.getObTable().getReRouting() && response.getHeader().isRoutingWrong()) { String errMessage = TraceUtil.formatTraceMessage(conn, request, "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); @@ -139,7 +139,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques throw new ObTableNeedFetchAllException(errMessage); } } - if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) { + if (resultCode.getRcode() != 0 + && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) { String errMessage = TraceUtil.formatTraceMessage(conn, request, "routed to the wrong server: " + response.getMessage()); logger.warn(errMessage); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 7cd75788..e8efd240 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -32,6 +32,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult; import com.alipay.oceanbase.rpc.table.ObTable; import com.alipay.oceanbase.rpc.table.ObTableParam; @@ -156,8 +157,12 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, result = subObTable.execute(request); if (result instanceof ObTableApiMove) { ObTableApiMove move = (ObTableApiMove) result; - logger.warn("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + - "Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString()); + logger + .warn( + "The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " + + "Rerouting return IP is {}", moveResponse + .getReplica().getServer().ipToString(), move.getReplica() + .getServer().ipToString()); throw new ObTableRoutingWrongException(); } } @@ -222,12 +227,22 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, throw e; } } else if (e instanceof ObTableException) { - if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e) - .getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) - && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() - && client.getTableGroupInverted().get(indexTableName) != null) { - // table not exists && hbase mode && table group exists , three condition both - client.eraseTableGroupFromCache(tableName); + if (((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode + || ((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode) { + if (request instanceof ObTableQueryRequest) { + if (((ObTableQueryRequest) request).getTableQuery().isHbaseQuery() + && client.getTableGroupInverted().get(indexTableName) != null) { + // table not exists && hbase mode && table group exists , three condition both + client.eraseTableGroupFromCache(tableName); + } + } else if (request instanceof ObTableQueryAsyncRequest) { + if (((ObTableQueryAsyncRequest) request).getObTableQueryRequest() + .getTableQuery().isHbaseQuery() + && client.getTableGroupInverted().get(indexTableName) != null) { + // table not exists && hbase mode && table group exists , three condition both + client.eraseTableGroupFromCache(tableName); + } + } } if (((ObTableException) e).isNeedRefreshTableEntry()) { needRefreshTableEntry = true; From 4663f65519536aa9f77bc6e1eb2c544bcc4f7b72 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Sun, 13 Oct 2024 23:44:33 +0800 Subject: [PATCH 8/8] revert the change in ObTableClient, move the specific change to HBase client --- src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index db4be27d..e83f919c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -3257,10 +3257,6 @@ public void setParamURL(String paramURL) throws IllegalArgumentException { } } } - // set db as "default" if the user does not assign any database - if (db == null) { - db = "default"; - } if (StringUtils.isBlank(db)) { throw new IllegalArgumentException(String.format(