Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void insert(Object[] rowkeys, String[] columns, Object[] values) {
tableBatchOps.insert(rowkeys, columns, values);
}

@Override
public void put(Object[] rowkeys, String[] columns, Object[] values) {
tableBatchOps.put(rowkeys, columns, values);
}

/*
* Replace.
*/
Expand Down Expand Up @@ -167,4 +172,14 @@ public void setAtomicOperation(boolean atomicOperation) {
super.setAtomicOperation(atomicOperation);
tableBatchOps.setAtomicOperation(atomicOperation);
}

public boolean isSamePropertiesNames() {
return super.isSamePropertiesNames();
}

public void setSamePropertiesNames(boolean samePropertiesNames) {
super.setSamePropertiesNames(samePropertiesNames);
tableBatchOps.setSamePropertiesNames(samePropertiesNames);
}

}
41 changes: 41 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,47 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
});
}

/**
* put with result
* @param tableName which table to put
* @param rowKey insert row key
* @param keyRanges scan range
* @param columns columns name to put
* @param values new values
* @return execute result
* @throws Exception exception
*/
public ObPayload putWithResult(final String tableName, final Object[] rowKey,
final List<ObNewRange> keyRanges, final String[] columns,
final Object[] values) throws Exception {
final long start = System.currentTimeMillis();
return executeMutation(tableName,
new MutationExecuteCallback<ObPayload>(rowKey, keyRanges) {
/**
* Execute.
*/
@Override
public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
long TableTime = System.currentTimeMillis();
ObTableParam tableParam = obPair.getRight();
ObTable obTable = tableParam.getObTable();
ObTableOperationRequest request = ObTableOperationRequest.getInstance(
tableName, PUT, rowKey, columns, values,
obTable.getObTableOperationTimeout());
request.setTableId(tableParam.getTableId());
// partId/tabletId
request.setPartitionId(tableParam.getPartitionId());
ObPayload result = obTable.execute(request);
String endpoint = obTable.getIp() + ":" + obTable.getPort();
MonitorUtil.info(request, database, tableName, "PUT", endpoint, rowKey,
(ObTableOperationResult) result, TableTime - start,
System.currentTimeMillis() - TableTime, getslowQueryMonitorThreshold());
checkResult(obTable.getIp(), obTable.getPort(), request, result);
return result;
}
});
}

