From 845d60938abe0dad83ee3d5c572816be355d8995 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Thu, 28 Dec 2023 21:25:12 +0800 Subject: [PATCH 01/10] batch put with same property name --- .../oceanbase/rpc/ObClusterTableBatchOps.java | 15 ++ .../alipay/oceanbase/rpc/ObTableClient.java | 41 +++++ .../rpc/mutation/BatchOperation.java | 12 ++ .../alipay/oceanbase/rpc/mutation/Put.java | 147 +++++++++++++++++- .../impl/execute/AbstractObTableEntity.java | 11 +- .../payload/impl/execute/ObITableEntity.java | 3 + .../impl/execute/ObTableBatchOperation.java | 36 +++-- .../payload/impl/execute/ObTableEntity.java | 10 ++ .../impl/execute/ObTableOperationType.java | 5 +- .../rpc/table/AbstractTableBatchOps.java | 10 ++ .../rpc/table/ObTableBatchOpsImpl.java | 8 + .../rpc/table/ObTableClientBatchOpsImpl.java | 21 ++- .../rpc/table/api/TableBatchOps.java | 6 + src/test/java/ci.sql | 8 + .../oceanbase/rpc/ObTableBatchPutTest.java | 89 +++++++++++ 15 files changed, 393 insertions(+), 29 deletions(-) create mode 100644 src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java index 665b462f..d91e8d64 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java @@ -72,6 +72,11 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { tableBatchOps.insert(rowkeys, columns, values); } + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + tableBatchOps.put(rowkeys, columns, values); + } + /* * Replace. */ @@ -167,4 +172,14 @@ public void setAtomicOperation(boolean atomicOperation) { super.setAtomicOperation(atomicOperation); tableBatchOps.setAtomicOperation(atomicOperation); } + + public boolean isSamePropertiesNames() { + return super.isSamePropertiesNames(); + } + + public void setSamePropertiesNames(boolean samePropertiesNames) { + super.setSamePropertiesNames(samePropertiesNames); + tableBatchOps.setSamePropertiesNames(samePropertiesNames); + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index fed663ff..a58f40f9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -1784,6 +1784,47 @@ public ObPayload execute(ObPair obPair) throws Exception { }); } + /** + * put with result + * @param tableName which table to put + * @param rowKey insert row key + * @param keyRanges scan range + * @param columns columns name to put + * @param values new values + * @return execute result + * @throws Exception exception + */ + public ObPayload putWithResult(final String tableName, final Object[] rowKey, + final List keyRanges, final String[] columns, + final Object[] values) throws Exception { + final long start = System.currentTimeMillis(); + return executeMutation(tableName, + new MutationExecuteCallback(rowKey, keyRanges) { + /** + * Execute. + */ + @Override + public ObPayload execute(ObPair obPair) throws Exception { + long TableTime = System.currentTimeMillis(); + ObTableParam tableParam = obPair.getRight(); + ObTable obTable = tableParam.getObTable(); + ObTableOperationRequest request = ObTableOperationRequest.getInstance( + tableName, PUT, rowKey, columns, values, + obTable.getObTableOperationTimeout()); + request.setTableId(tableParam.getTableId()); + // partId/tabletId + request.setPartitionId(tableParam.getPartitionId()); + ObPayload result = obTable.execute(request); + String endpoint = obTable.getIp() + ":" + obTable.getPort(); + MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey, + (ObTableOperationResult) result, TableTime - start, + System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold()); + checkResult(obTable.getIp(), obTable.getPort(), request, result); + return result; + } + }); + } + /** * Replace. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index 8b1e34a3..b35d1270 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -35,6 +35,12 @@ public class BatchOperation { boolean withResult; private List operations; boolean isAtomic = false; + boolean isSamePropertiesNames = false; + + public BatchOperation setSamePropertiesNames(boolean samePropertiesNames) { + isSamePropertiesNames = samePropertiesNames; + return this; + } /* * default constructor @@ -126,6 +132,11 @@ public BatchOperationResult execute() throws Exception { batchOps.insert(mutation.getRowKey(), ((Insert) mutation).getColumns(), ((Insert) mutation).getValues()); break; + case PUT: + ((Put) mutation).removeRowkeyFromMutateColval(); + batchOps.put(mutation.getRowKey(), ((Put) mutation).getColumns(), + ((Put) mutation).getValues()); + break; case DEL: batchOps.delete(mutation.getRowKey()); break; @@ -168,6 +179,7 @@ public BatchOperationResult execute() throws Exception { } } batchOps.setAtomicOperation(isAtomic); + batchOps.setSamePropertiesNames(isSamePropertiesNames); return new BatchOperationResult(batchOps.executeWithResult()); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java index a4ddd447..79cf56c5 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java @@ -17,20 +17,30 @@ package com.alipay.oceanbase.rpc.mutation; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; import com.alipay.oceanbase.rpc.table.api.Table; -/* - * Put impl. - * Need fill all columns when use put impl. - * Server will do override when use put impl. - */ -public class Put extends InsertOrUpdate { +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class Put extends Mutation { + private List columns = null; + private List values = null; + /* * default constructor */ public Put() { super(); - super.usePut(); + columns = new ArrayList(); + values = new ArrayList(); } /* @@ -38,6 +48,127 @@ public Put() { */ public Put(Table client, String tableName) { super(client, tableName); - super.usePut(); + columns = new ArrayList(); + values = new ArrayList(); + } + + /* + * set the Row Key of mutation with Row and keep scan range + */ + @Override + public Put setRowKey(Row rowKey) { + return setRowKeyOnly(rowKey); + } + + /* + * set the Row Key of mutation with ColumnValues and keep scan range + */ + @Override + public Put setRowKey(ColumnValue... rowKey) { + return setRowKeyOnly(rowKey); + } + + /* + * add filter into mutation (use QueryAndMutate) and scan range + */ + public Put setFilter(ObTableFilter filter) throws Exception { + return setFilterOnly(filter); + } + + /* + * get operation type + */ + public ObTableOperationType getOperationType() { + return ObTableOperationType.PUT; + } + + /* + * add mutated Row + */ + public Put addMutateRow(Row rows) { + if (null == rows) { + throw new IllegalArgumentException("Invalid null rowKey set into Put"); + } else if (0 == rows.getMap().size()) { + throw new IllegalArgumentException("input row key should not be empty"); + } + + // set mutate row into Put + for (Map.Entry entry : rows.getMap().entrySet()) { + columns.add(entry.getKey()); + values.add(entry.getValue()); + } + + return this; + } + + /* + * get the mutated columns' name + */ + public String[] getColumns() { + return columns.toArray(new String[0]); + } + + /* + * get the mutated columns' value + */ + public Object[] getValues() { + return values.toArray(); + } + + /* + * add mutated ColumnValues + */ + public Put addMutateColVal(ColumnValue... columnValues) throws Exception { + if (null == columnValues) { + throw new IllegalArgumentException("Invalid null columnValues set into Put"); + } + + // set mutate row into Put + for (ColumnValue columnValue : columnValues) { + if (columns.contains(columnValue.getColumnName())) { + throw new ObTableException("Duplicate column in Row Key"); + } + columns.add(columnValue.getColumnName()); + values.add(columnValue.getValue()); + } + + return this; + } + + /* + * Remove rowkey from mutateColval + */ + public Put removeRowkeyFromMutateColval() { + removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames); + return this; + } + + /* + * execute + */ + public MutationResult execute() throws Exception { + if (null == getTableName()) { + throw new ObTableException("table name is null"); + } else if (null == getClient()) { + throw new ObTableException("client is null"); + } + removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames); + if (null == getQuery()) { + // simple Put, without filter + return new MutationResult(((ObTableClient) getClient()).putWithResult( + getTableName(), getRowKey(), getKeyRanges(), columns.toArray(new String[0]), + values.toArray())); + } else { + if (checkMutationWithFilter()) { + // QueryAndPut + ObTableOperation operation = ObTableOperation.getInstance( + ObTableOperationType.PUT, getRowKey(), columns.toArray(new String[0]), + values.toArray()); + return new MutationResult(((ObTableClient) getClient()).mutationWithFilter( + getQuery(), getRowKey(), getKeyRanges(), operation, true)); + } else { + throw new ObTableUnexpectedException("should set filter and scan range both"); + } + } } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java index 654d7ae6..61f0fb3c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java @@ -172,8 +172,11 @@ public byte[] encode() { String name = entry.getKey(); ObObj property = entry.getValue(); - int keyLen = Serialization.getNeedBytes(name); - System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen); + int keyLen = 0; + if (!isOnlyEncodeValue()) { + keyLen = Serialization.getNeedBytes(name); + System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen); + } idx += keyLen; byte[] objBytes = property.encode(); System.arraycopy(objBytes, 0, bytes, idx, objBytes.length); @@ -229,7 +232,9 @@ public long getPayloadContentSize() { size += this.getRowKeyValue(i).getEncodedSize(); } for (Map.Entry entry : this.getProperties().entrySet()) { - size += Serialization.getNeedBytes(entry.getKey()); + if (!isOnlyEncodeValue()) { + size += Serialization.getNeedBytes(entry.getKey()); + } size += entry.getValue().getEncodedSize(); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java index 7344a030..e2102401 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java @@ -198,4 +198,7 @@ public interface ObITableEntity extends ObPayload { */ long getPropertiesCount(); + void setOnlyEncodeValue(boolean onlyEncodeValue); + + boolean isOnlyEncodeValue(); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java index 495ffde2..ac096680 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java @@ -33,10 +33,10 @@ */ public class ObTableBatchOperation extends AbstractPayload { + private boolean isSamePropertiesNames; private List tableOperations = new ArrayList(); private boolean isReadOnly = true; private boolean isSameType = true; - private boolean isSamePropertiesNames; /* * Encode. @@ -46,31 +46,38 @@ public byte[] encode() { byte[] bytes = new byte[(int) getPayloadSize()]; int idx = 0; + // 0. encode header int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, idx, headerLen); idx += headerLen; - // 1. encode Operation + // 1. encode isSamePropertiesNames + System.arraycopy(Serialization.encodeI8(isSamePropertiesNames ? (byte) 1 : (byte) 0), 0, + bytes, idx, 1); + idx++; + + // 2. encode Operation int len = Serialization.getNeedBytes(tableOperations.size()); System.arraycopy(Serialization.encodeVi64(tableOperations.size()), 0, bytes, idx, len); idx += len; - for (ObTableOperation tableOperation : tableOperations) { - len = (int) tableOperation.getPayloadSize(); - System.arraycopy(tableOperation.encode(), 0, bytes, idx, len); + for (int i = 0; i < tableOperations.size(); i++) { + ObTableOperation operation = tableOperations.get(i); + if (i != 0 && isSamePropertiesNames) { + operation.getEntity().setOnlyEncodeValue(true); + } + len = (int) operation.getPayloadSize(); + System.arraycopy(operation.encode(), 0, bytes, idx, len); idx += len; } - // 2. encode others + // 3. encode others System .arraycopy(Serialization.encodeI8(isReadOnly ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); idx++; System .arraycopy(Serialization.encodeI8(isSameType ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); - idx++; - System.arraycopy(Serialization.encodeI8(isSamePropertiesNames ? (byte) 1 : (byte) 0), 0, - bytes, idx, 1); return bytes; } @@ -106,12 +113,19 @@ public Object decode(ByteBuf buf) { @Override public long getPayloadContentSize() { long payloadContentSize = 0; + + payloadContentSize+=1; // isSamePropertiesNames + payloadContentSize += Serialization.getNeedBytes(tableOperations.size()); - for (ObTableOperation operation : tableOperations) { + for (int i = 0; i < tableOperations.size(); i++) { + ObTableOperation operation = tableOperations.get(i); + if (i != 0 && isSamePropertiesNames) { + operation.getEntity().setOnlyEncodeValue(true); + } payloadContentSize += operation.getPayloadSize(); } - return payloadContentSize + 3; + return payloadContentSize + 2; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java index 47339289..92b9866e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java @@ -28,6 +28,16 @@ public class ObTableEntity extends AbstractObTableEntity { private ObRowKey rowKey = new ObRowKey(); private Map properties = new HashMap(); + private boolean onlyEncodeValue; + + public void setOnlyEncodeValue(boolean onlyEncodeValue) { + this.onlyEncodeValue = onlyEncodeValue; + } + + public boolean isOnlyEncodeValue() { + return onlyEncodeValue; + } + /* * Set row key value. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java index 0ab0a969..b0c6f71c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java @@ -26,7 +26,10 @@ public enum ObTableOperationType { INSERT_OR_UPDATE(4), // INSERT or UPDATE, columns not in arguments will remain unchanged REPLACE(5), // DELETE & INSERT, columns not in arguments will change to default value INCREMENT(6), // the column must be can be cast to long. if exist increase, else insert - APPEND(7);// append column value + APPEND(7),// append column value + SCAN(8),// scan value + TTL(9),// expire value + PUT(10);// put value private int value; private static Map map = new HashMap(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java index bf9bfaca..2896e5ad 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java @@ -26,8 +26,18 @@ public abstract class AbstractTableBatchOps implements TableBatchOps { protected boolean atomicOperation; + protected boolean isSamePropertiesNames; + protected ObTableEntityType entityType = ObTableEntityType.DYNAMIC; + public boolean isSamePropertiesNames() { + return isSamePropertiesNames; + } + + public void setSamePropertiesNames(boolean samePropertiesNames) { + isSamePropertiesNames = samePropertiesNames; + } + /** * Get. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java index d623752c..8b4f52f1 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java @@ -82,6 +82,14 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { addObTableOperation(ObTableOperationType.INSERT, rowkeys, columns, values); } + /* + * Put. + */ + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + addObTableOperation(ObTableOperationType.PUT, rowkeys, columns, values); + } + /* * Replace. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index b8792f65..15cb04f0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -108,6 +108,14 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { addObTableClientOperation(ObTableOperationType.INSERT, rowkeys, columns, values); } + /* + * Put. + */ + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + addObTableClientOperation(ObTableOperationType.PUT, rowkeys, columns, values); + } + /* * Replace. */ @@ -193,6 +201,7 @@ public List executeWithResult() throws Exception { case REPLACE: case INCREMENT: case APPEND: + case PUT: results.add(new MutationResult(result)); break; default: @@ -247,12 +256,12 @@ public Map>>> obTableOperations.getRight().add(new ObPair(i, operation)); } - if (atomicOperation) { - if (partitionOperationsMap.size() > 1) { - throw new ObTablePartitionConsistentException( - "require atomic operation but found across partition may cause consistent problem "); - } - } +// if (atomicOperation) { +// if (partitionOperationsMap.size() > 1) { +// throw new ObTablePartitionConsistentException( +// "require atomic operation but found across partition may cause consistent problem "); +// } +// } return partitionOperationsMap; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java index 0ab134cb..ff8ecd9f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java @@ -32,6 +32,10 @@ public interface TableBatchOps { boolean isAtomicOperation(); + boolean isSamePropertiesNames(); + + void setSamePropertiesNames(boolean samePropertiesNames); + void setEntityType(ObTableEntityType entityType); ObTableEntityType getEntityType(); @@ -52,6 +56,8 @@ public interface TableBatchOps { void insert(Object[] rowkeys, String[] columns, Object[] values); + void put(Object[] rowkeys, String[] columns, Object[] values); + void replace(Object rowkey, String[] columns, Object[] values); void replace(Object[] rowkeys, String[] columns, Object[] values); diff --git a/src/test/java/ci.sql b/src/test/java/ci.sql index 4448290f..5dea1896 100644 --- a/src/test/java/ci.sql +++ b/src/test/java/ci.sql @@ -309,4 +309,12 @@ CREATE TABLE IF NOT EXISTS `sync_item` ( DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci PARTITION BY KEY(`uid`) PARTITIONS 32; +CREATE TABLE IF NOT EXISTS `batch_put` ( + `id` varchar(20) NOT NULL, + `c1` bigint DEFAULT NULL, + `c2` bigint DEFAULT NULL, + `c3` varchar(32) DEFAULT NULL, + `c4` bigint DEFAULT NULL, + PRIMARY KEY(`id`)) PARTITION BY KEY(`id`) PARTITIONS 32; + alter system set kv_hotkey_throttle_threshold = 50; diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java new file mode 100644 index 00000000..91427696 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -0,0 +1,89 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2023 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc; + +import com.alipay.oceanbase.rpc.mutation.BatchOperation; +import com.alipay.oceanbase.rpc.mutation.Put; +import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; +import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; +import org.junit.Before; +import org.junit.Test; + + +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.*; +import static org.junit.Assert.assertTrue; + +/* +CREATE TABLE IF NOT EXISTS `batch_put` ( + `id` varchar(20) NOT NULL, + `c1` bigint DEFAULT NULL, + `c2` bigint DEFAULT NULL, + `c3` varchar(32) DEFAULT NULL, + `c4` bigint DEFAULT NULL, + PRIMARY KEY(`id`))PARTITION BY HASH(`id`) PARTITIONS 32; + */ +public class ObTableBatchPutTest { + ObTableClient client; + public static String tableName = "batch_put"; + public static String idColumnName = "hello world"; + public static String c1ColumnName = "c1"; + public static String c2ColumnName = "c2"; + public static String c3ColumnName = "c3"; + public static String c4ColumnName = "c4"; + + @Before + public void setup() throws Exception { + final ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); + obTableClient.init(); + this.client = obTableClient; +// client.addRowKeyElement(tableName, new String[] { idColumnName }); + } + + /* + CREATE TABLE IF NOT EXISTS `batch_put` ( + `id` varchar(20) NOT NULL, + `c1` bigint DEFAULT NULL, + `c2` bigint DEFAULT NULL, + `c3` varchar(32) DEFAULT NULL, + `c4` bigint DEFAULT NULL, + PRIMARY KEY(`id`))PARTITION BY HASH(`id`) PARTITIONS 32; + */ + @Test + public void test_batch_put() throws Exception { + try { + BatchOperation batchOperation = client.batchOperation(tableName).setIsAtomic(true).setSamePropertiesNames(true); + for (long i = 0; i < 100; i++) { + Put putOp = put().setRowKey(row(colVal("id", String.valueOf(i)))) + .addMutateColVal(colVal("c1", i)) + .addMutateColVal(colVal("c2", i)) + .addMutateColVal(colVal("c3", String.valueOf(i))) + .addMutateColVal(colVal("c4", i)); + batchOperation.addOperation(putOp); + } + BatchOperationResult result = batchOperation.setIsAtomic(true).execute(); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); + } finally { + client.addRowKeyElement(tableName, new String[] { idColumnName }); + for (long i = 0; i < 100; i++) { + client.delete(tableName).addScanRange(String.valueOf(i), String.valueOf(i)).execute(); + } + } + } +} \ No newline at end of file From 5cd2a1929efc1f2d0a86d2f5dadd70d2a4ea39a6 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Fri, 29 Dec 2023 10:50:31 +0800 Subject: [PATCH 02/10] batch with sames names --- .../oceanbase/rpc/table/ObTableClientBatchOpsImpl.java | 1 + .../com/alipay/oceanbase/rpc/ObTableBatchPutTest.java | 9 --------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index 15cb04f0..e9abe8bb 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -299,6 +299,7 @@ public void partitionExecute(ObTableOperationResult[] results, .toObTableConsistencyLevel()); } subRequest.setBatchOperationAsAtomic(isAtomicOperation()); + subRequest.getBatchOperation().setSamePropertiesNames(isSamePropertiesNames()); ObTableBatchOperationResult subObTableBatchOperationResult; boolean needRefreshTableEntry = false; diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java index 91427696..51d660d1 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -41,17 +41,12 @@ public class ObTableBatchPutTest { ObTableClient client; public static String tableName = "batch_put"; public static String idColumnName = "hello world"; - public static String c1ColumnName = "c1"; - public static String c2ColumnName = "c2"; - public static String c3ColumnName = "c3"; - public static String c4ColumnName = "c4"; @Before public void setup() throws Exception { final ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); obTableClient.init(); this.client = obTableClient; -// client.addRowKeyElement(tableName, new String[] { idColumnName }); } /* @@ -80,10 +75,6 @@ public void test_batch_put() throws Exception { e.printStackTrace(); assertTrue(false); } finally { - client.addRowKeyElement(tableName, new String[] { idColumnName }); - for (long i = 0; i < 100; i++) { - client.delete(tableName).addScanRange(String.valueOf(i), String.valueOf(i)).execute(); - } } } } \ No newline at end of file From d446d587d4da9f78b9028842b6dac29b9623bf3c Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Fri, 29 Dec 2023 15:35:38 +0800 Subject: [PATCH 03/10] add demo --- .../oceanbase/rpc/location/LocationUtil.java | 6 ++- src/test/java/ci.sql | 11 ++--- .../oceanbase/rpc/ObTableBatchPutTest.java | 42 ++++++++++--------- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index 96dde160..156e4632 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -1292,8 +1292,10 @@ private static List> parseRangePart(ResultS List> bounds = new ArrayList>(); Map partNameIdMap = new HashMap(); Map partTabletIdMap = new HashMap(); - ObRangePartDesc subRangePartDesc = (ObRangePartDesc) tableEntry.getPartitionInfo() - .getSubPartDesc(); + ObRangePartDesc subRangePartDesc = null; + if (isSubPart) { + subRangePartDesc = (ObRangePartDesc) tableEntry.getPartitionInfo().getSubPartDesc(); + } long idx = 0L; while (rs.next()) { if (null != subRangePartDesc && !isSubPart && subRangePartDesc.getPartNum() == 0) { diff --git a/src/test/java/ci.sql b/src/test/java/ci.sql index 5dea1896..9080df98 100644 --- a/src/test/java/ci.sql +++ b/src/test/java/ci.sql @@ -311,10 +311,11 @@ CREATE TABLE IF NOT EXISTS `sync_item` ( CREATE TABLE IF NOT EXISTS `batch_put` ( `id` varchar(20) NOT NULL, - `c1` bigint DEFAULT NULL, - `c2` bigint DEFAULT NULL, - `c3` varchar(32) DEFAULT NULL, - `c4` bigint DEFAULT NULL, - PRIMARY KEY(`id`)) PARTITION BY KEY(`id`) PARTITIONS 32; + `c_1` varchar(32) NOT NULL, + `t_1` datetime(3) DEFAULT NULL, + `t_2` timestamp(3) DEFAULT NULL, + `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `b_1` bigint(20) DEFAULT NULL, + PRIMARY KEY(`id`, `c_1`)) partition by key(`id`) subpartition by key(`c_1`) subpartitions 4 partitions 97; alter system set kv_hotkey_throttle_threshold = 50; diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java index 51d660d1..eae8abf6 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -19,28 +19,22 @@ import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.Put; +import com.alipay.oceanbase.rpc.mutation.Row; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; import org.junit.Before; import org.junit.Test; +import java.sql.Date; +import java.sql.Timestamp; + import static com.alipay.oceanbase.rpc.mutation.MutationFactory.*; import static org.junit.Assert.assertTrue; -/* -CREATE TABLE IF NOT EXISTS `batch_put` ( - `id` varchar(20) NOT NULL, - `c1` bigint DEFAULT NULL, - `c2` bigint DEFAULT NULL, - `c3` varchar(32) DEFAULT NULL, - `c4` bigint DEFAULT NULL, - PRIMARY KEY(`id`))PARTITION BY HASH(`id`) PARTITIONS 32; - */ public class ObTableBatchPutTest { ObTableClient client; public static String tableName = "batch_put"; - public static String idColumnName = "hello world"; @Before public void setup() throws Exception { @@ -52,22 +46,30 @@ public void setup() throws Exception { /* CREATE TABLE IF NOT EXISTS `batch_put` ( `id` varchar(20) NOT NULL, - `c1` bigint DEFAULT NULL, - `c2` bigint DEFAULT NULL, - `c3` varchar(32) DEFAULT NULL, - `c4` bigint DEFAULT NULL, - PRIMARY KEY(`id`))PARTITION BY HASH(`id`) PARTITIONS 32; + `c_1` varchar(32) NOT NULL, + `t_1` datetime(3) DEFAULT NULL, + `t_2` timestamp(3) DEFAULT NULL, + `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `b_1` bigint(20) DEFAULT NULL, + PRIMARY KEY(`id`, `c_1`)) partition by key(`id`) subpartition by key(`c_1`) subpartitions 4 partitions 97; */ @Test public void test_batch_put() throws Exception { try { + // 当设置setSamePropertiesNames(true)时,表中所有的列都必须填充值 BatchOperation batchOperation = client.batchOperation(tableName).setIsAtomic(true).setSamePropertiesNames(true); for (long i = 0; i < 100; i++) { - Put putOp = put().setRowKey(row(colVal("id", String.valueOf(i)))) - .addMutateColVal(colVal("c1", i)) - .addMutateColVal(colVal("c2", i)) - .addMutateColVal(colVal("c3", String.valueOf(i))) - .addMutateColVal(colVal("c4", i)); + // 清除毫秒部分,仅保留到秒 + long timeInSeconds = (System.currentTimeMillis() / 1000) * 1000; + Timestamp ts = new Timestamp(timeInSeconds); + java.util.Date date = new Date(timeInSeconds); + Row rowKey = new Row(colVal("id", String.valueOf(i)), // `id` varchar(20) + colVal("c_1", String.valueOf(i))); // `id` varchar(20) + Put putOp = put().setRowKey(rowKey) + .addMutateColVal(colVal("t_1", date)) // `t_1` datetime(3) + .addMutateColVal(colVal("t_2", ts)) // `t_2` timestamp(3) + .addMutateColVal(colVal("t_3", ts)) // `t_3` timestamp(3) + .addMutateColVal(colVal("b_1", i)); // `b_1` bigint(20) batchOperation.addOperation(putOp); } BatchOperationResult result = batchOperation.setIsAtomic(true).execute(); From 3ae566b2ac2bd85f47a1514bb88271004e42c6c9 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Fri, 29 Dec 2023 19:23:13 +0800 Subject: [PATCH 04/10] support sub partition of range in datetime column type --- .../oceanbase/rpc/location/LocationUtil.java | 10 +++--- .../model/partition/ObListPartDesc.java | 6 ++++ .../location/model/partition/ObPartDesc.java | 2 ++ .../rpc/protocol/payload/impl/ObObjType.java | 32 ++++++++++++++++--- .../oceanbase/rpc/ObTableBatchPutTest.java | 17 +++++----- 5 files changed, 49 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index 156e4632..daa3d77c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -1292,17 +1292,15 @@ private static List> parseRangePart(ResultS List> bounds = new ArrayList>(); Map partNameIdMap = new HashMap(); Map partTabletIdMap = new HashMap(); - ObRangePartDesc subRangePartDesc = null; - if (isSubPart) { - subRangePartDesc = (ObRangePartDesc) tableEntry.getPartitionInfo().getSubPartDesc(); - } + ObPartDesc subPartDesc = tableEntry.getPartitionInfo().getSubPartDesc(); + long idx = 0L; while (rs.next()) { - if (null != subRangePartDesc && !isSubPart && subRangePartDesc.getPartNum() == 0) { + if (null != subPartDesc && !isSubPart && subPartDesc.getPartNum() == 0) { // client only support template partition table // so the sub_part_num is a constant and will store in subPartDesc which is different from proxy long subPartNum = rs.getLong("sub_part_num"); - subRangePartDesc.setPartNum((int) subPartNum); + subPartDesc.setPartNum((int) subPartNum); } String highBoundVal = rs.getString("high_bound_val"); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java index f9197670..65e3fb94 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java @@ -99,6 +99,12 @@ public void prepare() throws IllegalArgumentException { super.prepare(); } + /* + * Set part num. + */ + public void setPartNum(int partNum) { + } + /* * Get part ids. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java index 60ac37b9..f60eddd0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java @@ -242,4 +242,6 @@ public abstract Long getPartId(List rowKeys, boolean consistency) ObTablePartitionConsistentException; public abstract Long getRandomPartId(); + + public abstract void setPartNum(int partNum); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java index e9f6f482..ad7b2cb8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import java.sql.Timestamp; +import java.text.SimpleDateFormat; import java.time.*; import java.util.*; @@ -929,10 +930,13 @@ public ObObjMeta getDefaultObjMeta() { * Parse to comparable. */ @Override - public Timestamp parseToComparable(Object o, ObCollationType ct) - throws IllegalArgumentException, - FeatureNotSupportedException { - return parseTimestamp(this, o, ct); + public Date parseToComparable(Object o, ObCollationType ct) + throws IllegalArgumentException, + FeatureNotSupportedException { + if (o instanceof String) { + return TimeUtils.strToDate((String) o); + } + return (Date) o; } }, // The TIMESTAMP data type is used for values that contain both date and time parts. @@ -1823,6 +1827,26 @@ public static Timestamp parseTimestamp(ObObjType obObjType, Object object, + collationType + "argument:" + object); } + public static Date parseToDate(ObObjType obObjType, Object object, + ObCollationType collationType) throws Exception{ + if (object instanceof Date) { + return (Date) object; + } + + if (object instanceof java.util.Date) { + return (Date) object; + } + + if (object instanceof String) { + String str = (String)object; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + return dateFormat.parse(str); + } + + throw new IllegalArgumentException(obObjType.name() + "can not parseToComparable with " + + collationType + "argument:" + object); + } + /* * Parse long. */ diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java index eae8abf6..b54f972f 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -28,8 +28,11 @@ import java.sql.Date; import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Map; import static com.alipay.oceanbase.rpc.mutation.MutationFactory.*; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ObTableBatchPutTest { @@ -46,12 +49,11 @@ public void setup() throws Exception { /* CREATE TABLE IF NOT EXISTS `batch_put` ( `id` varchar(20) NOT NULL, - `c_1` varchar(32) NOT NULL, - `t_1` datetime(3) DEFAULT NULL, + `b_1` varchar(32) DEFAULT NULL, + `t_1` datetime(3) NOT NULL, `t_2` timestamp(3) DEFAULT NULL, `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), - `b_1` bigint(20) DEFAULT NULL, - PRIMARY KEY(`id`, `c_1`)) partition by key(`id`) subpartition by key(`c_1`) subpartitions 4 partitions 97; + PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`)); */ @Test public void test_batch_put() throws Exception { @@ -64,12 +66,11 @@ public void test_batch_put() throws Exception { Timestamp ts = new Timestamp(timeInSeconds); java.util.Date date = new Date(timeInSeconds); Row rowKey = new Row(colVal("id", String.valueOf(i)), // `id` varchar(20) - colVal("c_1", String.valueOf(i))); // `id` varchar(20) + colVal("t_1", date)); // `t_1` varchar(20) Put putOp = put().setRowKey(rowKey) - .addMutateColVal(colVal("t_1", date)) // `t_1` datetime(3) + .addMutateColVal(colVal("b_1", String.valueOf(i))) // `b_1` varchar(32) .addMutateColVal(colVal("t_2", ts)) // `t_2` timestamp(3) - .addMutateColVal(colVal("t_3", ts)) // `t_3` timestamp(3) - .addMutateColVal(colVal("b_1", i)); // `b_1` bigint(20) + .addMutateColVal(colVal("t_3", ts)); // `t_3` timestamp(3) batchOperation.addOperation(putOp); } BatchOperationResult result = batchOperation.setIsAtomic(true).execute(); From dc08fd31ad3bbedc6ce08528f66964ea29dc2d1e Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Fri, 29 Dec 2023 20:12:34 +0800 Subject: [PATCH 05/10] fix demo --- .../alipay/oceanbase/rpc/ObTableClient.java | 60 ++++++++-------- .../rpc/bolt/transport/ObTableRemoting.java | 3 +- .../rpc/exception/ExceptionUtil.java | 8 ++- .../location/model/partition/ObPartDesc.java | 2 +- .../rpc/mutation/BatchOperation.java | 4 +- .../alipay/oceanbase/rpc/mutation/Put.java | 12 ++-- .../rpc/protocol/payload/impl/ObObjType.java | 12 ++-- .../impl/execute/ObTableBatchOperation.java | 5 +- .../payload/impl/execute/ObTableEntity.java | 2 +- .../impl/execute/ObTableOperationType.java | 6 +- .../alipay/oceanbase/rpc/table/ObTable.java | 2 +- .../rpc/table/ObTableBatchOpsImpl.java | 14 ++-- .../rpc/table/ObTableClientBatchOpsImpl.java | 20 +++--- .../oceanbase/rpc/util/MonitorUtil.java | 6 +- src/test/java/ci.sql | 24 +++++-- .../oceanbase/rpc/ObTableAggregationTest.java | 28 ++++---- .../oceanbase/rpc/ObTableBatchPutTest.java | 58 +++++++++++---- .../rpc/ObTableClientAutoIncTest.java | 4 +- .../oceanbase/rpc/ObTableClientTest.java | 70 +++++++++---------- .../oceanbase/rpc/ObTableErrMsgTest.java | 3 +- 20 files changed, 192 insertions(+), 151 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index a58f40f9..6af3569a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -454,7 +454,8 @@ void checkObTableOperationResult(String ip, int port, ObPayload request, ObPaylo long uniqueId = obTableOperationResult.getUniqueId() == 0 ? obTableOperationRequest .getUniqueId() : obTableOperationResult.getUniqueId(); ExceptionUtil.throwObTableException(ip, port, sequence, uniqueId, - obTableOperationResult.getHeader().getErrno(), obTableOperationResult.getHeader().getErrMsg()); + obTableOperationResult.getHeader().getErrno(), obTableOperationResult.getHeader() + .getErrMsg()); } void checkObTableQueryAndMutateResult(String ip, int port, ObPayload result) { @@ -601,7 +602,8 @@ void checkResult(String ip, int port, ObPayload request, ObPayload result) { long uniqueId = obTableOperationResult.getUniqueId() == 0 ? obTableOperationRequest .getUniqueId() : obTableOperationResult.getUniqueId(); ExceptionUtil.throwObTableException(ip, port, sequence, uniqueId, - obTableOperationResult.getHeader().getErrno(), obTableOperationResult.getHeader().getErrMsg()); + obTableOperationResult.getHeader().getErrno(), obTableOperationResult + .getHeader().getErrMsg()); } else if (result instanceof ObTableQueryAndMutateResult) { // TODO: Add func like throwObTableException() // which will output the ip / port / error information @@ -1209,7 +1211,7 @@ public ObPair getTable(String tableName, Object[] rowKey, bo boolean waitForRefresh, ObServerRoute route) throws Exception { TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh); - + long partId = getPartition(tableEntry, rowKey); return getTable(tableName, tableEntry, partId, waitForRefresh, route); @@ -1795,34 +1797,34 @@ public ObPayload execute(ObPair obPair) throws Exception { * @throws Exception exception */ public ObPayload putWithResult(final String tableName, final Object[] rowKey, - final List keyRanges, final String[] columns, - final Object[] values) throws Exception { + final List keyRanges, final String[] columns, + final Object[] values) throws Exception { final long start = System.currentTimeMillis(); return executeMutation(tableName, - new MutationExecuteCallback(rowKey, keyRanges) { - /** - * Execute. - */ - @Override - public ObPayload execute(ObPair obPair) throws Exception { - long TableTime = System.currentTimeMillis(); - ObTableParam tableParam = obPair.getRight(); - ObTable obTable = tableParam.getObTable(); - ObTableOperationRequest request = ObTableOperationRequest.getInstance( - tableName, PUT, rowKey, columns, values, - obTable.getObTableOperationTimeout()); - request.setTableId(tableParam.getTableId()); - // partId/tabletId - request.setPartitionId(tableParam.getPartitionId()); - ObPayload result = obTable.execute(request); - String endpoint = obTable.getIp() + ":" + obTable.getPort(); - MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey, - (ObTableOperationResult) result, TableTime - start, - System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold()); - checkResult(obTable.getIp(), obTable.getPort(), request, result); - return result; - } - }); + new MutationExecuteCallback(rowKey, keyRanges) { + /** + * Execute. + */ + @Override + public ObPayload execute(ObPair obPair) throws Exception { + long TableTime = System.currentTimeMillis(); + ObTableParam tableParam = obPair.getRight(); + ObTable obTable = tableParam.getObTable(); + ObTableOperationRequest request = ObTableOperationRequest.getInstance( + tableName, PUT, rowKey, columns, values, + obTable.getObTableOperationTimeout()); + request.setTableId(tableParam.getTableId()); + // partId/tabletId + request.setPartitionId(tableParam.getPartitionId()); + ObPayload result = obTable.execute(request); + String endpoint = obTable.getIp() + ":" + obTable.getPort(); + MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey, + (ObTableOperationResult) result, TableTime - start, + System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold()); + checkResult(obTable.getIp(), obTable.getPort(), request, result); + return result; + } + }); } /** diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java index c2cae5d5..d7a770a1 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java @@ -58,7 +58,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques if (request instanceof Credentialable) { if (conn.getCredential() == null) { - String errMessage = TraceUtil.formatTraceMessage(conn, request, "credential is null"); + String errMessage = TraceUtil.formatTraceMessage(conn, request, + "credential is null"); logger.warn(errMessage); throw new ObTableUnexpectedException(errMessage); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java b/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java index 18cf4a74..795de0cf 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java @@ -58,7 +58,8 @@ public static void throwObTableException(String host, int port, long sequence, l * @param resultCodes error code return from Ob Server */ public static ObTableException convertToObTableException(String host, int port, long sequence, - long uniqueId, int errorCode, String errorMessage) { + long uniqueId, int errorCode, + String errorMessage) { String trace = String.format("Y%X-%016X", uniqueId, sequence); String server = host + ":" + port; String errMsg = Objects.equals(errorMessage, "") ? "error occur in server" : errorMessage; @@ -70,8 +71,9 @@ public static ObTableException convertToObTableException(String host, int port, } // [errCode][errCodeName][errMsg][server][trace] - return new ObTableException("["+ String.valueOf(resultCodes.errorCode) +"]" + "["+ resultCodes.name() +"]" + - "[" + errMsg + "]" + "[" + server + "]" + "[" + trace + "]", resultCodes.errorCode); + return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "[" + + resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server + + "]" + "[" + trace + "]", resultCodes.errorCode); } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java index f60eddd0..39d029df 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartDesc.java @@ -243,5 +243,5 @@ public abstract Long getPartId(List rowKeys, boolean consistency) public abstract Long getRandomPartId(); - public abstract void setPartNum(int partNum); + public abstract void setPartNum(int partNum); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index b35d1270..8fd9ab42 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -34,7 +34,7 @@ public class BatchOperation { private Table client; boolean withResult; private List operations; - boolean isAtomic = false; + boolean isAtomic = false; boolean isSamePropertiesNames = false; public BatchOperation setSamePropertiesNames(boolean samePropertiesNames) { @@ -135,7 +135,7 @@ public BatchOperationResult execute() throws Exception { case PUT: ((Put) mutation).removeRowkeyFromMutateColval(); batchOps.put(mutation.getRowKey(), ((Put) mutation).getColumns(), - ((Put) mutation).getValues()); + ((Put) mutation).getValues()); break; case DEL: batchOps.delete(mutation.getRowKey()); diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java index 79cf56c5..a4ca7730 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java @@ -155,17 +155,15 @@ public MutationResult execute() throws Exception { removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames); if (null == getQuery()) { // simple Put, without filter - return new MutationResult(((ObTableClient) getClient()).putWithResult( - getTableName(), getRowKey(), getKeyRanges(), columns.toArray(new String[0]), - values.toArray())); + return new MutationResult(((ObTableClient) getClient()).putWithResult(getTableName(), + getRowKey(), getKeyRanges(), columns.toArray(new String[0]), values.toArray())); } else { if (checkMutationWithFilter()) { // QueryAndPut - ObTableOperation operation = ObTableOperation.getInstance( - ObTableOperationType.PUT, getRowKey(), columns.toArray(new String[0]), - values.toArray()); + ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.PUT, + getRowKey(), columns.toArray(new String[0]), values.toArray()); return new MutationResult(((ObTableClient) getClient()).mutationWithFilter( - getQuery(), getRowKey(), getKeyRanges(), operation, true)); + getQuery(), getRowKey(), getKeyRanges(), operation, true)); } else { throw new ObTableUnexpectedException("should set filter and scan range both"); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java index ad7b2cb8..910a9e5f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java @@ -931,8 +931,8 @@ public ObObjMeta getDefaultObjMeta() { */ @Override public Date parseToComparable(Object o, ObCollationType ct) - throws IllegalArgumentException, - FeatureNotSupportedException { + throws IllegalArgumentException, + FeatureNotSupportedException { if (o instanceof String) { return TimeUtils.strToDate((String) o); } @@ -1827,8 +1827,8 @@ public static Timestamp parseTimestamp(ObObjType obObjType, Object object, + collationType + "argument:" + object); } - public static Date parseToDate(ObObjType obObjType, Object object, - ObCollationType collationType) throws Exception{ + public static Date parseToDate(ObObjType obObjType, Object object, ObCollationType collationType) + throws Exception { if (object instanceof Date) { return (Date) object; } @@ -1838,13 +1838,13 @@ public static Date parseToDate(ObObjType obObjType, Object object, } if (object instanceof String) { - String str = (String)object; + String str = (String) object; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); return dateFormat.parse(str); } throw new IllegalArgumentException(obObjType.name() + "can not parseToComparable with " - + collationType + "argument:" + object); + + collationType + "argument:" + object); } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java index ac096680..90ecb528 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java @@ -46,7 +46,6 @@ public byte[] encode() { byte[] bytes = new byte[(int) getPayloadSize()]; int idx = 0; - // 0. encode header int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, @@ -55,7 +54,7 @@ public byte[] encode() { // 1. encode isSamePropertiesNames System.arraycopy(Serialization.encodeI8(isSamePropertiesNames ? (byte) 1 : (byte) 0), 0, - bytes, idx, 1); + bytes, idx, 1); idx++; // 2. encode Operation @@ -114,7 +113,7 @@ public Object decode(ByteBuf buf) { public long getPayloadContentSize() { long payloadContentSize = 0; - payloadContentSize+=1; // isSamePropertiesNames + payloadContentSize += 1; // isSamePropertiesNames payloadContentSize += Serialization.getNeedBytes(tableOperations.size()); for (int i = 0; i < tableOperations.size(); i++) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java index 92b9866e..f092382e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java @@ -28,7 +28,7 @@ public class ObTableEntity extends AbstractObTableEntity { private ObRowKey rowKey = new ObRowKey(); private Map properties = new HashMap(); - private boolean onlyEncodeValue; + private boolean onlyEncodeValue; public void setOnlyEncodeValue(boolean onlyEncodeValue) { this.onlyEncodeValue = onlyEncodeValue; diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java index b0c6f71c..cd654ab0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java @@ -26,9 +26,9 @@ public enum ObTableOperationType { INSERT_OR_UPDATE(4), // INSERT or UPDATE, columns not in arguments will remain unchanged REPLACE(5), // DELETE & INSERT, columns not in arguments will change to default value INCREMENT(6), // the column must be can be cast to long. if exist increase, else insert - APPEND(7),// append column value - SCAN(8),// scan value - TTL(9),// expire value + APPEND(7), // append column value + SCAN(8), // scan value + TTL(9), // expire value PUT(10);// put value private int value; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index a80ebd39..6ce0509f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -396,7 +396,7 @@ private void checkObTableOperationResult(String ip, int port, Object result) { ((ObTableOperationResult) result).setExecutePort(port); ExceptionUtil.throwObTableException(ip, port, obTableOperationResult.getSequence(), obTableOperationResult.getUniqueId(), obTableOperationResult.getHeader().getErrno(), - obTableOperationResult.getHeader().getErrMsg()); + obTableOperationResult.getHeader().getErrMsg()); } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java index 8b4f52f1..edb31373 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java @@ -159,10 +159,9 @@ public List execute() throws RemotingException, InterruptedException { results.add(realResult.getAffectedRows()); } } else { - results - .add(ExceptionUtil.convertToObTableException(obTable.getIp(), - obTable.getPort(), realResult.getSequence(), realResult.getUniqueId(), - errCode, realResult.getHeader().getErrMsg())); + results.add(ExceptionUtil.convertToObTableException(obTable.getIp(), + obTable.getPort(), realResult.getSequence(), realResult.getUniqueId(), errCode, + realResult.getHeader().getErrMsg())); } } return results; @@ -200,10 +199,9 @@ public List executeWithResult() throws Exception { + realResult.getOperationType()); } } else { - results - .add(ExceptionUtil.convertToObTableException(obTable.getIp(), - obTable.getPort(), realResult.getSequence(), realResult.getUniqueId(), - errCode, realResult.getHeader().getErrMsg())); + results.add(ExceptionUtil.convertToObTableException(obTable.getIp(), + obTable.getPort(), realResult.getSequence(), realResult.getUniqueId(), errCode, + realResult.getHeader().getErrMsg())); } } return results; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index e9abe8bb..30f44c8d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -176,8 +176,8 @@ public List execute() throws Exception { } } else { results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(), - result.getExecutePort(), result.getSequence(), result.getUniqueId(), - errCode, result.getHeader().getErrMsg())); + result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, + result.getHeader().getErrMsg())); } } return results; @@ -210,8 +210,8 @@ public List executeWithResult() throws Exception { } } else { results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(), - result.getExecutePort(), result.getSequence(), result.getUniqueId(), - errCode, result.getHeader().getErrMsg())); + result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, + result.getHeader().getErrMsg())); } } return results; @@ -256,12 +256,12 @@ public Map>>> obTableOperations.getRight().add(new ObPair(i, operation)); } -// if (atomicOperation) { -// if (partitionOperationsMap.size() > 1) { -// throw new ObTablePartitionConsistentException( -// "require atomic operation but found across partition may cause consistent problem "); -// } -// } + // if (atomicOperation) { + // if (partitionOperationsMap.size() > 1) { + // throw new ObTablePartitionConsistentException( + // "require atomic operation but found across partition may cause consistent problem "); + // } + // } return partitionOperationsMap; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java index 3495bbd4..8185455d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java @@ -117,7 +117,8 @@ private static String logMessage(String traceId, String database, String tableNa endpoint = endpoint.replaceAll(",", "#"); } // if rowkeys is empty point, then append "rowKeys:null" into log message - String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays.asList(rowKeys)); + String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays + .asList(rowKeys)); ResultCodes resultCode = ResultCodes.valueOf(result.getHeader().getErrno()); String res = ""; @@ -163,7 +164,8 @@ private static String logMessage(String traceId, String database, String tableNa endpoint = endpoint.replaceAll(",", "#"); } // if rowkeys is empty point, then append "rowKeys:null" into log message - String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays.asList(rowKeys)); + String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays + .asList(rowKeys)); StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(traceId).append(" - ").append(database).append(",").append(tableName) diff --git a/src/test/java/ci.sql b/src/test/java/ci.sql index 9080df98..36dfb6f9 100644 --- a/src/test/java/ci.sql +++ b/src/test/java/ci.sql @@ -311,11 +311,27 @@ CREATE TABLE IF NOT EXISTS `sync_item` ( CREATE TABLE IF NOT EXISTS `batch_put` ( `id` varchar(20) NOT NULL, - `c_1` varchar(32) NOT NULL, - `t_1` datetime(3) DEFAULT NULL, + `b_1` varchar(32) DEFAULT NULL, + `t_1` datetime(3) NOT NULL, `t_2` timestamp(3) DEFAULT NULL, `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), - `b_1` bigint(20) DEFAULT NULL, - PRIMARY KEY(`id`, `c_1`)) partition by key(`id`) subpartition by key(`c_1`) subpartitions 4 partitions 97; + `c_1` bigint(20) default NULL, + PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`) subpartition template ( + subpartition `p0`, + subpartition `p1`, + subpartition `p2`, + subpartition `p3`, + subpartition `p4`, + subpartition `p5`, + subpartition `p6`, + subpartition `p7`, + subpartition `p8`, + subpartition `p9`, + subpartition `p10`) + (partition `p0` values less than ('2023-12-01 00:00:00'), + partition `p1` values less than ('2023-12-10 00:00:00'), + partition `p2` values less than ('2023-12-20 00:00:00'), + partition `p3` values less than ('2023-12-30 00:00:00'), + partition `p4` values less than ('2024-01-01 00:00:00')); alter system set kv_hotkey_throttle_threshold = 50; diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableAggregationTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableAggregationTest.java index 6166fb9b..8a171058 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableAggregationTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableAggregationTest.java @@ -68,6 +68,7 @@ public class ObTableAggregationTest { private ObTableClient client; + @Before public void setup() throws Exception { System.setProperty("ob_table_min_rslist_refresh_interval_millis", "1"); @@ -561,20 +562,20 @@ PRIMARY KEY(`c1`) // Test aggregation with big int , should report error out of range public void testAggregationWithBigint() throws Exception { final String TABLE_NAME = "test_aggregation"; - try{ + try { final ObTableClient client = (ObTableClient) this.client; - client.addRowKeyElement(TABLE_NAME, new String[]{"c1"}); + client.addRowKeyElement(TABLE_NAME, new String[] { "c1" }); SimpleDateFormat sdf = new SimpleDateFormat(" yyyy-MM-dd HH:mm:ss "); Date date1 = sdf.parse(" 1000-12-10 19:20:00 "); Date date2 = sdf.parse(" 1010-07-10 19:20:00 "); Date date3 = sdf.parse(" 1100-02-10 19:20:00 "); - client.insert(TABLE_NAME, "first_row", new String[]{"c2", "c3", "c4", "c5", - "c6", "c7"}, new Object[]{1, 9223372036854775807L, 1.0f, 1.0, (byte) 1, date1}); - client.insert(TABLE_NAME, "second_row", new String[]{"c2", "c3", "c4", "c5", - "c6", "c7"}, new Object[]{2, 9223372036854775807L, 2.0f, 2.0, (byte) 2, date2}); - client.insert(TABLE_NAME, "third_row", new String[]{"c2", "c3", "c4", "c5", - "c6", "c7"}, new Object[]{3, 9223372036854775807L, 3.0f, 3.0, (byte) 3, date3}); + client.insert(TABLE_NAME, "first_row", new String[] { "c2", "c3", "c4", "c5", "c6", + "c7" }, new Object[] { 1, 9223372036854775807L, 1.0f, 1.0, (byte) 1, date1 }); + client.insert(TABLE_NAME, "second_row", new String[] { "c2", "c3", "c4", "c5", "c6", + "c7" }, new Object[] { 2, 9223372036854775807L, 2.0f, 2.0, (byte) 2, date2 }); + client.insert(TABLE_NAME, "third_row", new String[] { "c2", "c3", "c4", "c5", "c6", + "c7" }, new Object[] { 3, 9223372036854775807L, 3.0f, 3.0, (byte) 3, date3 }); ObTableAggregation obtableAggregation = client.aggregate(TABLE_NAME); // test @@ -585,11 +586,12 @@ public void testAggregationWithBigint() throws Exception { Assert.assertEquals(10L, obtableAggregationResult.get("sum(c3)")); } catch (Exception e) { - Assert.assertTrue(((ObTableException) e).getMessage().contains("[OB_DATA_OUT_OF_RANGE][Out of range value for column 'sum(c3)' at row 0]")); + Assert.assertTrue(((ObTableException) e).getMessage().contains( + "[OB_DATA_OUT_OF_RANGE][Out of range value for column 'sum(c3)' at row 0]")); } finally { } } - + @Test // Test aggregation with empty table public void testAggregationEmptyVal() throws Exception { @@ -602,12 +604,12 @@ public void testAggregationEmptyVal() throws Exception { */ final ObTableClient client = (ObTableClient) this.client; - client.addRowKeyElement("test_partition_aggregation", new String[]{"c1"}); + client.addRowKeyElement("test_partition_aggregation", new String[] { "c1" }); try { // with filter ObTableAggregation obtableAggregationWithFilter = client - .aggregate("test_partition_aggregation"); + .aggregate("test_partition_aggregation"); // test obtableAggregationWithFilter.max("c2"); @@ -624,7 +626,7 @@ public void testAggregationEmptyVal() throws Exception { // execute ObTableAggregationResult obtableAggregationResultWithFilter = obtableAggregationWithFilter - .execute(); + .execute(); // empty table generate null row Assert.assertEquals(5, obtableAggregationResultWithFilter.getRow().size()); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java index b54f972f..7ea31b7a 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -25,25 +25,31 @@ import org.junit.Before; import org.junit.Test; - import java.sql.Date; import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.Map; import static com.alipay.oceanbase.rpc.mutation.MutationFactory.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ObTableBatchPutTest { - ObTableClient client; - public static String tableName = "batch_put"; + public static String PARAM_URL = "configer-server url"; // http://ip:port/services?User_ID=alibaba&UID=test&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx + public static String FULL_USER_NAME = "userName@tenantName@clusterName"; + public static String PASSWORD = "password for userName"; + public static String PROXY_SYS_USER_NAME = "root"; + public static String PROXY_SYS_USER_PASSWORD = ""; + + ObTableClient client = new ObTableClient(); + public static String tableName = "batch_put"; @Before public void setup() throws Exception { - final ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); - obTableClient.init(); - this.client = obTableClient; + client.setFullUserName(FULL_USER_NAME); + client.setParamURL(PARAM_URL); + client.setPassword(PASSWORD); + client.setSysUserName(PROXY_SYS_USER_NAME); + client.setSysPassword(PROXY_SYS_USER_PASSWORD); + client.init(); } /* @@ -53,27 +59,49 @@ public void setup() throws Exception { `t_1` datetime(3) NOT NULL, `t_2` timestamp(3) DEFAULT NULL, `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), - PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`)); + `c_1` bigint(20) default NULL, + PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`) subpartition template ( + subpartition `p0`, + subpartition `p1`, + subpartition `p2`, + subpartition `p3`, + subpartition `p4`, + subpartition `p5`, + subpartition `p6`, + subpartition `p7`, + subpartition `p8`, + subpartition `p9`, + subpartition `p10`) + (partition `p0` values less than ('2023-12-01 00:00:00'), + partition `p1` values less than ('2023-12-10 00:00:00'), + partition `p2` values less than ('2023-12-20 00:00:00'), + partition `p3` values less than ('2023-12-30 00:00:00'), + partition `p4` values less than ('2024-01-01 00:00:00')); */ @Test public void test_batch_put() throws Exception { try { // 当设置setSamePropertiesNames(true)时,表中所有的列都必须填充值 - BatchOperation batchOperation = client.batchOperation(tableName).setIsAtomic(true).setSamePropertiesNames(true); - for (long i = 0; i < 100; i++) { + BatchOperation batchOperation = client.batchOperation(tableName).setIsAtomic(true) + .setSamePropertiesNames(true); + long batchSize = 50; + for (long i = 0; i < batchSize; i++) { // 清除毫秒部分,仅保留到秒 long timeInSeconds = (System.currentTimeMillis() / 1000) * 1000; Timestamp ts = new Timestamp(timeInSeconds); java.util.Date date = new Date(timeInSeconds); Row rowKey = new Row(colVal("id", String.valueOf(i)), // `id` varchar(20) - colVal("t_1", date)); // `t_1` varchar(20) + colVal("t_1", date)); // `t_1` varchar(20) Put putOp = put().setRowKey(rowKey) - .addMutateColVal(colVal("b_1", String.valueOf(i))) // `b_1` varchar(32) - .addMutateColVal(colVal("t_2", ts)) // `t_2` timestamp(3) - .addMutateColVal(colVal("t_3", ts)); // `t_3` timestamp(3) + .addMutateColVal(colVal("b_1", String.valueOf(i))) // `b_1` varchar(32) + .addMutateColVal(colVal("t_2", ts)) // `t_2` timestamp(3) + .addMutateColVal(colVal("t_3", ts)) // `t_3` timestamp(3) + .addMutateColVal(colVal("c_1", i)); // `c_1` bigint(20) batchOperation.addOperation(putOp); } BatchOperationResult result = batchOperation.setIsAtomic(true).execute(); + assertEquals(batchSize, result.size()); + assertEquals(0, result.getWrongCount()); } catch (Exception e) { e.printStackTrace(); assertTrue(false); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java index 009feb1e..2f1e4441 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientAutoIncTest.java @@ -535,7 +535,7 @@ public void testAutoIncrementNotRowkey() throws Exception { dropTable(TABLE_NAME); } } - + // CREATE TABLE IF NOT EXISTS `test_auto_increment_one_rowkey` (`c1` int auto_increment, `c2` int NOT NULL, PRIMARY KEY(`c1`)); @Test // test insert null into auto increment column @@ -548,7 +548,7 @@ public void testAutoColumnRowKey() throws Exception { client.insert(TABLE_NAME, null, new String[] { "c2" }, new Object[] { 1 }); } catch (Exception e) { Assert.assertEquals("Cannot read the array length because \"rowKeys\" is null", - ((NullPointerException) e).getMessage()); + ((NullPointerException) e).getMessage()); } } finally { } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java index c03beefb..e114c5b6 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java @@ -2321,22 +2321,20 @@ public void testBatchInsertJudge() throws Exception { try { // prepare data with insert - Insert insert_0 = insert().setRowKey(row(colVal("measurement", "measurement1"), - colVal("tag_key", "tag_key1"), - colVal("tag_value", "tag_value1") )) - .addMutateColVal(colVal("series_ids", "series_ids1")); - Insert insert_1 = insert().setRowKey(row(colVal("measurement", "measurement1"), - colVal("tag_key", "tag_key2"), - colVal("tag_value", "tag_value2") )) - .addMutateColVal(colVal("series_ids", "series_ids2")); - Insert insert_2 = insert().setRowKey(row(colVal("measurement", "measurement1"), - colVal("tag_key", "tag_key3"), - colVal("tag_value", "tag_value3") )) - .addMutateColVal(colVal("series_ids", "series_ids3")); + Insert insert_0 = insert().setRowKey( + row(colVal("measurement", "measurement1"), colVal("tag_key", "tag_key1"), + colVal("tag_value", "tag_value1"))).addMutateColVal( + colVal("series_ids", "series_ids1")); + Insert insert_1 = insert().setRowKey( + row(colVal("measurement", "measurement1"), colVal("tag_key", "tag_key2"), + colVal("tag_value", "tag_value2"))).addMutateColVal( + colVal("series_ids", "series_ids2")); + Insert insert_2 = insert().setRowKey( + row(colVal("measurement", "measurement1"), colVal("tag_key", "tag_key3"), + colVal("tag_value", "tag_value3"))).addMutateColVal( + colVal("series_ids", "series_ids3")); BatchOperationResult batchResult = client.batchOperation("cse_index_1") - .addOperation(insert_0) - .addOperation(insert_1).addOperation(insert_2) - .execute(); + .addOperation(insert_0).addOperation(insert_1).addOperation(insert_2).execute(); Assert.assertEquals(0, batchResult.getWrongCount()); Assert.assertEquals(3, batchResult.getCorrectCount()); @@ -2369,12 +2367,12 @@ partition by range columns (`c1`) ( PARTITION p0 VALUES LESS THAN (300), Object[] c4 = new Object[] { 10L, null, null, null, null }; try { for (int i = 0; i < c1.length; i++) { - client.insert(tableName, c1[i], columnNames, new Object[]{c2[i], c3[i], c4[i]}); + client.insert(tableName, c1[i], columnNames, new Object[] { c2[i], c3[i], c4[i] }); } // query with c4 is null ,limit is 2 and offset 1 TableQuery tableQuery = client.query(tableName).select(allColumnNames) - .addScanRange(new Object[] { 0L }, new Object[] { 200L }).limit(2, -1); + .addScanRange(new Object[] { 0L }, new Object[] { 200L }).limit(2, -1); QueryResultSet result = tableQuery.execute(); int expRowIdx[] = { 2, 3 }; Assert.assertEquals(result.cacheSize(), expRowIdx.length); @@ -2383,7 +2381,7 @@ partition by range columns (`c1`) ( PARTITION p0 VALUES LESS THAN (300), Map value = result.getRow(); assertEquals(value.get(allColumnNames[0]), c1[expRowIdx[i]]); assertTrue(Arrays.equals((byte[]) value.get(allColumnNames[1]), - (byte[]) c2[expRowIdx[i]])); + (byte[]) c2[expRowIdx[i]])); assertEquals(value.get(allColumnNames[2]), c3[expRowIdx[i]]); assertEquals(value.get(allColumnNames[3]), c4[expRowIdx[i]]); } @@ -2391,7 +2389,7 @@ partition by range columns (`c1`) ( PARTITION p0 VALUES LESS THAN (300), } catch (Exception e) { e.printStackTrace(); Assert.assertEquals("offset can not be use without limit", - ((ObTableException) e).getMessage()); + ((ObTableException) e).getMessage()); } finally { for (int i = 0; i < c1.length; i++) { client.delete("test_query_filter_mutate", new Object[] { c1[i] }); @@ -2411,16 +2409,15 @@ partition by range columns (`c1`) ( PARTITION p0 VALUES LESS THAN (300), final String TABLE_NAME = "test_query_filter_mutate"; System.setProperty("ob_table_min_rslist_refresh_interval_millis", "1"); - ((ObTableClient) client) - .addRowKeyElement(TABLE_NAME, new String[] { "c1" }); //同索引列的值一样 + ((ObTableClient) client).addRowKeyElement(TABLE_NAME, new String[] { "c1" }); //同索引列的值一样 try { - client.insert(TABLE_NAME, new Object[] { 0L }, new String[] { "c2", - "c3" }, new Object[] { new byte[] { 1 }, "row1" }); - client.insert(TABLE_NAME, new Object[] { 1L }, new String[] { "c2", - "c3" }, new Object[] { new byte[] { 1 }, "row2" }); - client.insert(TABLE_NAME, new Object[] { 2L }, new String[] { "c2", - "c3" }, new Object[] { new byte[] { 1 }, "row3" }); + client.insert(TABLE_NAME, new Object[] { 0L }, new String[] { "c2", "c3" }, + new Object[] { new byte[] { 1 }, "row1" }); + client.insert(TABLE_NAME, new Object[] { 1L }, new String[] { "c2", "c3" }, + new Object[] { new byte[] { 1 }, "row2" }); + client.insert(TABLE_NAME, new Object[] { 2L }, new String[] { "c2", "c3" }, + new Object[] { new byte[] { 1 }, "row3" }); TableQuery tableQuery = client.query(TABLE_NAME); tableQuery.addScanRange(new Object[] { 0L }, new Object[] { 250L }); @@ -2446,24 +2443,21 @@ public void testInsertMutation() throws Exception { try { //insert MutationResult insertResult = client.insert("sync_item") - .setRowKey(row(colVal("uid", "a1"), - colVal("object_id", "b1"))) - .addMutateColVal(colVal("type", 1)) - .addMutateColVal(colVal("ver_oid", "1")) - .addMutateColVal(colVal("ver_ts", timestamp)) - .addMutateColVal(colVal("data_id", "data")) - .execute(); + .setRowKey(row(colVal("uid", "a1"), colVal("object_id", "b1"))) + .addMutateColVal(colVal("type", 1)).addMutateColVal(colVal("ver_oid", "1")) + .addMutateColVal(colVal("ver_ts", timestamp)) + .addMutateColVal(colVal("data_id", "data")).execute(); Assert.assertEquals(1, insertResult.getAffectedRows()); //insert null -> not null col try { - client.insert("sync_item") - .setRowKey(row(colVal("uid", "a2"))) - .execute(); + client.insert("sync_item").setRowKey(row(colVal("uid", "a2"))).execute(); } catch (ObTableException e) { Assert.assertEquals(ResultCodes.OB_BAD_NULL_ERROR.errorCode, e.getErrorCode()); } } catch (Exception e) { - Assert.assertEquals("RowKey size mismatch, rowKey list is {uid=0, object_id=1}but found[a2]", e.getMessage()); + Assert.assertEquals( + "RowKey size mismatch, rowKey list is {uid=0, object_id=1}but found[a2]", + e.getMessage()); } finally { } } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableErrMsgTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableErrMsgTest.java index be48d303..a0b58ec8 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableErrMsgTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableErrMsgTest.java @@ -30,7 +30,7 @@ public class ObTableErrMsgTest { ObTableClient client; - public static String tableName = "error_message_table"; + public static String tableName = "error_message_table"; @Before public void setup() throws Exception { @@ -237,4 +237,3 @@ public void testUpdateVirtualColumnNotSupport() { assertTrue(thrown.getMessage().contains("[-4007][OB_NOT_SUPPORTED][assign virtual generated column not supported]")); } } - From 3e634b3b389b5c0e9c07192b6fc9ccdd10fae82a Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Tue, 2 Jan 2024 11:39:40 +0800 Subject: [PATCH 06/10] add batch atomic defensive check --- .../rpc/table/ObTableClientBatchOpsImpl.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index 30f44c8d..4e920a11 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -236,6 +236,7 @@ public Map>>> return partitionOperationsMap; } + boolean has_put = false; for (int i = 0; i < operations.size(); i++) { ObTableOperation operation = operations.get(i); ObRowKey rowKeyObject = operation.getEntity().getRowKey(); @@ -244,6 +245,9 @@ public Map>>> for (int j = 0; j < rowKeySize; j++) { rowKey[j] = rowKeyObject.getObj(j).getValue(); } + if (!has_put && operation.getOperationType() == ObTableOperationType.PUT) { + has_put = true; + } ObPair tableObPair = obTableClient.getTable(tableName, rowKey, false, false, obTableClient.getRoute(batchOperation.isReadOnly())); ObPair>> obTableOperations = partitionOperationsMap @@ -256,12 +260,13 @@ public Map>>> obTableOperations.getRight().add(new ObPair(i, operation)); } - // if (atomicOperation) { - // if (partitionOperationsMap.size() > 1) { - // throw new ObTablePartitionConsistentException( - // "require atomic operation but found across partition may cause consistent problem "); - // } - // } + + if (atomicOperation && !has_put) { + if (partitionOperationsMap.size() > 1) { + throw new ObTablePartitionConsistentException( + "require atomic operation but found across partition may cause consistent problem "); + } + } return partitionOperationsMap; } From 8fb95f13b1b4196484690635674e134c9cd00564 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Tue, 2 Jan 2024 11:55:26 +0800 Subject: [PATCH 07/10] fix compile --- .../rpc/location/model/partition/ObListPartDesc.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java index 1162dccf..e8bd1482 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObListPartDesc.java @@ -99,12 +99,6 @@ public void prepare() throws IllegalArgumentException { super.prepare(); } - /* - * Set part num. - */ - public void setPartNum(int partNum) { - } - /* * Get part ids. */ From facdd8f64926118bad5fb9a6ad0ee4b87428f5f5 Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Tue, 2 Jan 2024 12:09:26 +0800 Subject: [PATCH 08/10] fix review --- .../rpc/protocol/payload/impl/ObObjType.java | 20 ------------------- .../oceanbase/rpc/ObTableBatchPutTest.java | 2 +- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java index 7aa271c3..17bf89c8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java @@ -1827,26 +1827,6 @@ public static Timestamp parseTimestamp(ObObjType obObjType, Object object, + collationType + "argument:" + object); } - public static Date parseToDate(ObObjType obObjType, Object object, ObCollationType collationType) - throws Exception { - if (object instanceof Date) { - return (Date) object; - } - - if (object instanceof java.util.Date) { - return (Date) object; - } - - if (object instanceof String) { - String str = (String) object; - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); - return dateFormat.parse(str); - } - - throw new IllegalArgumentException(obObjType.name() + "can not parseToComparable with " - + collationType + "argument:" + object); - } - /* * Parse long. */ diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java index 7ea31b7a..7bfbcfe1 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -108,4 +108,4 @@ public void test_batch_put() throws Exception { } finally { } } -} \ No newline at end of file +} From cd3620f7c7b06d69ffcd2257c95fb699df1ad68d Mon Sep 17 00:00:00 2001 From: "wumengjie.wmj" Date: Mon, 8 Jan 2024 10:01:23 +0800 Subject: [PATCH 09/10] feat: opt put, use LinkedHashSet instead of List --- .../oceanbase/rpc/mutation/Mutation.java | 19 ++++++++++++++++--- .../alipay/oceanbase/rpc/mutation/Put.java | 16 +++++++++------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java index 109cf34d..64fc9a3c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java @@ -25,9 +25,7 @@ import com.alipay.oceanbase.rpc.table.api.Table; import com.alipay.oceanbase.rpc.table.api.TableQuery; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class Mutation { private String tableName; @@ -440,4 +438,19 @@ static void removeRowkeyFromMutateColval(List columns, List valu } } } + + static void removeRowkeyFromMutateColval(LinkedHashSet columns, List values, + List rowKeyNames) { + if (null == columns || null == rowKeyNames || columns.size() != values.size()) { + return; + } + Iterator itr = columns.iterator(); + for (int i = values.size() - 1; i >= 0 && itr.hasNext(); --i) { + String column = itr.next(); + if (rowKeyNames.contains(column)) { + itr.remove(); + values.remove(i); + } + } + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java index a4ca7730..19c6a016 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java @@ -27,19 +27,20 @@ import com.alipay.oceanbase.rpc.table.api.Table; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; public class Put extends Mutation { - private List columns = null; - private List values = null; + private LinkedHashSet columns = null; + private List values = null; /* * default constructor */ public Put() { super(); - columns = new ArrayList(); + columns = new LinkedHashSet(); values = new ArrayList(); } @@ -48,7 +49,7 @@ public Put() { */ public Put(Table client, String tableName) { super(client, tableName); - columns = new ArrayList(); + columns = new LinkedHashSet(); values = new ArrayList(); } @@ -105,7 +106,7 @@ public Put addMutateRow(Row rows) { * get the mutated columns' name */ public String[] getColumns() { - return columns.toArray(new String[0]); + return columns.toArray(new String[columns.size()]); } /* @@ -156,12 +157,13 @@ public MutationResult execute() throws Exception { if (null == getQuery()) { // simple Put, without filter return new MutationResult(((ObTableClient) getClient()).putWithResult(getTableName(), - getRowKey(), getKeyRanges(), columns.toArray(new String[0]), values.toArray())); + getRowKey(), getKeyRanges(), columns.toArray(new String[columns.size()]), + values.toArray())); } else { if (checkMutationWithFilter()) { // QueryAndPut ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.PUT, - getRowKey(), columns.toArray(new String[0]), values.toArray()); + getRowKey(), columns.toArray(new String[columns.size()]), values.toArray()); return new MutationResult(((ObTableClient) getClient()).mutationWithFilter( getQuery(), getRowKey(), getKeyRanges(), operation, true)); } else { From 499dfb35df4f98b7dcefb2eba7202b1afcab27a8 Mon Sep 17 00:00:00 2001 From: "wumengjie.wmj" Date: Mon, 8 Jan 2024 10:34:10 +0800 Subject: [PATCH 10/10] del removeRowkeyFromMutateColval when execute put --- src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java index 19c6a016..ad29f6e2 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java @@ -153,7 +153,7 @@ public MutationResult execute() throws Exception { } else if (null == getClient()) { throw new ObTableException("client is null"); } - removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames); + if (null == getQuery()) { // simple Put, without filter return new MutationResult(((ObTableClient) getClient()).putWithResult(getTableName(),