From 5d980462b7b227aed9b4a6ebea88602c357f035e Mon Sep 17 00:00:00 2001 From: Norbert Meszaros Date: Fri, 29 Aug 2025 16:16:09 +0200 Subject: [PATCH] OMID-319 Use shaded protobuf-java from phoenix-thirdparty --- common/pom.xml | 54 ++++++- .../omid/protobuf/OmidProtobufDecoder.java | 138 ++++++++++++++++++ .../omid/protobuf/OmidProtobufEncoder.java | 79 ++++++++++ common/src/main/proto/TSOProto.proto | 5 +- hbase-commit-table/pom.xml | 6 + .../committable/hbase/HBaseCommitTable.java | 4 +- hbase-common/pom.xml | 5 + .../hbase/KeyGeneratorImplementations.java | 4 +- .../omid/transaction/OmidSnapshotFilter.java | 2 +- pom.xml | 43 ++---- timestamp-storage/pom.xml | 6 + .../org/apache/omid/tso/client/TSOClient.java | 8 +- tso-server/pom.xml | 6 +- .../apache/omid/tso/TSOChannelHandler.java | 8 +- .../omid/tso/ProgrammableTSOServer.java | 8 +- .../omid/tso/TestTSOChannelHandlerNetty.java | 8 +- .../java/org/apache/omid/tso/TestTSOLL.java | 2 +- .../apache/omid/tso/client/TSOClientRaw.java | 8 +- 18 files changed, 326 insertions(+), 68 deletions(-) create mode 100644 common/src/main/java/org/apache/omid/protobuf/OmidProtobufDecoder.java create mode 100644 common/src/main/java/org/apache/omid/protobuf/OmidProtobufEncoder.java diff --git a/common/pom.xml b/common/pom.xml index 3329f7271..708b2fd12 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -49,9 +49,9 @@ - com.google.protobuf - protobuf-java - ${protobuf-java.version} + org.apache.phoenix.thirdparty + phoenix-shaded-protobuf + ${phoenix.thirdparty.version} compile @@ -174,11 +174,57 @@ compile - ${protobuf.group}:protoc:${protoc.version}:exe:${os.detected.classifier} + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + + + com.google.code.maven-replacer-plugin + replacer + 1.5.3 + + ${basedir}/target/generated-sources/ + + **/*.java + + + true + + + ([^\.])com.google.protobuf + $1org.apache.phoenix.thirdparty.com.google.protobuf + + + (public)(\W+static)?(\W+final)?(\W+class) + @javax.annotation.Generated("proto") $1$2$3$4 + + + + (@javax.annotation.Generated\("proto"\) ){2} + $1 + + + + + + + replace + + process-sources + + + org.codehaus.mojo build-helper-maven-plugin diff --git a/common/src/main/java/org/apache/omid/protobuf/OmidProtobufDecoder.java b/common/src/main/java/org/apache/omid/protobuf/OmidProtobufDecoder.java new file mode 100644 index 000000000..cde2d8087 --- /dev/null +++ b/common/src/main/java/org/apache/omid/protobuf/OmidProtobufDecoder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.protobuf; + +import org.apache.phoenix.thirdparty.com.google.protobuf.ExtensionRegistry; +import org.apache.phoenix.thirdparty.com.google.protobuf.ExtensionRegistryLite; +import org.apache.phoenix.thirdparty.com.google.protobuf.Message; +import org.apache.phoenix.thirdparty.com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.internal.ObjectUtil; + +import java.util.List; + +/** + * This class is from netty-codec, but the imports for protobuf are changed to use phoenix-thirdparty + * so netty would not use other protobuf that might be present on the classpath. + * + * Decodes a received {@link ByteBuf} into a + * Google Protocol Buffers + * {@link Message} and {@link MessageLite}. Please note that this decoder must + * be used with a proper {@link ByteToMessageDecoder} such as {@link OmidProtobufVarint32FrameDecoder} + * or {@link LengthFieldBasedFrameDecoder} if you are using a stream-based + * transport such as TCP/IP. A typical setup for TCP/IP would be: + *
+ * {@link ChannelPipeline} pipeline = ...;
+ *
+ * // Decoders
+ * pipeline.addLast("frameDecoder",
+ *                  new {@link LengthFieldBasedFrameDecoder}(1048576, 0, 4, 0, 4));
+ * pipeline.addLast("protobufDecoder",
+ *                  new {@link OmidProtobufDecoder}(MyMessage.getDefaultInstance()));
+ *
+ * // Encoder
+ * pipeline.addLast("frameEncoder", new {@link LengthFieldPrepender}(4));
+ * pipeline.addLast("protobufEncoder", new {@link OmidProtobufEncoder}());
+ * 
+ * and then you can use a {@code MyMessage} instead of a {@link ByteBuf} + * as a message: + *
+ * void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
+ *     MyMessage req = (MyMessage) msg;
+ *     MyMessage res = MyMessage.newBuilder().setText(
+ *                               "Did you say '" + req.getText() + "'?").build();
+ *     ch.write(res);
+ * }
+ * 
+ */ +@Sharable +public class OmidProtobufDecoder extends MessageToMessageDecoder { + + private static final boolean HAS_PARSER; + + static { + boolean hasParser = false; + try { + // MessageLite.getParserForType() is not available until protobuf 2.5.0. + MessageLite.class.getDeclaredMethod("getParserForType"); + hasParser = true; + } catch (Throwable t) { + // Ignore + } + + HAS_PARSER = hasParser; + } + + private final MessageLite prototype; + private final ExtensionRegistryLite extensionRegistry; + + /** + * Creates a new instance. + */ + public OmidProtobufDecoder(MessageLite prototype) { + this(prototype, null); + } + + public OmidProtobufDecoder(MessageLite prototype, ExtensionRegistry extensionRegistry) { + this(prototype, (ExtensionRegistryLite) extensionRegistry); + } + + public OmidProtobufDecoder(MessageLite prototype, ExtensionRegistryLite extensionRegistry) { + this.prototype = ObjectUtil.checkNotNull(prototype, "prototype").getDefaultInstanceForType(); + this.extensionRegistry = extensionRegistry; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) + throws Exception { + final byte[] array; + final int offset; + final int length = msg.readableBytes(); + if (msg.hasArray()) { + array = msg.array(); + offset = msg.arrayOffset() + msg.readerIndex(); + } else { + array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false); + offset = 0; + } + + if (extensionRegistry == null) { + if (HAS_PARSER) { + out.add(prototype.getParserForType().parseFrom(array, offset, length)); + } else { + out.add(prototype.newBuilderForType().mergeFrom(array, offset, length).build()); + } + } else { + if (HAS_PARSER) { + out.add(prototype.getParserForType().parseFrom( + array, offset, length, extensionRegistry)); + } else { + out.add(prototype.newBuilderForType().mergeFrom( + array, offset, length, extensionRegistry).build()); + } + } + } +} diff --git a/common/src/main/java/org/apache/omid/protobuf/OmidProtobufEncoder.java b/common/src/main/java/org/apache/omid/protobuf/OmidProtobufEncoder.java new file mode 100644 index 000000000..de35b8508 --- /dev/null +++ b/common/src/main/java/org/apache/omid/protobuf/OmidProtobufEncoder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.protobuf; + +import org.apache.phoenix.thirdparty.com.google.protobuf.Message; +import org.apache.phoenix.thirdparty.com.google.protobuf.MessageLite; +import org.apache.phoenix.thirdparty.com.google.protobuf.MessageLiteOrBuilder; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.util.List; + +import static io.netty.buffer.Unpooled.wrappedBuffer; + +/** + * This class is from netty-codec, but the imports for protobuf are changed to use phoenix-thirdparty + * so netty would not use other protobuf that might be present on the classpath. + * + * Encodes the requested Google + * Protocol Buffers {@link Message} and {@link MessageLite} into a + * {@link ByteBuf}. A typical setup for TCP/IP would be: + *
+ * {@link ChannelPipeline} pipeline = ...;
+ *
+ * // Decoders
+ * pipeline.addLast("frameDecoder",
+ *                  new {@link LengthFieldBasedFrameDecoder}(1048576, 0, 4, 0, 4));
+ * pipeline.addLast("protobufDecoder",
+ *                  new {@link OmidProtobufDecoder}(MyMessage.getDefaultInstance()));
+ *
+ * // Encoder
+ * pipeline.addLast("frameEncoder", new {@link LengthFieldPrepender}(4));
+ * pipeline.addLast("protobufEncoder", new {@link OmidProtobufEncoder}());
+ * 
+ * and then you can use a {@code MyMessage} instead of a {@link ByteBuf} + * as a message: + *
+ * void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
+ *     MyMessage req = (MyMessage) msg;
+ *     MyMessage res = MyMessage.newBuilder().setText(
+ *                               "Did you say '" + req.getText() + "'?").build();
+ *     ch.write(res);
+ * }
+ * 
+ */ +@Sharable +public class OmidProtobufEncoder extends MessageToMessageEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List out) + throws Exception { + if (msg instanceof MessageLite) { + out.add(wrappedBuffer(((MessageLite) msg).toByteArray())); + return; + } + if (msg instanceof MessageLite.Builder) { + out.add(wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray())); + } + } +} diff --git a/common/src/main/proto/TSOProto.proto b/common/src/main/proto/TSOProto.proto index 311bb99ef..6d43285a2 100644 --- a/common/src/main/proto/TSOProto.proto +++ b/common/src/main/proto/TSOProto.proto @@ -16,6 +16,7 @@ // limitations under the License. // +syntax = "proto3"; option java_package = "org.apache.omid.proto"; option optimize_for = SPEED; @@ -32,7 +33,7 @@ message TimestampRequest { message CommitRequest { optional int64 startTimestamp = 1; - optional bool isRetry = 2 [default = false]; + optional bool isRetry = 2; repeated int64 cellId = 3; repeated int64 TableId = 4; } @@ -75,7 +76,7 @@ message HandshakeRequest { message HandshakeResponse { optional bool clientCompatible = 1; optional Capabilities serverCapabilities = 2; - optional bool lowLatency = 3[default= false]; + optional bool lowLatency = 3; } message Transaction { diff --git a/hbase-commit-table/pom.xml b/hbase-commit-table/pom.xml index 8bbde7705..01a70e458 100644 --- a/hbase-commit-table/pom.xml +++ b/hbase-commit-table/pom.xml @@ -57,6 +57,12 @@ + + org.apache.phoenix.thirdparty + phoenix-shaded-protobuf + ${phoenix.thirdparty.version} + + diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java index 6ef9146f5..2aabf354a 100644 --- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java +++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java @@ -48,8 +48,8 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; +import org.apache.phoenix.thirdparty.com.google.protobuf.CodedInputStream; +import org.apache.phoenix.thirdparty.com.google.protobuf.CodedOutputStream; public class HBaseCommitTable implements CommitTable { diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index 5f37da664..08d1bb6b2 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -50,6 +50,11 @@ commons-lang3 + + org.apache.phoenix.thirdparty + phoenix-shaded-protobuf + ${phoenix.thirdparty.version} + org.apache.phoenix.thirdparty phoenix-shaded-guava diff --git a/hbase-common/src/main/java/org/apache/omid/committable/hbase/KeyGeneratorImplementations.java b/hbase-common/src/main/java/org/apache/omid/committable/hbase/KeyGeneratorImplementations.java index 3ee33ff63..b82c08e25 100644 --- a/hbase-common/src/main/java/org/apache/omid/committable/hbase/KeyGeneratorImplementations.java +++ b/hbase-common/src/main/java/org/apache/omid/committable/hbase/KeyGeneratorImplementations.java @@ -17,8 +17,8 @@ */ package org.apache.omid.committable.hbase; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; +import org.apache.phoenix.thirdparty.com.google.protobuf.CodedInputStream; +import org.apache.phoenix.thirdparty.com.google.protobuf.CodedOutputStream; import java.io.IOException; diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java index d387d7ae7..981de0080 100644 --- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java +++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java @@ -17,7 +17,7 @@ */ package org.apache.omid.transaction; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.phoenix.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Scan; diff --git a/pom.xml b/pom.xml index 0349a3da5..2b45a6f99 100644 --- a/pom.xml +++ b/pom.xml @@ -147,16 +147,14 @@ 2.5.12-hadoop3 3.4.2 - 2.1.0 + 2.2.0 5.1.0 7.5.1 1.7.36 2.21.0 4.1.119.Final - - com.google.protobuf - 2.5.0 - 2.5.0 + + 4.32.0 0.6.1 4.13.2 4.11.0 @@ -310,6 +308,13 @@ org.apache.commons.logging.LogFactory + + true + Use shaded version in phoenix-thirdparty + + com.google.protobuf.** + + true Use shaded version in phoenix-thirdparty @@ -747,34 +752,6 @@ - - linux-aarch64 - - - linux - aarch64 - - - - - org.openlabtesting.protobuf - 2.5.0.2 - - - - - osx-aarch64 - - - mac - aarch64 - - - - osx-x86_64 - - - code-coverage diff --git a/timestamp-storage/pom.xml b/timestamp-storage/pom.xml index 082fa4ed9..3c3c008dd 100644 --- a/timestamp-storage/pom.xml +++ b/timestamp-storage/pom.xml @@ -53,6 +53,12 @@ org.apache.curator curator-recipes + + org.apache.phoenix.thirdparty + phoenix-shaded-protobuf + ${phoenix.thirdparty.version} + + diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java index 1d825087b..ec9c17e21 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java @@ -21,6 +21,8 @@ import org.apache.phoenix.thirdparty.com.google.common.net.HostAndPort; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractFuture; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.omid.protobuf.OmidProtobufDecoder; +import org.apache.omid.protobuf.OmidProtobufEncoder; import org.apache.omid.proto.TSOProto; import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel; @@ -45,8 +47,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.HashedWheelTimer; @@ -181,8 +181,8 @@ public void initChannel(SocketChannel channel) throws Exception { } pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); + pipeline.addLast("protobufdecoder", new OmidProtobufDecoder(TSOProto.Response.getDefaultInstance())); + pipeline.addLast("protobufencoder", new OmidProtobufEncoder()); pipeline.addLast("inboundHandler", new Handler(fsm)); } diff --git a/tso-server/pom.xml b/tso-server/pom.xml index 79a7067dd..c8ba32c20 100644 --- a/tso-server/pom.xml +++ b/tso-server/pom.xml @@ -141,9 +141,9 @@ netty-codec - com.google.protobuf - protobuf-java - ${protobuf-java.version} + org.apache.phoenix.thirdparty + phoenix-shaded-protobuf + ${phoenix.thirdparty.version} org.apache.curator diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java index 7e38d5a3b..35bf8c0f1 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java +++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java @@ -27,6 +27,8 @@ import org.apache.omid.metrics.MetricsRegistry; import org.apache.omid.proto.TSOProto; +import org.apache.omid.protobuf.OmidProtobufDecoder; +import org.apache.omid.protobuf.OmidProtobufEncoder; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; @@ -47,8 +49,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.ssl.OptionalSslHandler; import io.netty.handler.ssl.SslContext; import io.netty.util.AttributeKey; @@ -114,8 +114,8 @@ public void initChannel(SocketChannel channel) throws Exception { } pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); + pipeline.addLast("protobufdecoder", new OmidProtobufDecoder(TSOProto.Request.getDefaultInstance())); + pipeline.addLast("protobufencoder", new OmidProtobufEncoder()); pipeline.addLast("handler", TSOChannelHandler.this); } }); diff --git a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java index 7142ef1c4..1d144625a 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java +++ b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java @@ -17,6 +17,8 @@ */ package org.apache.omid.tso; +import org.apache.omid.protobuf.OmidProtobufDecoder; +import org.apache.omid.protobuf.OmidProtobufEncoder; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.omid.proto.TSOProto; import org.apache.omid.tso.ProgrammableTSOServer.Response.ResponseType; @@ -36,8 +38,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; @@ -85,8 +85,8 @@ public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); + pipeline.addLast("protobufdecoder", new OmidProtobufDecoder(TSOProto.Request.getDefaultInstance())); + pipeline.addLast("protobufencoder", new OmidProtobufEncoder()); pipeline.addLast("handler", ProgrammableTSOServer.this); } }); diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java index d71bd85a1..c4b4d7aa5 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java @@ -18,6 +18,8 @@ package org.apache.omid.tso; import org.apache.omid.NetworkUtils; +import org.apache.omid.protobuf.OmidProtobufDecoder; +import org.apache.omid.protobuf.OmidProtobufEncoder; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.omid.metrics.NullMetricsProvider; import org.apache.omid.proto.TSOProto; @@ -38,8 +40,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.slf4j.Logger; @@ -342,8 +342,8 @@ public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); + pipeline.addLast("protobufdecoder", new OmidProtobufDecoder(TSOProto.Response.getDefaultInstance())); + pipeline.addLast("protobufencoder", new OmidProtobufEncoder()); pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() { @Override diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java index c40dc8493..e2a2d6988 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java @@ -123,7 +123,7 @@ public void testNoWriteToCommitTable() throws Exception { long ts1 = client.getNewStartTimestamp().get(); TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); - assertTrue(response1.getCommitResponse().hasCommitTimestamp()); + assertTrue(response1.getCommitResponse().getCommitTimestamp() != 0); Optional cts = commitTable.getClient().getCommitTimestamp(ts1).get(); assertTrue(cts.isPresent() == false); diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java index 03f1609f6..ca0e80a42 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java @@ -17,6 +17,8 @@ */ package org.apache.omid.tso.client; +import org.apache.omid.protobuf.OmidProtobufDecoder; +import org.apache.omid.protobuf.OmidProtobufEncoder; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.omid.proto.TSOProto; @@ -37,8 +39,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +78,8 @@ public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); + pipeline.addLast("protobufdecoder", new OmidProtobufDecoder(TSOProto.Response.getDefaultInstance())); + pipeline.addLast("protobufencoder", new OmidProtobufEncoder()); pipeline.addLast("rawHandler", new RawHandler()); } });