/**
* Replace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ public class BatchOperation {
private Table client;
boolean withResult;
private List<Object> operations;
boolean isAtomic = false;
boolean isAtomic = false;
boolean isSamePropertiesNames = false;

public BatchOperation setSamePropertiesNames(boolean samePropertiesNames) {
isSamePropertiesNames = samePropertiesNames;
return this;
}

/*
* default constructor
Expand Down Expand Up @@ -126,6 +132,11 @@ public BatchOperationResult execute() throws Exception {
batchOps.insert(mutation.getRowKey(), ((Insert) mutation).getColumns(),
((Insert) mutation).getValues());
break;
case PUT:
((Put) mutation).removeRowkeyFromMutateColval();
batchOps.put(mutation.getRowKey(), ((Put) mutation).getColumns(),
((Put) mutation).getValues());
break;
case DEL:
batchOps.delete(mutation.getRowKey());
break;
Expand Down Expand Up @@ -168,6 +179,7 @@ public BatchOperationResult execute() throws Exception {
}
}
batchOps.setAtomicOperation(isAtomic);
batchOps.setSamePropertiesNames(isSamePropertiesNames);
return new BatchOperationResult(batchOps.executeWithResult());
}
}
19 changes: 16 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import com.alipay.oceanbase.rpc.table.api.Table;
import com.alipay.oceanbase.rpc.table.api.TableQuery;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;

public class Mutation<T> {
private String tableName;
Expand Down Expand Up @@ -440,4 +438,19 @@ static void removeRowkeyFromMutateColval(List<String> columns, List<Object> valu
}
}
}

static void removeRowkeyFromMutateColval(LinkedHashSet<String> columns, List<Object> values,
List<String> rowKeyNames) {
if (null == columns || null == rowKeyNames || columns.size() != values.size()) {
return;
}
Iterator<String> itr = columns.iterator();
for (int i = values.size() - 1; i >= 0 && itr.hasNext(); --i) {
String column = itr.next();
if (rowKeyNames.contains(column)) {
itr.remove();
values.remove(i);
}
}
}
}
147 changes: 139 additions & 8 deletions src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,158 @@

package com.alipay.oceanbase.rpc.mutation;

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
import com.alipay.oceanbase.rpc.table.api.Table;

/*
* Put impl.
* Need fill all columns when use put impl.
* Server will do override when use put impl.
*/
public class Put extends InsertOrUpdate {
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

public class Put extends Mutation<Put> {
private LinkedHashSet<String> columns = null;
private List<Object> values = null;

/*
* default constructor
*/
public Put() {
super();
super.usePut();
columns = new LinkedHashSet<String>();
values = new ArrayList<Object>();
}

/*
* construct with ObTableClient and String
*/
public Put(Table client, String tableName) {
super(client, tableName);
super.usePut();
columns = new LinkedHashSet<String>();
values = new ArrayList<Object>();
}

/*
* set the Row Key of mutation with Row and keep scan range
*/
@Override
public Put setRowKey(Row rowKey) {
return setRowKeyOnly(rowKey);
}

/*
* set the Row Key of mutation with ColumnValues and keep scan range
*/
@Override
public Put setRowKey(ColumnValue... rowKey) {
return setRowKeyOnly(rowKey);
}

/*
* add filter into mutation (use QueryAndMutate) and scan range
*/
public Put setFilter(ObTableFilter filter) throws Exception {
return setFilterOnly(filter);
}

/*
* get operation type
*/
public ObTableOperationType getOperationType() {
return ObTableOperationType.PUT;
}

/*
* add mutated Row
*/
public Put addMutateRow(Row rows) {
if (null == rows) {
throw new IllegalArgumentException("Invalid null rowKey set into Put");
} else if (0 == rows.getMap().size()) {
throw new IllegalArgumentException("input row key should not be empty");
}

// set mutate row into Put
for (Map.Entry<String, Object> entry : rows.getMap().entrySet()) {
columns.add(entry.getKey());
values.add(entry.getValue());
}

return this;
}

/*
* get the mutated columns' name
*/
public String[] getColumns() {
return columns.toArray(new String[columns.size()]);
}

/*
* get the mutated columns' value
*/
public Object[] getValues() {
return values.toArray();
}

/*
* add mutated ColumnValues
*/
public Put addMutateColVal(ColumnValue... columnValues) throws Exception {
if (null == columnValues) {
throw new IllegalArgumentException("Invalid null columnValues set into Put");
}

// set mutate row into Put
for (ColumnValue columnValue : columnValues) {
if (columns.contains(columnValue.getColumnName())) {
throw new ObTableException("Duplicate column in Row Key");
}
columns.add(columnValue.getColumnName());
values.add(columnValue.getValue());
}

return this;
}

/*
* Remove rowkey from mutateColval
*/
public Put removeRowkeyFromMutateColval() {
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
return this;
}

/*
* execute
*/
public MutationResult execute() throws Exception {
if (null == getTableName()) {
throw new ObTableException("table name is null");
} else if (null == getClient()) {
throw new ObTableException("client is null");
}

if (null == getQuery()) {
// simple Put, without filter
return new MutationResult(((ObTableClient) getClient()).putWithResult(getTableName(),
getRowKey(), getKeyRanges(), columns.toArray(new String[columns.size()]),
values.toArray()));
} else {
if (checkMutationWithFilter()) {
// QueryAndPut
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.PUT,
getRowKey(), columns.toArray(new String[columns.size()]), values.toArray());
return new MutationResult(((ObTableClient) getClient()).mutationWithFilter(
getQuery(), getRowKey(), getKeyRanges(), operation, true));
} else {
throw new ObTableUnexpectedException("should set filter and scan range both");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.buffer.ByteBuf;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.*;
import java.util.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ public byte[] encode() {
String name = entry.getKey();
ObObj property = entry.getValue();

int keyLen = Serialization.getNeedBytes(name);
System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen);
int keyLen = 0;
if (!isOnlyEncodeValue()) {
keyLen = Serialization.getNeedBytes(name);
System.arraycopy(Serialization.encodeVString(name), 0, bytes, idx, keyLen);
}
idx += keyLen;
byte[] objBytes = property.encode();
System.arraycopy(objBytes, 0, bytes, idx, objBytes.length);
Expand Down Expand Up @@ -229,7 +232,9 @@ public long getPayloadContentSize() {
size += this.getRowKeyValue(i).getEncodedSize();
}
for (Map.Entry<String, ObObj> entry : this.getProperties().entrySet()) {
size += Serialization.getNeedBytes(entry.getKey());
if (!isOnlyEncodeValue()) {
size += Serialization.getNeedBytes(entry.getKey());
}
size += entry.getValue().getEncodedSize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,7 @@ public interface ObITableEntity extends ObPayload {
*/
long getPropertiesCount();

void setOnlyEncodeValue(boolean onlyEncodeValue);

boolean isOnlyEncodeValue();
}
Loading