From 475f35267c892cbee42316008c80df7132f29895 Mon Sep 17 00:00:00 2001 From: Yonatan Gottesman Date: Tue, 12 Jun 2018 14:45:48 +0300 Subject: [PATCH 1/2] Low latency protocol --- .../apache/omid/committable/CommitTable.java | 6 ++ .../omid/committable/InMemoryCommitTable.java | 8 ++ .../omid/committable/NullCommitTable.java | 5 ++ common/src/main/proto/TSOProto.proto | 1 + .../transaction/HBaseTransactionManager.java | 21 +++-- .../omid/transaction/TestTSOModule.java | 14 +--- .../committable/hbase/HBaseCommitTable.java | 21 +++-- .../TSOForHBaseCompactorTestModule.java | 17 +--- .../AbstractTransactionManager.java | 75 +++++++++++++++-- .../apache/omid/tso/client/MockTSOClient.java | 5 ++ .../tso/client/OmidClientConfiguration.java | 7 ++ .../org/apache/omid/tso/client/TSOClient.java | 9 ++ .../apache/omid/tso/client/TSOProtocol.java | 6 ++ ...mpl.java => AbstractRequestProcessor.java} | 54 ++++++------ .../org/apache/omid/tso/DisruptorModule.java | 11 ++- .../apache/omid/tso/LowWatermarkWriter.java | 24 ++++++ .../omid/tso/LowWatermarkWriterImpl.java | 79 ++++++++++++++++++ .../apache/omid/tso/MonitoringContext.java | 57 ++----------- .../omid/tso/MonitoringContextFactory.java | 31 +++++++ .../omid/tso/MonitoringContextImpl.java | 75 +++++++++++++++++ .../omid/tso/MonitoringContextNullImpl.java | 36 ++++++++ .../apache/omid/tso/PersistenceProcessor.java | 3 +- .../omid/tso/PersistenceProcessorImpl.java | 35 -------- .../omid/tso/PersitenceProcessorNullImpl.java | 55 ++++++++++++ .../org/apache/omid/tso/ReplyProcessor.java | 6 +- .../apache/omid/tso/ReplyProcessorImpl.java | 27 +++--- .../omid/tso/RequestProcessorPersistCT.java | 69 +++++++++++++++ .../omid/tso/RequestProcessorSkipCT.java | 83 +++++++++++++++++++ .../apache/omid/tso/RetryProcessorImpl.java | 6 +- .../apache/omid/tso/TSOChannelHandler.java | 5 +- .../java/org/apache/omid/tso/TSOModule.java | 1 + .../java/org/apache/omid/tso/TSOServer.java | 3 +- .../org/apache/omid/tso/TSOServerConfig.java | 20 +++++ .../default-omid-server-configuration.yml | 5 ++ .../org/apache/omid/tso/TSOMockModule.java | 1 + .../java/org/apache/omid/tso/TestBatch.java | 2 +- .../org/apache/omid/tso/TestPanicker.java | 12 ++- .../omid/tso/TestPersistenceProcessor.java | 65 +++++---------- .../tso/TestPersistenceProcessorHandler.java | 64 +++++++------- .../apache/omid/tso/TestReplyProcessor.java | 12 +-- .../apache/omid/tso/TestRequestProcessor.java | 60 +++++++------- .../apache/omid/tso/TestRetryProcessor.java | 16 ++-- .../omid/tso/TestTSOChannelHandlerNetty.java | 8 +- 43 files changed, 810 insertions(+), 310 deletions(-) rename tso-server/src/main/java/org/apache/omid/tso/{RequestProcessorImpl.java => AbstractRequestProcessor.java} (86%) create mode 100644 tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java create mode 100644 tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java index 91f590e21..52d0068e0 100644 --- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java +++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; public interface CommitTable { @@ -46,6 +47,11 @@ interface Writer extends Closeable { * Allows to clean the write's current buffer. It is required for HA */ void clearWriteBuffer(); + + /** + * Add commited transaction while checking if invalidated by other client + */ + boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException; } interface Client extends Closeable { diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java index 90af54ae4..6f9f38426 100644 --- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java +++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java @@ -65,6 +65,14 @@ public void clearWriteBuffer() { table.clear(); } + @Override + public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException { + // In this implementation, we use only one location that represents + // both the value and the invalidation. Therefore, putIfAbsent is + // required to make sure the entry was not invalidated. + return (table.putIfAbsent(startTimestamp, commitTimestamp) == null); + } + @Override public void close() { } diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java index 1cba77eb1..c27a2381a 100644 --- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java +++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java @@ -50,6 +50,11 @@ public void clearWriteBuffer() { // noop } + @Override + public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException { + return true; + } + @Override public void flush() throws IOException { // noop diff --git a/common/src/main/proto/TSOProto.proto b/common/src/main/proto/TSOProto.proto index 749beaa9d..507f19645 100644 --- a/common/src/main/proto/TSOProto.proto +++ b/common/src/main/proto/TSOProto.proto @@ -63,4 +63,5 @@ message HandshakeRequest { message HandshakeResponse { optional bool clientCompatible = 1; optional Capabilities serverCapabilities = 2; + optional bool lowLatency = 3[default= false]; } diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java index b31d2c9ed..72f34635b 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java @@ -82,6 +82,7 @@ public static class Builder { // Optional parameters - initialized to default values private Optional tsoClient = Optional.absent(); private Optional commitTableClient = Optional.absent(); + private Optional commitTableWriter = Optional.absent(); private Optional postCommitter = Optional.absent(); public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { @@ -106,6 +107,7 @@ public Builder postCommitter(PostCommitActions postCommitter) { public HBaseTransactionManager build() throws IOException, InterruptedException { CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get(); + CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get(); PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get(); TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get(); @@ -113,6 +115,7 @@ public HBaseTransactionManager build() throws IOException, InterruptedException postCommitter, tsoClient, commitTableClient, + commitTableWriter, new HBaseTransactionFactory()); } @@ -128,6 +131,13 @@ private Optional buildCommitTableClient() throws IOException return Optional.of(commitTable.getClient()); } + private Optional buildCommitTableWriter() throws IOException { + HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig(); + commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName()); + CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf); + return Optional.of(commitTable.getWriter()); + } + private Optional buildPostCommitter(CommitTable.Client commitTableClient ) { PostCommitActions postCommitter; @@ -160,14 +170,15 @@ private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConf PostCommitActions postCommitter, TSOProtocol tsoClient, CommitTable.Client commitTableClient, + CommitTable.Writer commitTableWriter, HBaseTransactionFactory hBaseTransactionFactory) { super(hBaseOmidClientConfiguration.getMetrics(), - postCommitter, - tsoClient, - commitTableClient, - hBaseTransactionFactory); - + postCommitter, + tsoClient, + commitTableClient, + commitTableWriter, + hBaseTransactionFactory); } // ---------------------------------------------------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java index 67c9cba60..99f95dbc9 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java @@ -27,18 +27,7 @@ import org.apache.omid.metrics.NullMetricsProvider; import org.apache.omid.timestamp.storage.HBaseTimestampStorage; import org.apache.omid.timestamp.storage.TimestampStorage; -import org.apache.omid.tso.BatchPoolModule; -import org.apache.omid.tso.DisruptorModule; -import org.apache.omid.tso.RuntimeExceptionPanicker; -import org.apache.omid.tso.NetworkInterfaceUtils; -import org.apache.omid.tso.Panicker; -import org.apache.omid.tso.PausableTimestampOracle; -import org.apache.omid.tso.PersistenceProcessorHandler; -import org.apache.omid.tso.TSOChannelHandler; -import org.apache.omid.tso.TSOServerConfig; -import org.apache.omid.tso.TSOStateManager; -import org.apache.omid.tso.TSOStateManagerImpl; -import org.apache.omid.tso.TimestampOracle; +import org.apache.omid.tso.*; import org.apache.hadoop.conf.Configuration; @@ -72,6 +61,7 @@ protected void configure() { bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class); bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class); bind(Panicker.class).to(RuntimeExceptionPanicker.class).in(Singleton.class); + bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class); install(new BatchPoolModule(config)); diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java index 89815c4a9..0464c3998 100644 --- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java +++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java @@ -24,14 +24,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.omid.committable.CommitTable; import org.apache.omid.committable.CommitTable.CommitTimestamp.Location; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +125,18 @@ public void clearWriteBuffer() { writeBuffer.clear(); } + @Override + public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException { + assert (startTimestamp < commitTimestamp); + byte[] transactionRow = startTimestampToKey(startTimestamp); + Put put = new Put(transactionRow, startTimestamp); + byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp); + put.add(commitTableFamily, COMMIT_TABLE_QUALIFIER, value); + + // TODO checkandput return false but still writes the put!?! + return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put); + } + @Override public void close() throws IOException { clearWriteBuffer(); @@ -254,7 +263,7 @@ public ListenableFuture tryInvalidateTransaction(long startTimestamp) { try { byte[] row = startTimestampToKey(startTimestamp); Put invalidationPut = new Put(row, startTimestamp); - invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, null); + invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1)); // We need to write to the invalid column only if the commit timestamp // is empty. This has to be done atomically. Otherwise, if we first diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java index abfe67cf3..0a6d29f8c 100644 --- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java +++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java @@ -26,20 +26,7 @@ import org.apache.omid.metrics.NullMetricsProvider; import org.apache.omid.timestamp.storage.HBaseTimestampStorage; import org.apache.omid.timestamp.storage.TimestampStorage; -import org.apache.omid.tso.BatchPoolModule; -import org.apache.omid.tso.DisruptorModule; -import org.apache.omid.tso.LeaseManagement; -import org.apache.omid.tso.MockPanicker; -import org.apache.omid.tso.NetworkInterfaceUtils; -import org.apache.omid.tso.Panicker; -import org.apache.omid.tso.PersistenceProcessorHandler; -import org.apache.omid.tso.TSOChannelHandler; -import org.apache.omid.tso.TSOServerConfig; -import org.apache.omid.tso.TSOStateManager; -import org.apache.omid.tso.TSOStateManagerImpl; -import org.apache.omid.tso.TimestampOracle; -import org.apache.omid.tso.TimestampOracleImpl; -import org.apache.omid.tso.VoidLeaseManager; +import org.apache.omid.tso.*; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -76,7 +63,7 @@ protected void configure() { // Timestamp storage creation bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class); bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class); - + bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class); install(new BatchPoolModule(config)); // DisruptorConfig install(new DisruptorModule(config)); diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java index 8b406b489..a41466e39 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java +++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java @@ -65,6 +65,7 @@ public interface TransactionFactory { private final PostCommitActions postCommitter; protected final TSOProtocol tsoClient; protected final CommitTable.Client commitTableClient; + private final CommitTable.Writer commitTableWriter; private final TransactionFactory transactionFactory; // Metrics @@ -94,11 +95,13 @@ public AbstractTransactionManager(MetricsRegistry metrics, PostCommitActions postCommitter, TSOProtocol tsoClient, CommitTable.Client commitTableClient, + CommitTable.Writer commitTableWriter, TransactionFactory transactionFactory) { this.tsoClient = tsoClient; this.postCommitter = postCommitter; this.commitTableClient = commitTableClient; + this.commitTableWriter = commitTableWriter; this.transactionFactory = transactionFactory; // Metrics configuration @@ -108,7 +111,6 @@ public AbstractTransactionManager(MetricsRegistry metrics, this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs")); this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs")); this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs")); - } /** @@ -198,7 +200,10 @@ public final void commit(Transaction transaction) throws RollbackException, Tran if (tx.getWriteSet().isEmpty()) { markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server } else { - commitRegularTransaction(tx); + if (tsoClient.isLowLatency()) + commitLowLatencyTransaction(tx); + else + commitRegularTransaction(tx); } committedTxsCounter.inc(); } finally { @@ -312,22 +317,43 @@ public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long e // 2) Then check the commit table // If the data was written at a previous epoch, check whether the transaction was invalidated - Optional commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get(); - if (commitTimeStamp.isPresent()) { - return commitTimeStamp.get(); + Optional commitTimeStampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get(); + + boolean invalidatedByOther = false; + if (commitTimeStampFromCT.isPresent()) { + if (tsoClient.isLowLatency() && !commitTimeStampFromCT.get().isValid()) + invalidatedByOther = true; + else + return commitTimeStampFromCT.get(); } // 3) Read from shadow cell - commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator); + Optional commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator); if (commitTimeStamp.isPresent()) { return commitTimeStamp.get(); } + // In case of LL, if found invalid ct cell, still must check sc in stage 3 then return + if (invalidatedByOther) { + return commitTimeStampFromCT.get(); + } + // 4) Check the epoch and invalidate the entry // if the data was written by a transaction from a previous epoch (previous TSO) - if (cellStartTimestamp < epoch) { + if (cellStartTimestamp < epoch || tsoClient.isLowLatency()) { boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get(); if (invalidated) { // Invalid commit timestamp + + // If we are running lowLatency Omid, we could have manged to invalidate a ct entry, + // but the committing client already wrote to shadow cells: + if (tsoClient.isLowLatency()) { + commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator); + if (commitTimeStamp.isPresent()) { + // Remove false invalidation from commit table + commitTableClient.completeTransaction(cellStartTimestamp); + return commitTimeStamp.get(); + } + } return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false); } } @@ -417,6 +443,41 @@ private void markReadOnlyTransaction(AbstractTransaction readO } + private void commitLowLatencyTransaction(AbstractTransaction tx) + throws RollbackException, TransactionException { + try { + + long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet()).get(); + boolean commited = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs); + if (!commited) { + // Trasaction has been invalidated by other client + rollback(tx); + commitTableClient.completeTransaction(tx.getStartTimestamp()); + rolledbackTxsCounter.inc(); + throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated"); + } + certifyCommitForTx(tx, commitTs); + updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter); + + } catch (ExecutionException e) { + if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future + rollback(tx); + rolledbackTxsCounter.inc(); + throw new RollbackException("Conflicts detected in tx writeset", e.getCause()); + } + + if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) { + errorTxsCounter.inc(); + } else { + throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause()); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + private void commitRegularTransaction(AbstractTransaction tx) throws RollbackException, TransactionException { diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java index 0511e0f9e..8486e024e 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java @@ -103,6 +103,11 @@ public TSOFuture close() { return new ForwardingTSOFuture<>(f); } + @Override + public boolean isLowLatency() { + return false; + } + @Override public long getEpoch() { return 0; diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java index 3542c55df..48ec31086 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java @@ -50,6 +50,7 @@ public enum PostCommitMode {SYNC, ASYNC} // Transaction Manager related params + private boolean lowLatency; private PostCommitMode postCommitMode = PostCommitMode.SYNC; // ---------------------------------------------------------------------------------------------------------------- @@ -68,6 +69,12 @@ public ConnType getConnectionType() { return connectionType; } + @Inject(optional = true) + @Named("omid.client.lowLatency") + public void setLowLatency(boolean lowLatency) { this.lowLatency = lowLatency;} + + public boolean getLowLatency() { return lowLatency;} + @Inject(optional = true) @Named("omid.client.connectionType") public void setConnectionType(ConnType connectionType) { diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java index fd92792f7..617b4d5c4 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java @@ -92,6 +92,8 @@ public class TSOClient implements TSOProtocol, NodeCacheListener { private InetSocketAddress tsoAddr; private String zkCurrentTsoPath; + private boolean lowLatency; + // ---------------------------------------------------------------------------------------------------------------- // Construction // ---------------------------------------------------------------------------------------------------------------- @@ -159,6 +161,7 @@ private TSOClient(OmidClientConfiguration omidConf) throws IOException { bootstrap.setOption("keepAlive", true); bootstrap.setOption("reuseAddress", true); bootstrap.setOption("connectTimeoutMillis", 100); + lowLatency = false; } // ---------------------------------------------------------------------------------------------------------------- @@ -264,6 +267,11 @@ public void nodeChanged() throws Exception { } + @Override + public boolean isLowLatency() { + return lowLatency; + } + // ****************************************** Finite State Machine ************************************************ // ---------------------------------------------------------------------------------------------------------------- @@ -530,6 +538,7 @@ public StateMachine.State handleEvent(UserEvent e) { } public StateMachine.State handleEvent(ResponseEvent e) { + lowLatency = e.getParam().getHandshakeResponse().getLowLatency(); if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) { if (timeout != null) { timeout.cancel(); diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java index fae4b9605..bebc67e2b 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java @@ -60,4 +60,10 @@ public interface TSOProtocol { */ TSOFuture close(); + /** + * checks is tso is low latency protocol + * @return + */ + boolean isLowLatency(); + } diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java similarity index 86% rename from tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java rename to tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java index 65416bcbc..14e137069 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java +++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; import java.io.IOException; import java.util.Collection; import java.util.Iterator; @@ -42,30 +41,28 @@ import static com.lmax.disruptor.dsl.ProducerType.MULTI; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY; +import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY; -class RequestProcessorImpl implements EventHandler, RequestProcessor, TimeoutHandler { +abstract class AbstractRequestProcessor implements EventHandler, RequestProcessor, TimeoutHandler { - private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class); // Disruptor-related attributes private final ExecutorService disruptorExec; - private final Disruptor disruptor; - private final RingBuffer requestRing; + protected final Disruptor disruptor; + protected RingBuffer requestRing; private final TimestampOracle timestampOracle; private final CommitHashMap hashmap; private final MetricsRegistry metrics; - private final PersistenceProcessor persistProc; - + private final LowWatermarkWriter lowWatermarkWriter; private long lowWatermark = -1L; - @Inject - RequestProcessorImpl(MetricsRegistry metrics, - TimestampOracle timestampOracle, - PersistenceProcessor persistProc, - Panicker panicker, - TSOServerConfig config) + + AbstractRequestProcessor(MetricsRegistry metrics, + TimestampOracle timestampOracle, + Panicker panicker, + TSOServerConfig config, LowWatermarkWriter lowWatermarkWriter) throws IOException { // ------------------------------------------------------------------------------------------------------------ @@ -80,17 +77,18 @@ class RequestProcessorImpl implements EventHandler(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy); disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith() disruptor.handleEventsWith(this); - this.requestRing = disruptor.start(); + // ------------------------------------------------------------------------------------------------------------ // Attribute initialization // ------------------------------------------------------------------------------------------------------------ this.metrics = metrics; - this.persistProc = persistProc; this.timestampOracle = timestampOracle; this.hashmap = new CommitHashMap(config.getConflictMapSize()); + this.lowWatermarkWriter = lowWatermarkWriter; + LOG.info("RequestProcessor initialized"); } @@ -102,7 +100,7 @@ class RequestProcessorImpl implements EventHandler persistLowWatermark(final long lowWatermark); +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java new file mode 100644 index 000000000..8de1b20e9 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.metrics.MetricsRegistry; +import org.apache.omid.metrics.Timer; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.omid.metrics.MetricsUtils.name; + +public class LowWatermarkWriterImpl implements LowWatermarkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(LowWatermarkWriterImpl.class); + + private final Timer lwmWriteTimer; + private final CommitTable.Writer lowWatermarkWriter; + private final ExecutorService lowWatermarkWriterExecutor; + private MetricsRegistry metrics; + + @Inject + LowWatermarkWriterImpl(TSOServerConfig config, + CommitTable commitTable, + MetricsRegistry metrics) + throws Exception { + this.metrics = metrics; + this.lowWatermarkWriter = commitTable.getWriter(); + // Low Watermark writer + ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d"); + this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build()); + + // Metrics config + this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency")); + LOG.info("PersistentProcessor initialized"); + } + + @Override + public Future persistLowWatermark(final long lowWatermark) { + + return lowWatermarkWriterExecutor.submit(new Callable() { + @Override + public Void call() throws IOException { + try { + lwmWriteTimer.start(); + lowWatermarkWriter.updateLowWatermark(lowWatermark); + lowWatermarkWriter.flush(); + } finally { + lwmWriteTimer.stop(); + } + return null; + } + }); + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java index 426df27bf..ea183a8f9 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java +++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java @@ -15,62 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.omid.tso; - -import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; -import org.apache.omid.metrics.MetricsRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.NotThreadSafe; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import static org.apache.omid.metrics.MetricsUtils.name; -@NotThreadSafe -public class MonitoringContext { - - private static final Logger LOG = LoggerFactory.getLogger(MonitoringContext.class); - - private volatile boolean flag; - private Map elapsedTimeMsMap = new ConcurrentHashMap<>(); - private Map timers = new ConcurrentHashMap<>(); - private MetricsRegistry metrics; +package org.apache.omid.tso; - public MonitoringContext(MetricsRegistry metrics) { - this.metrics = metrics; - } +public interface MonitoringContext { - public void timerStart(String name) { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - timers.put(name, stopwatch); - } + public void timerStart(String name); - public void timerStop(String name) { - if (flag) { - LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception())); - return; - } - Stopwatch activeStopwatch = timers.get(name); - if (activeStopwatch == null) { - throw new IllegalStateException( - String.format("There is no %s timer in the %s monitoring context.", name, this)); - } - activeStopwatch.stop(); - elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS)); - timers.remove(name); - } + public void timerStop(String name); - public void publish() { - flag = true; - for (Map.Entry entry : elapsedTimeMsMap.entrySet()) { - metrics.timer(name("tso", entry.getKey())).update(entry.getValue()); - } - } + public void publish(); } diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java new file mode 100644 index 000000000..4280abc17 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import org.apache.omid.metrics.MetricsRegistry; + +public class MonitoringContextFactory { + private MonitoringContextFactory(){}; + + static public MonitoringContext getInstance(TSOServerConfig config, MetricsRegistry metrics) { + if (config.getMonitorContext()) + return new MonitoringContextImpl(metrics); + else + return new MonitoringContextNullImpl(); + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java new file mode 100644 index 000000000..5792a77c6 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import org.apache.omid.metrics.MetricsRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.omid.metrics.MetricsUtils.name; +import java.util.concurrent.TimeUnit; + +@NotThreadSafe +public class MonitoringContextImpl implements MonitoringContext{ + + private static final Logger LOG = LoggerFactory.getLogger(MonitoringContextImpl.class); + + private volatile boolean flag; + private Map elapsedTimeMsMap = new ConcurrentHashMap<>(); + private Map timers = new ConcurrentHashMap<>(); + private MetricsRegistry metrics; + + public MonitoringContextImpl(MetricsRegistry metrics) { + this.metrics = metrics; + } + + public void timerStart(String name) { + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + timers.put(name, stopwatch); + } + + public void timerStop(String name) { + if (flag) { + LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception())); + return; + } + Stopwatch activeStopwatch = timers.get(name); + if (activeStopwatch == null) { + throw new IllegalStateException( + String.format("There is no %s timer in the %s monitoring context.", name, this)); + } + activeStopwatch.stop(); + elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS)); + timers.remove(name); + } + + public void publish() { + flag = true; + for (Map.Entry entry : elapsedTimeMsMap.entrySet()) { + metrics.timer(name("tso", entry.getKey())).update(entry.getValue()); + } + } + +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java new file mode 100644 index 000000000..f88123f86 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.omid.tso; + +public class MonitoringContextNullImpl implements MonitoringContext { + @Override + public void timerStart(String name) { + + } + + @Override + public void timerStop(String name) { + + } + + @Override + public void publish() { + + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java index b96945d6b..40f203f6f 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java +++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java @@ -20,7 +20,6 @@ import org.jboss.netty.channel.Channel; import java.io.Closeable; -import java.util.concurrent.Future; interface PersistenceProcessor extends Closeable { @@ -35,5 +34,5 @@ void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, Moni void triggerCurrentBatchFlush() throws Exception; - Future persistLowWatermark(long lowWatermark); + } diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java index 95d77bad2..e4d2eba43 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java +++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java @@ -29,7 +29,6 @@ import org.apache.commons.pool2.ObjectPool; import org.apache.omid.committable.CommitTable; import org.apache.omid.metrics.MetricsRegistry; -import org.apache.omid.metrics.Timer; import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +36,8 @@ import javax.inject.Inject; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import static com.lmax.disruptor.dsl.ProducerType.SINGLE; import static java.util.concurrent.TimeUnit.SECONDS; @@ -63,12 +60,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor { // TODO Next two need to be either int or AtomicLong volatile private long batchSequence; - - private CommitTable.Writer lowWatermarkWriter; - private ExecutorService lowWatermarkWriterExecutor; - private MetricsRegistry metrics; - private final Timer lwmWriteTimer; @Inject PersistenceProcessorImpl(TSOServerConfig config, @@ -97,19 +89,11 @@ class PersistenceProcessorImpl implements PersistenceProcessor { // ------------------------------------------------------------------------------------------------------------ this.metrics = metrics; - this.lowWatermarkWriter = commitTable.getWriter(); this.batchSequence = 0L; this.batchPool = batchPool; this.currentBatch = batchPool.borrowObject(); - // Low Watermark writer - ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d"); - this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build()); - - // Metrics config - this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency")); LOG.info("PersistentProcessor initialized"); - } @Override @@ -166,25 +150,6 @@ public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContex } - @Override - public Future persistLowWatermark(final long lowWatermark) { - - return lowWatermarkWriterExecutor.submit(new Callable() { - @Override - public Void call() throws IOException { - try { - lwmWriteTimer.start(); - lowWatermarkWriter.updateLowWatermark(lowWatermark); - lowWatermarkWriter.flush(); - } finally { - lwmWriteTimer.stop(); - } - return null; - } - }); - - } - @Override public void close() throws IOException { diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java new file mode 100644 index 000000000..55b50688f --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import org.jboss.netty.channel.Channel; + +import java.io.IOException; + +public class PersitenceProcessorNullImpl implements PersistenceProcessor { + + @Override + public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + + } + + @Override + public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + + } + + @Override + public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + + } + + @Override + public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + + } + + @Override + public void triggerCurrentBatchFlush() throws Exception { + + } + + @Override + public void close() throws IOException { + + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java index f196c42ef..60e3e065c 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java +++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java @@ -44,7 +44,7 @@ interface ReplyProcessor extends Closeable { * @param channel * the channel used to send the response back to the client */ - void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel); + void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx); /** * Allows to send an abort response back to the client. @@ -54,7 +54,7 @@ interface ReplyProcessor extends Closeable { * @param channel * the channel used to send the response back to the client */ - void sendAbortResponse(long startTimestamp, Channel channel); + void sendAbortResponse(long startTimestamp, Channel channel, MonitoringContext monCtx); /** * Allow to send a timestamp response back to the client. @@ -65,7 +65,7 @@ interface ReplyProcessor extends Closeable { * the channel used to send the response back to the client */ - void sendTimestampResponse(long startTimestamp, Channel channel); + void sendTimestampResponse(long startTimestamp, Channel channel, MonitoringContext monCtx); } diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java index 8e50323ef..6681fab76 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java +++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java @@ -114,19 +114,13 @@ void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception { switch (event.getType()) { case COMMIT: - sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel()); - event.getMonCtx().timerStop("reply.processor.commit.latency"); - commitMeter.mark(); + sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx()); break; case ABORT: - sendAbortResponse(event.getStartTimestamp(), event.getChannel()); - event.getMonCtx().timerStop("reply.processor.abort.latency"); - abortMeter.mark(); + sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx()); break; case TIMESTAMP: - sendTimestampResponse(event.getStartTimestamp(), event.getChannel()); - event.getMonCtx().timerStop("reply.processor.timestamp.latency"); - timestampMeter.mark(); + sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx()); break; case COMMIT_RETRY: throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event); @@ -182,7 +176,7 @@ public void manageResponsesBatch(long batchSequence, Batch batch) { } @Override - public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) { + public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) { TSOProto.Response.Builder builder = TSOProto.Response.newBuilder(); TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder(); @@ -191,11 +185,12 @@ public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channe .setCommitTimestamp(commitTimestamp); builder.setCommitResponse(commitBuilder.build()); c.write(builder.build()); - + commitMeter.mark(); + monCtx.timerStop("reply.processor.commit.latency"); } @Override - public void sendAbortResponse(long startTimestamp, Channel c) { + public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) { TSOProto.Response.Builder builder = TSOProto.Response.newBuilder(); TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder(); @@ -203,18 +198,20 @@ public void sendAbortResponse(long startTimestamp, Channel c) { commitBuilder.setStartTimestamp(startTimestamp); builder.setCommitResponse(commitBuilder.build()); c.write(builder.build()); - + abortMeter.mark(); + monCtx.timerStop("reply.processor.abort.latency"); } @Override - public void sendTimestampResponse(long startTimestamp, Channel c) { + public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) { TSOProto.Response.Builder builder = TSOProto.Response.newBuilder(); TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder(); respBuilder.setStartTimestamp(startTimestamp); builder.setTimestampResponse(respBuilder.build()); c.write(builder.build()); - + timestampMeter.mark(); + monCtx.timerStop("reply.processor.timestamp.latency"); } @Override diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java new file mode 100644 index 000000000..62d8c5d93 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import com.google.inject.Inject; +import org.apache.omid.metrics.MetricsRegistry; +import org.jboss.netty.channel.Channel; + +import java.io.IOException; + +public class RequestProcessorPersistCT extends AbstractRequestProcessor { + + PersistenceProcessor persistenceProcessor; + + @Inject + RequestProcessorPersistCT(MetricsRegistry metrics, + TimestampOracle timestampOracle, + PersistenceProcessor persistenceProcessor, + Panicker panicker, + TSOServerConfig config, + LowWatermarkWriter lowWatermarkWriter) throws IOException { + + super(metrics, timestampOracle, panicker, config, lowWatermarkWriter); + this.persistenceProcessor = persistenceProcessor; + requestRing = disruptor.start(); + } + + + + @Override + public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx); + } + + @Override + public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + persistenceProcessor.addCommitRetryToBatch(startTimestamp,c,monCtx); + } + + @Override + public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + persistenceProcessor.addAbortToBatch(startTimestamp,c,monCtx); + } + + @Override + public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception { + persistenceProcessor.addTimestampToBatch(startTimestamp,c,monCtx); + } + + @Override + public void onTimeout() throws Exception { + persistenceProcessor.triggerCurrentBatchFlush(); + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java new file mode 100644 index 000000000..9ce490855 --- /dev/null +++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso; + +import com.google.inject.Inject; +import org.apache.omid.metrics.MetricsRegistry; +import org.jboss.netty.channel.Channel; + +import java.io.IOException; + +public class RequestProcessorSkipCT extends AbstractRequestProcessor { + + + private final ReplyProcessor replyProcessor; + + private final LeaseManagement leaseManager; + private final Panicker panicker; + private final String tsoHostAndPort; + + @Inject + RequestProcessorSkipCT(MetricsRegistry metrics, + TimestampOracle timestampOracle, + ReplyProcessor replyProcessor, + Panicker panicker, + LeaseManagement leaseManager, + TSOServerConfig config, + LowWatermarkWriter lowWatermarkWriter, + String tsoHostAndPort) throws IOException { + super(metrics, timestampOracle, panicker, config, lowWatermarkWriter); + this.replyProcessor = replyProcessor; + this.tsoHostAndPort = tsoHostAndPort; + requestRing = disruptor.start(); + this.leaseManager = leaseManager; + this.panicker = panicker; + } + + private void commitSuicideIfNotMaster() { + if (!leaseManager.stillInLeasePeriod()) { + panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide"); + } + } + + @Override + public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) { + commitSuicideIfNotMaster(); + replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx); + } + + @Override + public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) { + replyProcessor.sendAbortResponse(startTimestamp, c, monCtx); + } + + @Override + public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) { + replyProcessor.sendAbortResponse(startTimestamp, c, monCtx); + } + + @Override + public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) { + replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx); + } + + @Override + public void onTimeout() { + + } +} diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java index 6d923bec4..610e76052 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java +++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java @@ -133,16 +133,16 @@ private void handleCommitRetry(RetryEvent event) { if (commitTimestamp.isPresent()) { if (commitTimestamp.get().isValid()) { LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp); - replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel()); + replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx()); txAlreadyCommittedMeter.mark(); } else { LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp); - replyProc.sendAbortResponse(startTimestamp, event.getChannel()); + replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx()); invalidTxMeter.mark(); } } else { LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp); - replyProc.sendAbortResponse(startTimestamp, event.getChannel()); + replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx()); noCTFoundMeter.mark(); } } catch (InterruptedException e) { diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java index fe99880c7..f6fb273af 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java @@ -165,14 +165,14 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { } if (request.hasTimestampRequest()) { - requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics)); + requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics)); } else if (request.hasCommitRequest()) { TSOProto.CommitRequest cr = request.getCommitRequest(); requestProcessor.commitRequest(cr.getStartTimestamp(), cr.getCellIdList(), cr.getIsRetry(), ctx.getChannel(), - new MonitoringContext(metrics)); + MonitoringContextFactory.getInstance(config,metrics)); } else { LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel()); ctx.getChannel().close(); @@ -240,6 +240,7 @@ private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeR } else { response.setClientCompatible(false); } + response.setLowLatency(config.getLowLatency()); ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build()); } diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java index a7aec278c..ca30d2595 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java @@ -44,6 +44,7 @@ protected void configure() { bind(TSOChannelHandler.class).in(Singleton.class); bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class); bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class); + bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class); bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class); install(new BatchPoolModule(config)); diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java index 19d9f0155..98ee6cd2f 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java @@ -52,7 +52,8 @@ public class TSOServer extends AbstractIdleService { private RetryProcessor retryProcessor; @Inject private ReplyProcessor replyProcessor; - + @Inject + private LowWatermarkWriter lowWatermarkWriter; // ---------------------------------------------------------------------------------------------------------------- // High availability related variables // ---------------------------------------------------------------------------------------------------------------- diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java index 329221181..71d5280d0 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java @@ -82,6 +82,26 @@ public TSOServerConfig() { private String networkIfaceName = NetworkUtils.getDefaultNetworkInterface(); + public Boolean getLowLatency() { + return lowLatency; + } + + public void setLowLatency(Boolean lowLatency) { + this.lowLatency = lowLatency; + } + + private Boolean lowLatency; + + public boolean getMonitorContext() { + return monitorContext; + } + + public void setMonitorContext(boolean monitorContext) { + this.monitorContext = monitorContext; + } + + public boolean monitorContext; + public int getPort() { return port; } diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml index da0c53196..b7f39bbb8 100644 --- a/tso-server/src/main/resources/default-omid-server-configuration.yml +++ b/tso-server/src/main/resources/default-omid-server-configuration.yml @@ -41,6 +41,9 @@ batchSizePerCTWriter: 25 # When this timeout expires, the contents of the batch are flushed to the datastore batchPersistTimeoutInMs: 10 +#low latency mode - clients are expected to update commit table +lowLatency: false + # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables) timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ] commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ] @@ -49,6 +52,8 @@ leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ] # Default stats/metrics configuration metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ] +monitorContext: false + # --------------------------------------------------------------------------------------------------------------------- # Timestamp storage configuration options # --------------------------------------------------------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java index 17fd2e003..2e5aa7d4b 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java @@ -53,6 +53,7 @@ protected void configure() { bind(TimestampStorage.class).to(InMemoryTimestampStorage.class).in(Singleton.class); bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class); bind(Panicker.class).to(MockPanicker.class).in(Singleton.class); + bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class); install(new BatchPoolModule(config)); install(config.getLeaseModule()); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java index 573cd8903..c286f854d 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java @@ -43,7 +43,7 @@ public class TestBatch { @Mock private Channel channel; @Mock - private MonitoringContext monCtx; + private MonitoringContextImpl monCtx; @BeforeMethod void setup() { diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java index ae89f0188..5e1613cc6 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java @@ -137,9 +137,11 @@ public Client getClient() { handlers, metrics); - proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics)); + proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics)); - new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class)); + LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics); + + new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class), lowWatermarkWriter); verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class)); @@ -189,9 +191,11 @@ public Client getClient() { panicker, handlers, metrics); - proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics)); + proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics)); + + LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics); - new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class)); + new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class), lowWatermarkWriter); verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class)); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java index 4779608b0..94e8f8864 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java @@ -68,6 +68,7 @@ public class TestPersistenceProcessor { private MetricsRegistry metrics; private CommitTable commitTable; + private LowWatermarkWriter lowWatermarkWriter; @BeforeMethod(alwaysRun = true, timeOut = 30_000) public void initMocksAndComponents() throws Exception { @@ -101,29 +102,9 @@ void afterMethod() { public void testLowWatermarkIsPersisted() throws Exception { TSOServerConfig tsoConfig = new TSOServerConfig(); + lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics); - PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; - for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { - handlers[i] = new PersistenceProcessorHandler(metrics, - "localhost:1234", - mock(LeaseManager.class), - commitTable, - mock(ReplyProcessor.class), - retryProcessor, - panicker); - } - - // Component under test - PersistenceProcessorImpl persistenceProcessor = - new PersistenceProcessorImpl(tsoConfig, - new BlockingWaitStrategy(), - commitTable, - mock(ObjectPool.class), - panicker, - handlers, - metrics); - - persistenceProcessor.persistLowWatermark(ANY_LWM).get(); + lowWatermarkWriter.persistLowWatermark(ANY_LWM).get(); ArgumentCaptor lwmCapture = ArgumentCaptor.forClass(Long.class); CommitTable.Writer lwmWriter = commitTable.getWriter(); @@ -166,10 +147,10 @@ public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception verify(batchPool, times(1)).borrowObject(); // Called during initialization - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing @@ -211,11 +192,11 @@ public void testCommitPersistenceWithMultipleCommitTableWriters() throws Excepti verify(batchPool, times(1)).borrowObject(); // Called during initialization // Fill 1st handler Batches completely - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full verify(batchPool, times(2)).borrowObject(); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full verify(batchPool, times(3)).borrowObject(); // Test empty flush does not trigger response in getting a new currentBatch @@ -223,14 +204,14 @@ public void testCommitPersistenceWithMultipleCommitTableWriters() throws Excepti verify(batchPool, times(3)).borrowObject(); // Fill 2nd handler Batches completely - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject(); // Start filling a new currentBatch and flush it immediately - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full verify(batchPool, times(5)).borrowObject(); proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch verify(batchPool, times(6)).borrowObject(); @@ -281,7 +262,7 @@ public void testCommitPersistenceWithNonHALeaseManager() throws Exception { // The non-ha lease manager always return true for // stillInLeasePeriod(), so verify the currentBatch sends replies as master - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); proc.triggerCurrentBatchFlush(); verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); @@ -336,7 +317,7 @@ private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig ts // Test: Configure the lease manager to return true always doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod(); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); @@ -357,7 +338,7 @@ private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerC // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); @@ -378,7 +359,7 @@ private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerC // Test: Configure the lease manager to return false for stillInLeasePeriod doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); @@ -402,7 +383,7 @@ private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerC // Configure mock writer to flush unsuccessfully doThrow(new IOException("Unable to write")).when(mockWriter).flush(); doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); - proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); + proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); @@ -452,7 +433,7 @@ public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); - MonitoringContext monCtx = new MonitoringContext(metrics); + MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics); // Configure lease manager to work normally doReturn(true).when(leaseManager).stillInLeasePeriod(); @@ -492,7 +473,7 @@ public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exce // Configure writer to explode with a runtime exception doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong()); - MonitoringContext monCtx = new MonitoringContext(metrics); + MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics); // Check the panic is extended! proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java index 43f354f94..4f190f916 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java @@ -167,7 +167,7 @@ public void testProcessingOfEmptyBatchPersistEvent() throws Exception { verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertTrue(batch.isEmpty()); @@ -178,14 +178,14 @@ public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class)); + batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); persistenceHandler.onEvent(batchEvent); verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 1); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -197,14 +197,14 @@ public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exc // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class)); + batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); persistenceHandler.onEvent(batchEvent); verify(persistenceHandler, times(1)).flush(eq(1)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch); - verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 1); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -217,14 +217,14 @@ public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() thro // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class)); + batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); persistenceHandler.onEvent(batchEvent); verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch); - verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 1); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -236,7 +236,7 @@ public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throw // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class)); + batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -245,7 +245,7 @@ public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throw verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 0); @@ -256,8 +256,8 @@ public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() t // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class)); - batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class)); + batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class)); + batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -269,7 +269,7 @@ public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() t verify(persistenceHandler, times(1)).flush(eq(1)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 1); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -285,8 +285,8 @@ public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() t // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class)); - batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class)); + batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class)); + batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -298,7 +298,7 @@ public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() t verify(persistenceHandler, times(1)).flush(eq(1)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 1); assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST); @@ -311,8 +311,8 @@ public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exc // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class)); - batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class)); + batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class)); + batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -324,8 +324,8 @@ public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exc verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class)); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 0); @@ -336,8 +336,8 @@ public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class)); - batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class)); + batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class)); + batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -349,7 +349,7 @@ public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception verify(persistenceHandler, times(1)).flush(eq(0)); verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 2); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -364,12 +364,12 @@ public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents( // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class)); - batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class)); - batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class)); - batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class)); - batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class)); - batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class)); + batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class)); + batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class)); + batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class)); + batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class)); + batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class)); + batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -381,7 +381,7 @@ public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents( verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch)); - verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class)); + verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class)); verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch)); assertEquals(batch.getNumEvents(), 4); assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST); @@ -408,7 +408,7 @@ public void testPanicPersistingEvents() throws Exception { // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class)); + batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -450,7 +450,7 @@ public void testPanicBecauseMasterLosesMastership() throws Exception { // Prepare test batch Batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class)); + batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class)); PersistBatchEvent batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); @@ -485,7 +485,7 @@ public void testPanicBecauseMasterLosesMastership() throws Exception { // Prepare test batch batch = new Batch(BATCH_ID, BATCH_SIZE); - batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class)); + batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class)); batchEvent = new PersistBatchEvent(); PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java index 3ead24ba8..54d1e70ad 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java @@ -70,7 +70,7 @@ public class TestReplyProcessor { private Panicker panicker; @Mock - private MonitoringContext monCtx; + private MonitoringContextImpl monCtx; private MetricsRegistry metrics; @@ -247,11 +247,11 @@ public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendin inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent)); InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor); - inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class)); - inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class)); - inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class)); - inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class)); - inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class)); + inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx)); + inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx)); + inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx)); + inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx)); + inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx)); } diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java index 405102a28..01844aee6 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java @@ -56,6 +56,8 @@ public class TestRequestProcessor { // Request processor under test private RequestProcessor requestProc; + private LowWatermarkWriter lowWatermarkWriter; + @BeforeMethod public void beforeMethod() throws Exception { @@ -66,16 +68,16 @@ public void beforeMethod() throws Exception { new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker()); stateManager = new TSOStateManagerImpl(timestampOracle); - + lowWatermarkWriter = mock(LowWatermarkWriter.class); persist = mock(PersistenceProcessor.class); SettableFuture f = SettableFuture.create(); f.set(null); - doReturn(f).when(persist).persistLowWatermark(any(Long.class)); + doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class)); TSOServerConfig config = new TSOServerConfig(); config.setConflictMapSize(CONFLICT_MAP_SIZE); - requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config); + requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(), config, lowWatermarkWriter); // Initialize the state for the experiment stateManager.register(requestProc); @@ -86,16 +88,16 @@ public void beforeMethod() throws Exception { @Test(timeOut = 30_000) public void testTimestamp() throws Exception { - requestProc.timestampRequest(null, new MonitoringContext(metrics)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); ArgumentCaptor firstTScapture = ArgumentCaptor.forClass(Long.class); verify(persist, timeout(100).times(1)).addTimestampToBatch( - firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class)); + firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long firstTS = firstTScapture.getValue(); // verify that timestamps increase monotonically for (int i = 0; i < 100; i++) { - requestProc.timestampRequest(null, new MonitoringContext(metrics)); - verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); + verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContextImpl.class)); } } @@ -103,39 +105,39 @@ public void testTimestamp() throws Exception { @Test(timeOut = 30_000) public void testCommit() throws Exception { - requestProc.timestampRequest(null, new MonitoringContext(metrics)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); ArgumentCaptor TScapture = ArgumentCaptor.forClass(Long.class); verify(persist, timeout(100).times(1)).addTimestampToBatch( - TScapture.capture(), any(Channel.class), any(MonitoringContext.class)); + TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long firstTS = TScapture.getValue(); List writeSet = Lists.newArrayList(1L, 20L, 203L); - requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics)); - verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class)); + requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContextImpl(metrics)); + verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContextImpl.class)); - requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics)); + requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContextImpl(metrics)); ArgumentCaptor commitTScapture = ArgumentCaptor.forClass(Long.class); - verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class)); + verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS"); // test conflict - requestProc.timestampRequest(null, new MonitoringContext(metrics)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); TScapture = ArgumentCaptor.forClass(Long.class); verify(persist, timeout(100).times(2)).addTimestampToBatch( - TScapture.capture(), any(Channel.class), any(MonitoringContext.class)); + TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long secondTS = TScapture.getValue(); - requestProc.timestampRequest(null, new MonitoringContext(metrics)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); TScapture = ArgumentCaptor.forClass(Long.class); verify(persist, timeout(100).times(3)).addTimestampToBatch( - TScapture.capture(), any(Channel.class), any(MonitoringContext.class)); + TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long thirdTS = TScapture.getValue(); - requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics)); - verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class)); - requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics)); - verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class)); + requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContextImpl(metrics)); + verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class)); + requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContextImpl(metrics)); + verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class)); } @@ -145,11 +147,11 @@ public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws E List writeSet = Collections.emptyList(); // Start a transaction... - requestProc.timestampRequest(null, new MonitoringContext(metrics)); + requestProc.timestampRequest(null, new MonitoringContextImpl(metrics)); ArgumentCaptor capturedTS = ArgumentCaptor.forClass(Long.class); verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(), any(Channel.class), - any(MonitoringContext.class)); + any(MonitoringContextImpl.class)); long startTS = capturedTS.getValue(); // ... simulate the reset of the RequestProcessor state (e.g. due to @@ -157,8 +159,8 @@ public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws E stateManager.initialize(); // ...check that the transaction is aborted when trying to commit - requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics)); - verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class)); + requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContextImpl(metrics)); + verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class)); } @@ -173,17 +175,17 @@ public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exce for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) { long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT List writeSet = Lists.newArrayList(writeSetElementHash); - requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics)); + requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContextImpl(metrics)); } Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing // Check that first time its called is on init - verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L)); + verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L)); // Then, check it is called when cache is full and the first element is evicted (should be a 1) - verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED)); + verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED)); // Finally it should never be called with the next element - verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)); + verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)); } diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java index 54302d00d..920618748 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java @@ -25,7 +25,6 @@ import org.apache.omid.committable.CommitTable.CommitTimestamp; import org.apache.omid.committable.InMemoryCommitTable; import org.apache.omid.metrics.MetricsRegistry; -import org.apache.omid.metrics.NullMetricsProvider; import org.jboss.netty.channel.Channel; import org.mockito.ArgumentCaptor; import org.mockito.Mock; @@ -37,6 +36,7 @@ import org.testng.annotations.Test; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -57,6 +57,8 @@ public class TestRetryProcessor { private Panicker panicker; @Mock private MetricsRegistry metrics; + @Mock + private MonitoringContextImpl monCtx; private CommitTable commitTable; @@ -75,10 +77,10 @@ public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception { RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool); // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table - retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics)); + retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx); ArgumentCaptor firstTSCapture = ArgumentCaptor.forClass(Long.class); - verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class)); + verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long startTS = firstTSCapture.getValue(); assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX"); } @@ -92,13 +94,13 @@ public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception { // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1); - retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics)); + retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics)); ArgumentCaptor firstTSCapture = ArgumentCaptor.forClass(Long.class); ArgumentCaptor secondTSCapture = ArgumentCaptor.forClass(Long.class); verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(), secondTSCapture.capture(), - any(Channel.class)); + any(Channel.class), any(MonitoringContextImpl.class)); long startTS = firstTSCapture.getValue(); long commitTS = secondTSCapture.getValue(); @@ -125,9 +127,9 @@ public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws E RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool); // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated - retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics)); + retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics)); ArgumentCaptor startTSCapture = ArgumentCaptor.forClass(Long.class); - verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class)); + verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class)); long startTS = startTSCapture.getValue(); Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX"); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java index 968f4a989..bbc81325d 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java @@ -258,9 +258,9 @@ private void testWritingTimestampRequest(Channel channel) throws InterruptedExce tsBuilder.setTimestampRequest(tsRequestBuilder.build()); // Write into the channel channel.write(tsBuilder.build()).await(); - verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class)); + verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class)); verify(requestProcessor, timeout(100).never()) - .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class)); + .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class)); } private void testWritingCommitRequest(Channel channel) throws InterruptedException { @@ -275,9 +275,9 @@ private void testWritingCommitRequest(Channel channel) throws InterruptedExcepti assertTrue(r.hasCommitRequest()); // Write into the channel channel.write(commitBuilder.build()).await(); - verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class)); + verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class)); verify(requestProcessor, timeout(100).times(1)) - .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class)); + .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class)); } // ---------------------------------------------------------------------------------------------------------------- From cdfd0843cd20bde35dacd31d08daa89502e3b87f Mon Sep 17 00:00:00 2001 From: Yonatan Gottesman Date: Wed, 13 Jun 2018 10:12:51 +0300 Subject: [PATCH 2/2] remove low latency param from client config --- .../apache/omid/tso/client/OmidClientConfiguration.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java index 48ec31086..3542c55df 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java @@ -50,7 +50,6 @@ public enum PostCommitMode {SYNC, ASYNC} // Transaction Manager related params - private boolean lowLatency; private PostCommitMode postCommitMode = PostCommitMode.SYNC; // ---------------------------------------------------------------------------------------------------------------- @@ -69,12 +68,6 @@ public ConnType getConnectionType() { return connectionType; } - @Inject(optional = true) - @Named("omid.client.lowLatency") - public void setLowLatency(boolean lowLatency) { this.lowLatency = lowLatency;} - - public boolean getLowLatency() { return lowLatency;} - @Inject(optional = true) @Named("omid.client.connectionType") public void setConnectionType(ConnType connectionType) {