diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java index 665b462f..d91e8d64 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java @@ -72,6 +72,11 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { tableBatchOps.insert(rowkeys, columns, values); } + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + tableBatchOps.put(rowkeys, columns, values); + } + /* * Replace. */ @@ -167,4 +172,14 @@ public void setAtomicOperation(boolean atomicOperation) { super.setAtomicOperation(atomicOperation); tableBatchOps.setAtomicOperation(atomicOperation); } + + public boolean isSamePropertiesNames() { + return super.isSamePropertiesNames(); + } + + public void setSamePropertiesNames(boolean samePropertiesNames) { + super.setSamePropertiesNames(samePropertiesNames); + tableBatchOps.setSamePropertiesNames(samePropertiesNames); + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index aab42bf4..6af3569a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -1786,6 +1786,47 @@ public ObPayload execute(ObPair obPair) throws Exception { }); } + /** + * put with result + * @param tableName which table to put + * @param rowKey insert row key + * @param keyRanges scan range + * @param columns columns name to put + * @param values new values + * @return execute result + * @throws Exception exception + */ + public ObPayload putWithResult(final String tableName, final Object[] rowKey, + final List keyRanges, final String[] columns, + final Object[] values) throws Exception { + final long start = System.currentTimeMillis(); + return executeMutation(tableName, + new MutationExecuteCallback(rowKey, keyRanges) { + /** + * Execute. + */ + @Override + public ObPayload execute(ObPair obPair) throws Exception { + long TableTime = System.currentTimeMillis(); + ObTableParam tableParam = obPair.getRight(); + ObTable obTable = tableParam.getObTable(); + ObTableOperationRequest request = ObTableOperationRequest.getInstance( + tableName, PUT, rowKey, columns, values, + obTable.getObTableOperationTimeout()); + request.setTableId(tableParam.getTableId()); + // partId/tabletId + request.setPartitionId(tableParam.getPartitionId()); + ObPayload result = obTable.execute(request); + String endpoint = obTable.getIp() + ":" + obTable.getPort(); + MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey, + (ObTableOperationResult) result, TableTime - start, + System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold()); + checkResult(obTable.getIp(), obTable.getPort(), request, result); + return result; + } + }); + } + /** * Replace. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index 8b1e34a3..8fd9ab42 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -34,7 +34,13 @@ public class BatchOperation { private Table client; boolean withResult; private List operations; - boolean isAtomic = false; + boolean isAtomic = false; + boolean isSamePropertiesNames = false; + + public BatchOperation setSamePropertiesNames(boolean samePropertiesNames) { + isSamePropertiesNames = samePropertiesNames; + return this; + } /* * default constructor @@ -126,6 +132,11 @@ public BatchOperationResult execute() throws Exception { batchOps.insert(mutation.getRowKey(), ((Insert) mutation).getColumns(), ((Insert) mutation).getValues()); break; + case PUT: + ((Put) mutation).removeRowkeyFromMutateColval(); + batchOps.put(mutation.getRowKey(), ((Put) mutation).getColumns(), + ((Put) mutation).getValues()); + break; case DEL: batchOps.delete(mutation.getRowKey()); break; @@ -168,6 +179,7 @@ public BatchOperationResult execute() throws Exception { } } batchOps.setAtomicOperation(isAtomic); + batchOps.setSamePropertiesNames(isSamePropertiesNames); return new BatchOperationResult(batchOps.executeWithResult()); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java index 109cf34d..64fc9a3c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java @@ -25,9 +25,7 @@ import com.alipay.oceanbase.rpc.table.api.Table; import com.alipay.oceanbase.rpc.table.api.TableQuery; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class Mutation { private String tableName; @@ -440,4 +438,19 @@ static void removeRowkeyFromMutateColval(List columns, List valu } } } + + static void removeRowkeyFromMutateColval(LinkedHashSet columns, List values, + List rowKeyNames) { + if (null == columns || null == rowKeyNames || columns.size() != values.size()) { + return; + } + Iterator itr = columns.iterator(); + for (int i = values.size() - 1; i >= 0 && itr.hasNext(); --i) { + String column = itr.next(); + if (rowKeyNames.contains(column)) { + itr.remove(); + values.remove(i); + } + } + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java index a4ddd447..ad29f6e2 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java @@ -17,20 +17,31 @@ package com.alipay.oceanbase.rpc.mutation; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; import com.alipay.oceanbase.rpc.table.api.Table; -/* - * Put impl. - * Need fill all columns when use put impl. - * Server will do override when use put impl. - */ -public class Put extends InsertOrUpdate { +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +public class Put extends Mutation { + private LinkedHashSet columns = null; + private List values = null; + /* * default constructor */ public Put() { super(); - super.usePut(); + columns = new LinkedHashSet(); + values = new ArrayList(); } /* @@ -38,6 +49,126 @@ public Put() { */ public Put(Table client, String tableName) { super(client, tableName); - super.usePut(); + columns = new LinkedHashSet(); + values = new ArrayList(); + } + + /* + * set the Row Key of mutation with Row and keep scan range + */ + @Override + public Put setRowKey(Row rowKey) { + return setRowKeyOnly(rowKey); + } + + /* + * set the Row Key of mutation with ColumnValues and keep scan range + */ + @Override + public Put setRowKey(ColumnValue... rowKey) { + return setRowKeyOnly(rowKey); + } + + /* + * add filter into mutation (use QueryAndMutate) and scan range + */ + public Put setFilter(ObTableFilter filter) throws Exception { + return setFilterOnly(filter); + } + + /* + * get operation type + */ + public ObTableOperationType getOperationType() { + return ObTableOperationType.PUT; + } + + /* + * add mutated Row + */ + public Put addMutateRow(Row rows) { + if (null == rows) { + throw new IllegalArgumentException("Invalid null rowKey set into Put"); + } else if (0 == rows.getMap().size()) { + throw new IllegalArgumentException("input row key should not be empty"); + } + + // set mutate row into Put + for (Map.Entry entry : rows.getMap().entrySet()) { + columns.add(entry.getKey()); + values.add(entry.getValue()); + } + + return this; + } + + /* + * get the mutated columns' name + */ + public String[] getColumns() { + return columns.toArray(new String[columns.size()]); + } + + /* + * get the mutated columns' value + */ + public Object[] getValues() { + return values.toArray(); + } + + /* + * add mutated ColumnValues + */ + public Put addMutateColVal(ColumnValue... columnValues) throws Exception { + if (null == columnValues) { + throw new IllegalArgumentException("Invalid null columnValues set into Put"); + } + + // set mutate row into Put + for (ColumnValue columnValue : columnValues) { + if (columns.contains(columnValue.getColumnName())) { + throw new ObTableException("Duplicate column in Row Key"); + } + columns.add(columnValue.getColumnName()); + values.add(columnValue.getValue()); + } + + return this; + } + + /* + * Remove rowkey from mutateColval + */ + public Put removeRowkeyFromMutateColval() { + removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames); + return this; + } + + /* + * execute + */ + public MutationResult execute() throws Exception { + if (null == getTableName()) { + throw new ObTableException("table name is null"); + } else if (null == getClient()) { + throw new ObTableException("client is null"); + } + + if (null == getQuery()) { + // simple Put, without filter + return new MutationResult(((ObTableClient) getClient()).putWithResult(getTableName(), + getRowKey(), getKeyRanges(), columns.toArray(new String[columns.size()]), + values.toArray())); + } else { + if (checkMutationWithFilter()) { + // QueryAndPut + ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.PUT, + getRowKey(), columns.toArray(new String[columns.size()]), values.toArray()); + return new MutationResult(((ObTableClient) getClient()).mutationWithFilter( + getQuery(), getRowKey(), getKeyRanges(), operation, true)); + } else { + throw new ObTableUnexpectedException("should set filter and scan range both"); + } + } } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java index f5dc3006..17bf89c8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java @@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf; import java.sql.Timestamp; +import java.text.SimpleDateFormat; import java.time.*; import java.util.*; diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java index 654d7ae6..61f0fb3c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/AbstractObTableEntity.java @@ -172,8 +172,11 @@ public byte[] encode() { String name = entry.getKey(); ObObj property = entry.getValue(); - int keyLen = Serialization.getNeedBytes(name); - System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen); + int keyLen = 0; + if (!isOnlyEncodeValue()) { + keyLen = Serialization.getNeedBytes(name); + System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen); + } idx += keyLen; byte[] objBytes = property.encode(); System.arraycopy(objBytes, 0, bytes, idx, objBytes.length); @@ -229,7 +232,9 @@ public long getPayloadContentSize() { size += this.getRowKeyValue(i).getEncodedSize(); } for (Map.Entry entry : this.getProperties().entrySet()) { - size += Serialization.getNeedBytes(entry.getKey()); + if (!isOnlyEncodeValue()) { + size += Serialization.getNeedBytes(entry.getKey()); + } size += entry.getValue().getEncodedSize(); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java index 7344a030..e2102401 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObITableEntity.java @@ -198,4 +198,7 @@ public interface ObITableEntity extends ObPayload { */ long getPropertiesCount(); + void setOnlyEncodeValue(boolean onlyEncodeValue); + + boolean isOnlyEncodeValue(); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java index 495ffde2..90ecb528 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableBatchOperation.java @@ -33,10 +33,10 @@ */ public class ObTableBatchOperation extends AbstractPayload { + private boolean isSamePropertiesNames; private List tableOperations = new ArrayList(); private boolean isReadOnly = true; private boolean isSameType = true; - private boolean isSamePropertiesNames; /* * Encode. @@ -52,25 +52,31 @@ public byte[] encode() { idx, headerLen); idx += headerLen; - // 1. encode Operation + // 1. encode isSamePropertiesNames + System.arraycopy(Serialization.encodeI8(isSamePropertiesNames ? (byte) 1 : (byte) 0), 0, + bytes, idx, 1); + idx++; + + // 2. encode Operation int len = Serialization.getNeedBytes(tableOperations.size()); System.arraycopy(Serialization.encodeVi64(tableOperations.size()), 0, bytes, idx, len); idx += len; - for (ObTableOperation tableOperation : tableOperations) { - len = (int) tableOperation.getPayloadSize(); - System.arraycopy(tableOperation.encode(), 0, bytes, idx, len); + for (int i = 0; i < tableOperations.size(); i++) { + ObTableOperation operation = tableOperations.get(i); + if (i != 0 && isSamePropertiesNames) { + operation.getEntity().setOnlyEncodeValue(true); + } + len = (int) operation.getPayloadSize(); + System.arraycopy(operation.encode(), 0, bytes, idx, len); idx += len; } - // 2. encode others + // 3. encode others System .arraycopy(Serialization.encodeI8(isReadOnly ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); idx++; System .arraycopy(Serialization.encodeI8(isSameType ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); - idx++; - System.arraycopy(Serialization.encodeI8(isSamePropertiesNames ? (byte) 1 : (byte) 0), 0, - bytes, idx, 1); return bytes; } @@ -106,12 +112,19 @@ public Object decode(ByteBuf buf) { @Override public long getPayloadContentSize() { long payloadContentSize = 0; + + payloadContentSize += 1; // isSamePropertiesNames + payloadContentSize += Serialization.getNeedBytes(tableOperations.size()); - for (ObTableOperation operation : tableOperations) { + for (int i = 0; i < tableOperations.size(); i++) { + ObTableOperation operation = tableOperations.get(i); + if (i != 0 && isSamePropertiesNames) { + operation.getEntity().setOnlyEncodeValue(true); + } payloadContentSize += operation.getPayloadSize(); } - return payloadContentSize + 3; + return payloadContentSize + 2; } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java index 47339289..f092382e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableEntity.java @@ -28,6 +28,16 @@ public class ObTableEntity extends AbstractObTableEntity { private ObRowKey rowKey = new ObRowKey(); private Map properties = new HashMap(); + private boolean onlyEncodeValue; + + public void setOnlyEncodeValue(boolean onlyEncodeValue) { + this.onlyEncodeValue = onlyEncodeValue; + } + + public boolean isOnlyEncodeValue() { + return onlyEncodeValue; + } + /* * Set row key value. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java index 0ab0a969..cd654ab0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOperationType.java @@ -26,7 +26,10 @@ public enum ObTableOperationType { INSERT_OR_UPDATE(4), // INSERT or UPDATE, columns not in arguments will remain unchanged REPLACE(5), // DELETE & INSERT, columns not in arguments will change to default value INCREMENT(6), // the column must be can be cast to long. if exist increase, else insert - APPEND(7);// append column value + APPEND(7), // append column value + SCAN(8), // scan value + TTL(9), // expire value + PUT(10);// put value private int value; private static Map map = new HashMap(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java index bf9bfaca..2896e5ad 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableBatchOps.java @@ -26,8 +26,18 @@ public abstract class AbstractTableBatchOps implements TableBatchOps { protected boolean atomicOperation; + protected boolean isSamePropertiesNames; + protected ObTableEntityType entityType = ObTableEntityType.DYNAMIC; + public boolean isSamePropertiesNames() { + return isSamePropertiesNames; + } + + public void setSamePropertiesNames(boolean samePropertiesNames) { + isSamePropertiesNames = samePropertiesNames; + } + /** * Get. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java index 29d7a49d..edb31373 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableBatchOpsImpl.java @@ -82,6 +82,14 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { addObTableOperation(ObTableOperationType.INSERT, rowkeys, columns, values); } + /* + * Put. + */ + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + addObTableOperation(ObTableOperationType.PUT, rowkeys, columns, values); + } + /* * Replace. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java index 756b4ee9..4e920a11 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java @@ -108,6 +108,14 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) { addObTableClientOperation(ObTableOperationType.INSERT, rowkeys, columns, values); } + /* + * Put. + */ + @Override + public void put(Object[] rowkeys, String[] columns, Object[] values) { + addObTableClientOperation(ObTableOperationType.PUT, rowkeys, columns, values); + } + /* * Replace. */ @@ -193,6 +201,7 @@ public List executeWithResult() throws Exception { case REPLACE: case INCREMENT: case APPEND: + case PUT: results.add(new MutationResult(result)); break; default: @@ -227,6 +236,7 @@ public Map>>> return partitionOperationsMap; } + boolean has_put = false; for (int i = 0; i < operations.size(); i++) { ObTableOperation operation = operations.get(i); ObRowKey rowKeyObject = operation.getEntity().getRowKey(); @@ -235,6 +245,9 @@ public Map>>> for (int j = 0; j < rowKeySize; j++) { rowKey[j] = rowKeyObject.getObj(j).getValue(); } + if (!has_put && operation.getOperationType() == ObTableOperationType.PUT) { + has_put = true; + } ObPair tableObPair = obTableClient.getTable(tableName, rowKey, false, false, obTableClient.getRoute(batchOperation.isReadOnly())); ObPair>> obTableOperations = partitionOperationsMap @@ -247,7 +260,8 @@ public Map>>> obTableOperations.getRight().add(new ObPair(i, operation)); } - if (atomicOperation) { + + if (atomicOperation && !has_put) { if (partitionOperationsMap.size() > 1) { throw new ObTablePartitionConsistentException( "require atomic operation but found across partition may cause consistent problem "); @@ -290,6 +304,7 @@ public void partitionExecute(ObTableOperationResult[] results, .toObTableConsistencyLevel()); } subRequest.setBatchOperationAsAtomic(isAtomicOperation()); + subRequest.getBatchOperation().setSamePropertiesNames(isSamePropertiesNames()); ObTableBatchOperationResult subObTableBatchOperationResult; boolean needRefreshTableEntry = false; diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java b/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java index 0ab134cb..ff8ecd9f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/api/TableBatchOps.java @@ -32,6 +32,10 @@ public interface TableBatchOps { boolean isAtomicOperation(); + boolean isSamePropertiesNames(); + + void setSamePropertiesNames(boolean samePropertiesNames); + void setEntityType(ObTableEntityType entityType); ObTableEntityType getEntityType(); @@ -52,6 +56,8 @@ public interface TableBatchOps { void insert(Object[] rowkeys, String[] columns, Object[] values); + void put(Object[] rowkeys, String[] columns, Object[] values); + void replace(Object rowkey, String[] columns, Object[] values); void replace(Object[] rowkeys, String[] columns, Object[] values); diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java new file mode 100644 index 00000000..7bfbcfe1 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableBatchPutTest.java @@ -0,0 +1,111 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2023 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc; + +import com.alipay.oceanbase.rpc.mutation.BatchOperation; +import com.alipay.oceanbase.rpc.mutation.Put; +import com.alipay.oceanbase.rpc.mutation.Row; +import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; +import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Date; +import java.sql.Timestamp; + +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ObTableBatchPutTest { + public static String PARAM_URL = "configer-server url"; // http://ip:port/services?User_ID=alibaba&UID=test&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx + public static String FULL_USER_NAME = "userName@tenantName@clusterName"; + public static String PASSWORD = "password for userName"; + public static String PROXY_SYS_USER_NAME = "root"; + public static String PROXY_SYS_USER_PASSWORD = ""; + + ObTableClient client = new ObTableClient(); + public static String tableName = "batch_put"; + + @Before + public void setup() throws Exception { + client.setFullUserName(FULL_USER_NAME); + client.setParamURL(PARAM_URL); + client.setPassword(PASSWORD); + client.setSysUserName(PROXY_SYS_USER_NAME); + client.setSysPassword(PROXY_SYS_USER_PASSWORD); + client.init(); + } + + /* + CREATE TABLE IF NOT EXISTS `batch_put` ( + `id` varchar(20) NOT NULL, + `b_1` varchar(32) DEFAULT NULL, + `t_1` datetime(3) NOT NULL, + `t_2` timestamp(3) DEFAULT NULL, + `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `c_1` bigint(20) default NULL, + PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`) subpartition template ( + subpartition `p0`, + subpartition `p1`, + subpartition `p2`, + subpartition `p3`, + subpartition `p4`, + subpartition `p5`, + subpartition `p6`, + subpartition `p7`, + subpartition `p8`, + subpartition `p9`, + subpartition `p10`) + (partition `p0` values less than ('2023-12-01 00:00:00'), + partition `p1` values less than ('2023-12-10 00:00:00'), + partition `p2` values less than ('2023-12-20 00:00:00'), + partition `p3` values less than ('2023-12-30 00:00:00'), + partition `p4` values less than ('2024-01-01 00:00:00')); + */ + @Test + public void test_batch_put() throws Exception { + try { + // 当设置setSamePropertiesNames(true)时,表中所有的列都必须填充值 + BatchOperation batchOperation = client.batchOperation(tableName).setIsAtomic(true) + .setSamePropertiesNames(true); + long batchSize = 50; + for (long i = 0; i < batchSize; i++) { + // 清除毫秒部分,仅保留到秒 + long timeInSeconds = (System.currentTimeMillis() / 1000) * 1000; + Timestamp ts = new Timestamp(timeInSeconds); + java.util.Date date = new Date(timeInSeconds); + Row rowKey = new Row(colVal("id", String.valueOf(i)), // `id` varchar(20) + colVal("t_1", date)); // `t_1` varchar(20) + Put putOp = put().setRowKey(rowKey) + .addMutateColVal(colVal("b_1", String.valueOf(i))) // `b_1` varchar(32) + .addMutateColVal(colVal("t_2", ts)) // `t_2` timestamp(3) + .addMutateColVal(colVal("t_3", ts)) // `t_3` timestamp(3) + .addMutateColVal(colVal("c_1", i)); // `c_1` bigint(20) + batchOperation.addOperation(putOp); + } + BatchOperationResult result = batchOperation.setIsAtomic(true).execute(); + assertEquals(batchSize, result.size()); + assertEquals(0, result.getWrongCount()); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); + } finally { + } + } +} diff --git a/src/test/resources/ci.sql b/src/test/resources/ci.sql index 43f39592..bd12dd5b 100644 --- a/src/test/resources/ci.sql +++ b/src/test/resources/ci.sql @@ -329,4 +329,29 @@ CREATE TABLE IF NOT EXISTS `sync_item` ( DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci PARTITION BY KEY(`uid`) PARTITIONS 32; +CREATE TABLE IF NOT EXISTS `batch_put` ( + `id` varchar(20) NOT NULL, + `b_1` varchar(32) DEFAULT NULL, + `t_1` datetime(3) NOT NULL, + `t_2` timestamp(3) DEFAULT NULL, + `t_3` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `c_1` bigint(20) default NULL, + PRIMARY KEY(`id`, `t_1`)) partition by range columns(`t_1`) subpartition by key(`id`) subpartition template ( + subpartition `p0`, + subpartition `p1`, + subpartition `p2`, + subpartition `p3`, + subpartition `p4`, + subpartition `p5`, + subpartition `p6`, + subpartition `p7`, + subpartition `p8`, + subpartition `p9`, + subpartition `p10`) + (partition `p0` values less than ('2023-12-01 00:00:00'), + partition `p1` values less than ('2023-12-10 00:00:00'), + partition `p2` values less than ('2023-12-20 00:00:00'), + partition `p3` values less than ('2023-12-30 00:00:00'), + partition `p4` values less than ('2024-01-01 00:00:00')); + alter system set kv_hotkey_throttle_threshold = 50;