Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public Result get(Get get, HBaseTransaction transaction) throws IOException {
get.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());
get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, Bytes.toBytes(true));
get.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
get.setAttribute(CellUtils.ROW_LEVEL_CONFLICTS_ATTRIBUTE,
Bytes.toBytes(transaction.getConflictDetectionLevel() == HBaseTransactionManager.ConflictDetectionLevel.ROW));

return table.get(get);
}
Expand All @@ -55,6 +57,8 @@ public Result get(Get get, HBaseTransaction transaction) throws IOException {
public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException {
scan.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());
scan.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
scan.setAttribute(CellUtils.ROW_LEVEL_CONFLICTS_ATTRIBUTE,
Bytes.toBytes(transaction.getConflictDetectionLevel() == HBaseTransactionManager.ConflictDetectionLevel.ROW));
return table.getScanner(scan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@
package org.apache.omid.transaction;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.omid.transaction.CellUtils.SHARED_FAMILY_QUALIFIER;

import com.google.common.base.Charsets;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.tso.client.CellId;

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.util.Arrays;
import java.util.Objects;

public class HBaseCellId implements CellId {



private final TTable table;
private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private long timestamp;

public HBaseCellId(TTable table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
private HBaseCellId(TTable table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
this.timestamp = timestamp;
this.table = table;
this.row = row;
Expand Down Expand Up @@ -62,7 +70,7 @@ public long getTimestamp() {

@Override
public String toString() {
return new String(table.getTableName(), UTF_8)
return (table == null?"NO_TABLE":new String(table.getTableName(), UTF_8))
+ ":" + new String(row, UTF_8)
+ ":" + new String(family, UTF_8)
+ ":" + new String(qualifier, UTF_8)
Expand Down Expand Up @@ -97,4 +105,66 @@ public long getRowId() {
public static Hasher getHasher() {
return Hashing.murmur3_128().newHasher();
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(table.getTableName()),
Arrays.hashCode(row),
Arrays.hashCode(family),
Arrays.hashCode(qualifier));
}

@Override
public boolean equals(Object o) {
if (o == this) return true;
assert(o instanceof HBaseCellId);
HBaseCellId other = (HBaseCellId) o;

return Arrays.equals(table.getTableName(), other.getTable().getTableName()) &&
Arrays.equals(row, other.getRow()) &&
Arrays.equals(family, other.getFamily()) &&
Arrays.equals(qualifier, other.getQualifier());
}


public int compareTo(CellId o) {
assert(o instanceof HBaseCellId);
HBaseCellId other = (HBaseCellId) o;

int tableCompare = Bytes.compareTo(table.getTableName(), other.getTable().getTableName());
if (tableCompare != 0) {
return tableCompare;
} else {
int rowCompare = Bytes.compareTo(row, other.getRow());
if (rowCompare != 0) {
return rowCompare;
} else {
int familyCompare = Bytes.compareTo(family, other.getFamily());
if (familyCompare != 0) {
return familyCompare;
} else {
return Bytes.compareTo(qualifier, other.getQualifier());
}
}
}
}
public static HBaseCellId valueOf(Transaction tx, TTable table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
if (((HBaseTransaction)tx).getConflictDetectionLevel() == HBaseTransactionManager.ConflictDetectionLevel.ROW) {
return new HBaseCellId(table, row, family, SHARED_FAMILY_QUALIFIER, timestamp);
} else {
return new HBaseCellId(table, row, family, qualifier, timestamp);
}
}

public static HBaseCellId valueOf(HBaseTransactionManager.ConflictDetectionLevel conflictDetectionLevel,
TTable table, byte[] row, byte[] family, byte[] qualifier, long timestamp) {
if (conflictDetectionLevel == HBaseTransactionManager.ConflictDetectionLevel.ROW) {
return new HBaseCellId(table, row, family, SHARED_FAMILY_QUALIFIER, timestamp);
} else {
return new HBaseCellId(table, row, family, qualifier, timestamp);
}
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.omid.YAMLUtils;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tools.hbase.SecureHBaseConfig;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;

import org.apache.omid.tso.client.OmidClientConfiguration.PostCommitMode;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -91,14 +91,6 @@ public void setPostCommitMode(PostCommitMode postCommitMode) {
omidClientConfiguration.setPostCommitMode(postCommitMode);
}

public ConflictDetectionLevel getConflictAnalysisLevel() {
return omidClientConfiguration.getConflictAnalysisLevel();
}

public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
omidClientConfiguration.setConflictAnalysisLevel(conflictAnalysisLevel);
}

public String getCommitTableName() {
return commitTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,76 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.omid.transaction.HBaseTransactionManager.ConflictDetectionLevel.ROW;

public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
private final HBaseTransactionManager.ConflictDetectionLevel conflictDetectionLevel;

public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, boolean isLowLatency) {
super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
this.conflictDetectionLevel = ((HBaseTransactionManager)tm).getConflictDetectionLevel();
}

public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm,
long readTimestamp, long writeTimestamp, boolean isLowLatency) {
super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp, isLowLatency);
this.conflictDetectionLevel = ((HBaseTransactionManager)tm).getConflictDetectionLevel();
}

public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch,
Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet,
AbstractTransactionManager tm, boolean isLowLatency) {
super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
this.conflictDetectionLevel = ((HBaseTransactionManager)tm).getConflictDetectionLevel();
}

public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch,
Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet,
HBaseTransactionManager.ConflictDetectionLevel conflictDetectionLevel, boolean isLowLatency) {
super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, null, isLowLatency);
this.conflictDetectionLevel = conflictDetectionLevel;
}


private void deleteCell(HBaseCellId cell) {
Delete delete = new Delete(cell.getRow());
delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
try {
cell.getTable().getHTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);

if (conflictDetectionLevel == ROW) {
//cell only has family info so we need to query the table to get list of cells.
Get get = new Get(cell.getRow());
get.addFamily(cell.getFamily());
try {
get.setTimeStamp(cell.getTimestamp());
Result res = cell.getTable().getHTable().get(get);
Delete delete = new Delete(cell.getRow());
for (Cell rawCell: res.listCells()) {
delete.addColumn(cell.getFamily(),
Bytes.copy(rawCell.getQualifierArray(),rawCell.getQualifierOffset(),rawCell.getQualifierLength()),
cell.getTimestamp());
}
cell.getTable().getHTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cells in family {} for Tx {}. This issue has been ignored",
cell.getFamily(), getTransactionId(), e);
}
} else {
Delete delete = new Delete(cell.getRow());
delete.addColumn(cell.getFamily(), cell.getQualifier(), cell.getTimestamp());
try {
cell.getTable().getHTable().delete(delete);
} catch (IOException e) {
LOG.warn("Failed cleanup cell {} for Tx {}. This issue has been ignored", cell, getTransactionId(), e);
}
}
}
@Override
Expand Down Expand Up @@ -99,4 +138,8 @@ private Set<TTable> getWrittenTables() {
return tables;
}

public HBaseTransactionManager.ConflictDetectionLevel getConflictDetectionLevel() {
return conflictDetectionLevel;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.tools.hbase.HBaseLogin;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;

import org.apache.omid.tso.client.TSOClient;
import org.apache.omid.tso.client.TSOProtocol;
import org.slf4j.Logger;
Expand All @@ -47,6 +47,11 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen

private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);


public enum ConflictDetectionLevel {CELL, ROW}

private final ConflictDetectionLevel conflictLevel;

private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {

@Override
Expand Down Expand Up @@ -84,6 +89,7 @@ public static class Builder {
private Optional<CommitTable.Client> commitTableClient = Optional.absent();
private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
private Optional<PostCommitActions> postCommitter = Optional.absent();
private ConflictDetectionLevel conflictDetectionLevel = ConflictDetectionLevel.CELL;

public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
this.hbaseOmidClientConf = hbaseOmidClientConf;
Expand All @@ -109,6 +115,11 @@ Builder postCommitter(PostCommitActions postCommitter) {
return this;
}

Builder conflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
this.conflictDetectionLevel = conflictDetectionLevel;
return this;
}

public HBaseTransactionManager build() throws IOException, InterruptedException {

CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
Expand All @@ -121,7 +132,8 @@ public HBaseTransactionManager build() throws IOException, InterruptedException
tsoClient,
commitTableClient,
commitTableWriter,
new HBaseTransactionFactory());
new HBaseTransactionFactory(),
conflictDetectionLevel);
}

private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
Expand Down Expand Up @@ -175,14 +187,16 @@ private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConf
TSOProtocol tsoClient,
CommitTable.Client commitTableClient,
CommitTable.Writer commitTableWriter,
HBaseTransactionFactory hBaseTransactionFactory) {
HBaseTransactionFactory hBaseTransactionFactory,
ConflictDetectionLevel conflictLevel) {

super(hBaseOmidClientConfiguration.getMetrics(),
postCommitter,
tsoClient,
commitTableClient,
commitTableWriter,
hBaseTransactionFactory);
this.conflictLevel = conflictLevel;
}

// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -243,12 +257,9 @@ static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? ext

}

public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
}

public ConflictDetectionLevel getConflictDetectionLevel() {
return tsoClient.getConflictDetectionLevel();
return conflictLevel;
}

static class CommitTimestampLocatorImpl implements CommitTimestampLocator {
Expand Down
Loading