Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public interface CommitTable {

Expand All @@ -46,6 +47,11 @@ interface Writer extends Closeable {
* Allows to clean the write's current buffer. It is required for HA
*/
void clearWriteBuffer();

/**
* Add commited transaction while checking if invalidated by other client
*/
boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
}

interface Client extends Closeable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public void clearWriteBuffer() {
table.clear();
}

@Override
public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
// In this implementation, we use only one location that represents
// both the value and the invalidation. Therefore, putIfAbsent is
// required to make sure the entry was not invalidated.
return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public void clearWriteBuffer() {
// noop
}

@Override
public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
return true;
}

@Override
public void flush() throws IOException {
// noop
Expand Down
1 change: 1 addition & 0 deletions common/src/main/proto/TSOProto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ message HandshakeRequest {
message HandshakeResponse {
optional bool clientCompatible = 1;
optional Capabilities serverCapabilities = 2;
optional bool lowLatency = 3[default= false];
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public static class Builder {
// Optional parameters - initialized to default values
private Optional<TSOProtocol> tsoClient = Optional.absent();
private Optional<CommitTable.Client> commitTableClient = Optional.absent();
private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
private Optional<PostCommitActions> postCommitter = Optional.absent();

public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
Expand All @@ -106,13 +107,15 @@ public Builder postCommitter(PostCommitActions postCommitter) {
public HBaseTransactionManager build() throws IOException, InterruptedException {

CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();

return new HBaseTransactionManager(hbaseOmidClientConf,
postCommitter,
tsoClient,
commitTableClient,
commitTableWriter,
new HBaseTransactionFactory());
}

Expand All @@ -128,6 +131,13 @@ private Optional<CommitTable.Client> buildCommitTableClient() throws IOException
return Optional.of(commitTable.getClient());
}

private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
return Optional.of(commitTable.getWriter());
}

private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {

PostCommitActions postCommitter;
Expand Down Expand Up @@ -160,14 +170,15 @@ private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConf
PostCommitActions postCommitter,
TSOProtocol tsoClient,
CommitTable.Client commitTableClient,
CommitTable.Writer commitTableWriter,
HBaseTransactionFactory hBaseTransactionFactory) {

super(hBaseOmidClientConfiguration.getMetrics(),
postCommitter,
tsoClient,
commitTableClient,
hBaseTransactionFactory);

postCommitter,
tsoClient,
commitTableClient,
commitTableWriter,
hBaseTransactionFactory);
}

// ----------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,7 @@
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
import org.apache.omid.timestamp.storage.TimestampStorage;
import org.apache.omid.tso.BatchPoolModule;
import org.apache.omid.tso.DisruptorModule;
import org.apache.omid.tso.RuntimeExceptionPanicker;
import org.apache.omid.tso.NetworkInterfaceUtils;
import org.apache.omid.tso.Panicker;
import org.apache.omid.tso.PausableTimestampOracle;
import org.apache.omid.tso.PersistenceProcessorHandler;
import org.apache.omid.tso.TSOChannelHandler;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TSOStateManager;
import org.apache.omid.tso.TSOStateManagerImpl;
import org.apache.omid.tso.TimestampOracle;
import org.apache.omid.tso.*;

import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -72,6 +61,7 @@ protected void configure() {
bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
bind(Panicker.class).to(RuntimeExceptionPanicker.class).in(Singleton.class);
bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);

install(new BatchPoolModule(config));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -128,6 +125,18 @@ public void clearWriteBuffer() {
writeBuffer.clear();
}

@Override
public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
assert (startTimestamp < commitTimestamp);
byte[] transactionRow = startTimestampToKey(startTimestamp);
Put put = new Put(transactionRow, startTimestamp);
byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
put.add(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);

// TODO checkandput return false but still writes the put!?!
return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
}

@Override
public void close() throws IOException {
clearWriteBuffer();
Expand Down Expand Up @@ -254,7 +263,7 @@ public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
try {
byte[] row = startTimestampToKey(startTimestamp);
Put invalidationPut = new Put(row, startTimestamp);
invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, null);
invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));

// We need to write to the invalid column only if the commit timestamp
// is empty. This has to be done atomically. Otherwise, if we first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,7 @@
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
import org.apache.omid.timestamp.storage.TimestampStorage;
import org.apache.omid.tso.BatchPoolModule;
import org.apache.omid.tso.DisruptorModule;
import org.apache.omid.tso.LeaseManagement;
import org.apache.omid.tso.MockPanicker;
import org.apache.omid.tso.NetworkInterfaceUtils;
import org.apache.omid.tso.Panicker;
import org.apache.omid.tso.PersistenceProcessorHandler;
import org.apache.omid.tso.TSOChannelHandler;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TSOStateManager;
import org.apache.omid.tso.TSOStateManagerImpl;
import org.apache.omid.tso.TimestampOracle;
import org.apache.omid.tso.TimestampOracleImpl;
import org.apache.omid.tso.VoidLeaseManager;
import org.apache.omid.tso.*;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -76,7 +63,7 @@ protected void configure() {
// Timestamp storage creation
bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);

bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
install(new BatchPoolModule(config));
// DisruptorConfig
install(new DisruptorModule(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public interface TransactionFactory<T extends CellId> {
private final PostCommitActions postCommitter;
protected final TSOProtocol tsoClient;
protected final CommitTable.Client commitTableClient;
private final CommitTable.Writer commitTableWriter;
private final TransactionFactory<? extends CellId> transactionFactory;

// Metrics
Expand Down Expand Up @@ -94,11 +95,13 @@ public AbstractTransactionManager(MetricsRegistry metrics,
PostCommitActions postCommitter,
TSOProtocol tsoClient,
CommitTable.Client commitTableClient,
CommitTable.Writer commitTableWriter,
TransactionFactory<? extends CellId> transactionFactory) {

this.tsoClient = tsoClient;
this.postCommitter = postCommitter;
this.commitTableClient = commitTableClient;
this.commitTableWriter = commitTableWriter;
this.transactionFactory = transactionFactory;

// Metrics configuration
Expand All @@ -108,7 +111,6 @@ public AbstractTransactionManager(MetricsRegistry metrics,
this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));

}

/**
Expand Down Expand Up @@ -198,7 +200,10 @@ public final void commit(Transaction transaction) throws RollbackException, Tran
if (tx.getWriteSet().isEmpty()) {
markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
} else {
commitRegularTransaction(tx);
if (tsoClient.isLowLatency())
commitLowLatencyTransaction(tx);
else
commitRegularTransaction(tx);
}
committedTxsCounter.inc();
} finally {
Expand Down Expand Up @@ -312,22 +317,43 @@ public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long e

// 2) Then check the commit table
// If the data was written at a previous epoch, check whether the transaction was invalidated
Optional<CommitTimestamp> commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
if (commitTimeStamp.isPresent()) {
return commitTimeStamp.get();
Optional<CommitTimestamp> commitTimeStampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();

boolean invalidatedByOther = false;
if (commitTimeStampFromCT.isPresent()) {
if (tsoClient.isLowLatency() && !commitTimeStampFromCT.get().isValid())
invalidatedByOther = true;
else
return commitTimeStampFromCT.get();
}

// 3) Read from shadow cell
commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
Optional<CommitTimestamp> commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
if (commitTimeStamp.isPresent()) {
return commitTimeStamp.get();
}

// In case of LL, if found invalid ct cell, still must check sc in stage 3 then return
if (invalidatedByOther) {
return commitTimeStampFromCT.get();
}

// 4) Check the epoch and invalidate the entry
// if the data was written by a transaction from a previous epoch (previous TSO)
if (cellStartTimestamp < epoch) {
if (cellStartTimestamp < epoch || tsoClient.isLowLatency()) {
boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
if (invalidated) { // Invalid commit timestamp

// If we are running lowLatency Omid, we could have manged to invalidate a ct entry,
// but the committing client already wrote to shadow cells:
if (tsoClient.isLowLatency()) {
commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
if (commitTimeStamp.isPresent()) {
// Remove false invalidation from commit table
commitTableClient.completeTransaction(cellStartTimestamp);
return commitTimeStamp.get();
}
}
return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
}
}
Expand Down Expand Up @@ -417,6 +443,41 @@ private void markReadOnlyTransaction(AbstractTransaction<? extends CellId> readO

}

private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
throws RollbackException, TransactionException {
try {

long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet()).get();
boolean commited = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs);
if (!commited) {
// Trasaction has been invalidated by other client
rollback(tx);
commitTableClient.completeTransaction(tx.getStartTimestamp());
rolledbackTxsCounter.inc();
throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
}
certifyCommitForTx(tx, commitTs);
updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);

} catch (ExecutionException e) {
if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
rollback(tx);
rolledbackTxsCounter.inc();
throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
}

if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
errorTxsCounter.inc();
} else {
throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
throws RollbackException, TransactionException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public TSOFuture<Void> close() {
return new ForwardingTSOFuture<>(f);
}

@Override
public boolean isLowLatency() {
return false;
}

@Override
public long getEpoch() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
private InetSocketAddress tsoAddr;
private String zkCurrentTsoPath;

private boolean lowLatency;

// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -159,6 +161,7 @@ private TSOClient(OmidClientConfiguration omidConf) throws IOException {
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 100);
lowLatency = false;
}

// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -264,6 +267,11 @@ public void nodeChanged() throws Exception {

}

@Override
public boolean isLowLatency() {
return lowLatency;
}

// ****************************************** Finite State Machine ************************************************

// ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -530,6 +538,7 @@ public StateMachine.State handleEvent(UserEvent e) {
}

public StateMachine.State handleEvent(ResponseEvent e) {
lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
if (timeout != null) {
timeout.cancel();
Expand Down
Loading