diff --git a/build.sbt b/build.sbt
index 156bd6d..950c570 100644
--- a/build.sbt
+++ b/build.sbt
@@ -189,6 +189,7 @@ lazy val kafka =
name := "protocol-kafka"
, libraryDependencies ++= Seq(
"org.xerial.snappy" % "snappy-java" % "1.1.2.1" // for supporting a Snappy compression of message sets
+ , "org.lz4" % "lz4-java" % "1.4.1" // for supporting a LZ4 compression of message sets
, "org.apache.kafka" %% "kafka" % "0.10.2.0" % "test"
)
).dependsOn(
diff --git a/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..048551f
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
+ *
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+ public static final String PREMATURE_EOS = "Stream ended prematurely";
+ public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
+ public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+ public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+ private final LZ4SafeDecompressor decompressor;
+ private final XXHash32 checksum;
+ private final byte[] buffer;
+ private final byte[] compressedBuffer;
+ private final int maxBlockSize;
+ private final boolean ignoreFlagDescriptorChecksum;
+ private FLG flg;
+ private BD bd;
+ private int bufferOffset;
+ private int bufferSize;
+ private boolean finished;
+
+ /**
+ * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+ *
+ * @param in The stream to decompress
+ * @param ignoreFlagDescriptorChecksum for compatibility with old Kafka clients, ignore incorrect HC byte
+ * @throws IOException
+ */
+ public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
+ super(in);
+ decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+ readHeader();
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[maxBlockSize];
+ bufferOffset = 0;
+ bufferSize = 0;
+ finished = false;
+ }
+
+ /**
+ * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+ *
+ * @param in The stream to decompress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+ this(in, false);
+ }
+
+ /**
+ * Check whether KafkaLZ4BlockInputStream is configured to ignore the
+ * Frame Descriptor checksum, which is useful for compatibility with
+ * old client implementations that use incorrect checksum calculations.
+ */
+ public boolean ignoreFlagDescriptorChecksum() {
+ return this.ignoreFlagDescriptorChecksum;
+ }
+
+ /**
+ * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+ *
+ * @throws IOException
+ */
+ private void readHeader() throws IOException {
+ byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+ // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+ int headerOffset = 6;
+ if (in.read(header, 0, headerOffset) != headerOffset) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
+ throw new IOException(NOT_SUPPORTED);
+ }
+ flg = FLG.fromByte(header[headerOffset - 2]);
+ bd = BD.fromByte(header[headerOffset - 1]);
+
+ if (flg.isContentSizeSet()) {
+ if (in.read(header, headerOffset, 8) != 8)
+ throw new IOException(PREMATURE_EOS);
+ headerOffset += 8;
+ }
+
+ // Final byte of Frame Descriptor is HC checksum
+ header[headerOffset++] = (byte) in.read();
+
+ // Old implementations produced incorrect HC checksums
+ if (ignoreFlagDescriptorChecksum)
+ return;
+
+ int offset = 4;
+ int len = headerOffset - offset - 1; // don't include magic bytes or HC
+ byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
+ if (hash != header[headerOffset - 1])
+ throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ }
+
+ /**
+ * Decompresses (if necessary) buffered data, optionally computes and validates an XXHash32 checksum, and writes the
+ * result to a buffer.
+ *
+ * @throws IOException
+ */
+ private void readBlock() throws IOException {
+ int blockSize = ByteUtils.readUnsignedIntLE(in);
+
+ // Check for EndMark
+ if (blockSize == 0) {
+ finished = true;
+ if (flg.isContentChecksumSet())
+ ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
+ return;
+ } else if (blockSize > maxBlockSize) {
+ throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+ }
+
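+ // a set high bit in the size word marks a block that was stored uncompressed ("incompressible")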
+ boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+ byte[] bufferToRead;
+ if (compressed) {
+ bufferToRead = compressedBuffer;
+ } else {
+ blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ bufferToRead = buffer;
+ bufferSize = blockSize;
+ }
+
+ if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ // verify checksum
+ if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+ throw new IOException(BLOCK_HASH_MISMATCH);
+ }
+
+ if (compressed) {
+ try {
+ bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+ } catch (LZ4Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ bufferOffset = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+
+ return buffer[bufferOffset++] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ net.jpountz.util.SafeUtils.checkRange(b, off, len);
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+ len = Math.min(len, available());
+ System.arraycopy(buffer, bufferOffset, b, off, len);
+ bufferOffset += len;
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (finished) {
+ return 0;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return 0;
+ }
+ n = Math.min(n, available());
+ bufferOffset += n;
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return bufferSize - bufferOffset;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new RuntimeException("mark not supported");
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new RuntimeException("reset not supported");
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+}
\ No newline at end of file
diff --git a/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..591ab16
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
+ *
+ * This class is not thread-safe.
+ *
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
+
+ public static final int MAGIC = 0x184D2204;
+ public static final int LZ4_MAX_HEADER_LENGTH = 19;
+ public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+ public static final String CLOSED_STREAM = "The stream is already closed";
+
+ public static final int BLOCKSIZE_64KB = 4;
+ public static final int BLOCKSIZE_256KB = 5;
+ public static final int BLOCKSIZE_1MB = 6;
+ public static final int BLOCKSIZE_4MB = 7;
+
+ private final LZ4Compressor compressor;
+ private final XXHash32 checksum;
+ private final boolean useBrokenFlagDescriptorChecksum;
+ private final FLG flg;
+ private final BD bd;
+ private final int maxBlockSize;
+ private OutputStream out;
+ private byte[] buffer;
+ private byte[] compressedBuffer;
+ private int bufferOffset;
+ private boolean finished;
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64KB, 5=256KB, 6=1MB, 7=4MB. All other
+ * values will generate an exception
+ * @param blockChecksum Default: false. When true, an XXHash32 checksum is computed and appended to the stream for
+ * every block of data
+ * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect FrameDescriptor checksum
+ * compatible with older Kafka clients.
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
+ this.out = out;
+ compressor = LZ4Factory.fastestInstance().fastCompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
+ bd = new BD(blockSize);
+ flg = new FLG(blockChecksum);
+ bufferOffset = 0;
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+ finished = false;
+ writeHeader();
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64KB, 5=256KB, 6=1MB, 7=4MB. All other
+ * values will generate an exception
+ * @param blockChecksum Default: false. When true, an XXHash32 checksum is computed and appended to the stream for
+ * every block of data
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+ this(out, blockSize, blockChecksum, false);
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64KB, 5=256KB, 6=1MB, 7=4MB. All other
+ * values will generate an exception
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+ this(out, blockSize, false, false);
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+ this(out, BLOCKSIZE_64KB);
+ }
+
+ public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {
+ this(out, BLOCKSIZE_64KB, false, useBrokenHC);
+ }
+
+ /**
+ * Check whether KafkaLZ4BlockOutputStream is configured to write an
+ * incorrect Frame Descriptor checksum, which is useful for
+ * compatibility with old client implementations.
+ */
+ public boolean useBrokenFlagDescriptorChecksum() {
+ return this.useBrokenFlagDescriptorChecksum;
+ }
+
+ /**
+ * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeHeader() throws IOException {
+ ByteUtils.writeUnsignedIntLE(buffer, 0, MAGIC);
+ bufferOffset = 4;
+ buffer[bufferOffset++] = flg.toByte();
+ buffer[bufferOffset++] = bd.toByte();
+ // TODO write uncompressed content size, update flg.validate()
+
+ // compute checksum on all descriptor fields
+ int offset = 4;
+ int len = bufferOffset - offset;
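+ // broken mode: old clients hashed the magic bytes as well, so widen the hashed range for compatibility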
+ if (this.useBrokenFlagDescriptorChecksum) {
+ len += offset;
+ offset = 0;
+ }
+ byte hash = (byte) ((checksum.hash(buffer, offset, len, 0) >> 8) & 0xFF);
+ buffer[bufferOffset++] = hash;
+
+ // write out frame descriptor
+ out.write(buffer, 0, bufferOffset);
+ bufferOffset = 0;
+ }
+
+ /**
+ * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+ * {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeBlock() throws IOException {
+ if (bufferOffset == 0) {
+ return;
+ }
+
+ int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+ byte[] bufferToWrite = compressedBuffer;
+ int compressMethod = 0;
+
+ // Store block uncompressed if compressed length is greater (incompressible)
+ if (compressedLength >= bufferOffset) {
+ bufferToWrite = buffer;
+ compressedLength = bufferOffset;
+ compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ }
+
+ // Write content
+ ByteUtils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+ out.write(bufferToWrite, 0, compressedLength);
+
+ // Calculate and write block checksum
+ if (flg.isBlockChecksumSet()) {
+ int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+ ByteUtils.writeUnsignedIntLE(out, hash);
+ }
+ bufferOffset = 0;
+ }
+
+ /**
+ * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
+ * of the block stream.
+ *
+ * @throws IOException
+ */
+ private void writeEndMark() throws IOException {
+ ByteUtils.writeUnsignedIntLE(out, 0);
+ // TODO implement content checksum, update flg.validate()
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureNotFinished();
+ if (bufferOffset == maxBlockSize) {
+ writeBlock();
+ }
+ buffer[bufferOffset++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ net.jpountz.util.SafeUtils.checkRange(b, off, len);
+ ensureNotFinished();
+
+ int bufferRemainingLength = maxBlockSize - bufferOffset;
+ // while the remaining input is larger than the free space in the buffer, fill it and flush a block
+ while (len > bufferRemainingLength) {
+ // fill remaining space in buffer
+ System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+ bufferOffset = maxBlockSize;
+ writeBlock();
+ // compute new offset and length
+ off += bufferRemainingLength;
+ len -= bufferRemainingLength;
+ bufferRemainingLength = maxBlockSize;
+ }
+
+ System.arraycopy(b, off, buffer, bufferOffset, len);
+ bufferOffset += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!finished) {
+ writeBlock();
+ }
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * A simple state check to ensure the stream is still open.
+ */
+ private void ensureNotFinished() {
+ if (finished) {
+ throw new IllegalStateException(CLOSED_STREAM);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!finished) {
+ // flush any buffered bytes as the final block
+ writeBlock();
+ // write the end block
+ writeEndMark();
+ }
+ } finally {
+ try {
+ if (out != null) {
+ try (OutputStream outStream = out) {
+ outStream.flush();
+ }
+ }
+ } finally {
+ out = null;
+ buffer = null;
+ compressedBuffer = null;
+ finished = true;
+ }
+ }
+ }
+
+ public static class FLG {
+
+ private static final int VERSION = 1;
+
+ private final int reserved;
+ private final int contentChecksum;
+ private final int contentSize;
+ private final int blockChecksum;
+ private final int blockIndependence;
+ private final int version;
+
+ public FLG() {
+ this(false);
+ }
+
+ public FLG(boolean blockChecksum) {
+ this(0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+ }
+
+ private FLG(int reserved,
+ int contentChecksum,
+ int contentSize,
+ int blockChecksum,
+ int blockIndependence,
+ int version) {
+ this.reserved = reserved;
+ this.contentChecksum = contentChecksum;
+ this.contentSize = contentSize;
+ this.blockChecksum = blockChecksum;
+ this.blockIndependence = blockIndependence;
+ this.version = version;
+ validate();
+ }
+
+ public static FLG fromByte(byte flg) {
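+ // FLG bit layout, LSB first: reserved(2), content checksum(1), content size(1), block checksum(1), block independence(1), version(2)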
+ int reserved = (flg >>> 0) & 3;
+ int contentChecksum = (flg >>> 2) & 1;
+ int contentSize = (flg >>> 3) & 1;
+ int blockChecksum = (flg >>> 4) & 1;
+ int blockIndependence = (flg >>> 5) & 1;
+ int version = (flg >>> 6) & 3;
+
+ return new FLG(reserved,
+ contentChecksum,
+ contentSize,
+ blockChecksum,
+ blockIndependence,
+ version);
+ }
+
+ public byte toByte() {
+ return (byte) (((reserved & 3) << 0) | ((contentChecksum & 1) << 2)
+ | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+ }
+
+ private void validate() {
+ if (reserved != 0) {
+ throw new RuntimeException("Reserved bits must be 0");
+ }
+ if (blockIndependence != 1) {
+ throw new RuntimeException("Dependent block stream is unsupported");
+ }
+ if (version != VERSION) {
+ throw new RuntimeException(String.format("Version %d is unsupported", version));
+ }
+ }
+
+ public boolean isContentChecksumSet() {
+ return contentChecksum == 1;
+ }
+
+ public boolean isContentSizeSet() {
+ return contentSize == 1;
+ }
+
+ public boolean isBlockChecksumSet() {
+ return blockChecksum == 1;
+ }
+
+ public boolean isBlockIndependenceSet() {
+ return blockIndependence == 1;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+ }
+
+ public static class BD {
+
+ private final int reserved2;
+ private final int blockSizeValue;
+ private final int reserved3;
+
+ public BD() {
+ this(0, BLOCKSIZE_64KB, 0);
+ }
+
+ public BD(int blockSizeValue) {
+ this(0, blockSizeValue, 0);
+ }
+
+ private BD(int reserved2, int blockSizeValue, int reserved3) {
+ this.reserved2 = reserved2;
+ this.blockSizeValue = blockSizeValue;
+ this.reserved3 = reserved3;
+ validate();
+ }
+
+ public static BD fromByte(byte bd) {
+ int reserved2 = (bd >>> 0) & 15;
+ int blockMaximumSize = (bd >>> 4) & 7;
+ int reserved3 = (bd >>> 7) & 1;
+
+ return new BD(reserved2, blockMaximumSize, reserved3);
+ }
+
+ private void validate() {
+ if (reserved2 != 0) {
+ throw new RuntimeException("Reserved2 field must be 0");
+ }
+ if (blockSizeValue < 4 || blockSizeValue > 7) {
+ throw new RuntimeException("Block size value must be between 4 and 7");
+ }
+ if (reserved3 != 0) {
+ throw new RuntimeException("Reserved3 field must be 0");
+ }
+ }
+
+ // 2^(2n+8)
+ public int getBlockMaximumSize() {
+ return 1 << ((2 * blockSizeValue) + 8);
+ }
+
+ public byte toByte() {
+ return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+ }
+ }
+
+}
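
For reference, a minimal round-trip sketch exercising the two stream classes above (the object name, payload, and buffer sizes are illustrative only, not part of this change):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}

object Lz4RoundTrip extends App {
  val payload = Array.fill[Byte](100000)(42.toByte) // highly compressible input

  // compress: the constructor writes the frame header, close() appends the EndMark
  val sink = new ByteArrayOutputStream()
  val lz4Out = new KafkaLZ4BlockOutputStream(sink, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, false, false)
  lz4Out.write(payload)
  lz4Out.close()

  // decompress: the constructor reads and validates the frame descriptor
  val lz4In = new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray), false)
  val out = new ByteArrayOutputStream()
  val buf = new Array[Byte](8192)
  Iterator.continually(lz4In.read(buf, 0, buf.length))
    .takeWhile(_ != -1)
    .foreach(n => out.write(buf, 0, n))

  assert(out.toByteArray.sameElements(payload))
}
```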
diff --git a/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
new file mode 100644
index 0000000..6efe311
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class exposes low-level methods for reading from and writing to byte streams or buffers.
+ */
+public final class ByteUtils {
+
+ private ByteUtils() {}
+
+ /**
+ * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+ *
+ * @param buffer The buffer to read from
+ * @return The integer read, as a long to avoid signedness
+ */
+ public static long readUnsignedInt(ByteBuffer buffer) {
+ return buffer.getInt() & 0xffffffffL;
+ }
+
+ /**
+ * Read an unsigned integer from the given position without modifying the buffer's position
+ *
+ * @param buffer the buffer to read from
+ * @param index the index from which to read the integer
+ * @return The integer read, as a long to avoid signedness
+ */
+ public static long readUnsignedInt(ByteBuffer buffer, int index) {
+ return buffer.getInt(index) & 0xffffffffL;
+ }
+
+ /**
+ * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+ *
+ * @param in The stream to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(InputStream in) throws IOException {
+ return in.read()
+ | (in.read() << 8)
+ | (in.read() << 16)
+ | (in.read() << 24);
+ }
+
+ /**
+ * Read an unsigned integer stored in little-endian format from a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to read from
+ * @param offset The position in buffer to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(byte[] buffer, int offset) {
+ return (buffer[offset] << 0 & 0xff)
+ | ((buffer[offset + 1] & 0xff) << 8)
+ | ((buffer[offset + 2] & 0xff) << 16)
+ | ((buffer[offset + 3] & 0xff) << 24);
+ }
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ *
+ * @param buffer The buffer to write to
+ * @param index The position in the buffer at which to begin writing
+ * @param value The value to write
+ */
+ public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+ buffer.putInt(index, (int) (value & 0xffffffffL));
+ }
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ *
+ * @param buffer The buffer to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+ buffer.putInt((int) (value & 0xffffffffL));
+ }
+
+ /**
+ * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+ *
+ * @param out The stream to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+ out.write(value);
+ out.write(value >>> 8);
+ out.write(value >>> 16);
+ out.write(value >>> 24);
+ }
+
+ /**
+ * Write an unsigned integer in little-endian format to a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to write to
+ * @param offset The position in buffer to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+ buffer[offset] = (byte) value;
+ buffer[offset + 1] = (byte) (value >>> 8);
+ buffer[offset + 2] = (byte) (value >>> 16);
+ buffer[offset + 3] = (byte) (value >>> 24);
+ }
+
+ /**
+ * Read an integer stored in variable-length format using zig-zag decoding from
+ * Google Protocol Buffers.
+ *
+ * @param buffer The buffer to read from
+ * @return The integer read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+ */
+ public static int readVarint(ByteBuffer buffer) {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw illegalVarintException(value);
+ }
+ value |= b << i;
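+ // undo the zig-zag encoding: 0, 1, 2, 3, ... maps back to 0, -1, 1, -2, ...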
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read an integer stored in variable-length format using zig-zag decoding from
+ * Google Protocol Buffers.
+ *
+ * @param in The input to read from
+ * @return The integer read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+ * @throws IOException if {@link DataInput} throws {@link IOException}
+ */
+ public static int readVarint(DataInput in) throws IOException {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = in.readByte()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw illegalVarintException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read a long stored in variable-length format using zig-zag decoding from
+ * Google Protocol Buffers.
+ *
+ * @param in The input to read from
+ * @return The long value read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+ * @throws IOException if {@link DataInput} throws {@link IOException}
+ */
+ public static long readVarlong(DataInput in) throws IOException {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = in.readByte()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw illegalVarlongException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read a long stored in variable-length format using zig-zag decoding from
+ * Google Protocol Buffers.
+ *
+ * @param buffer The buffer to read from
+ * @return The long value read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+ */
+ public static long readVarlong(ByteBuffer buffer) {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw illegalVarlongException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * Google Protocol Buffers into the output.
+ *
+ * @param value The value to write
+ * @param out The output to write to
+ */
+ public static void writeVarint(int value, DataOutput out) throws IOException {
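+ // zig-zag encode first, so that values near zero (negative included) need the fewest bytes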
+ int v = (value << 1) ^ (value >> 31);
+ while ((v & 0xffffff80) != 0L) {
+ out.writeByte((v & 0x7f) | 0x80);
+ v >>>= 7;
+ }
+ out.writeByte((byte) v);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * Google Protocol Buffers into the buffer.
+ *
+ * @param value The value to write
+ * @param buffer The output to write to
+ */
+ public static void writeVarint(int value, ByteBuffer buffer) {
+ int v = (value << 1) ^ (value >> 31);
+ while ((v & 0xffffff80) != 0L) {
+ byte b = (byte) ((v & 0x7f) | 0x80);
+ buffer.put(b);
+ v >>>= 7;
+ }
+ buffer.put((byte) v);
+ }
+
+ /**
+ * Write the given long following the variable-length zig-zag encoding from
+ * Google Protocol Buffers into the output.
+ *
+ * @param value The value to write
+ * @param out The output to write to
+ */
+ public static void writeVarlong(long value, DataOutput out) throws IOException {
+ long v = (value << 1) ^ (value >> 63);
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ out.writeByte(((int) v & 0x7f) | 0x80);
+ v >>>= 7;
+ }
+ out.writeByte((byte) v);
+ }
+
+ /**
+ * Write the given long following the variable-length zig-zag encoding from
+ * Google Protocol Buffers into the buffer.
+ *
+ * @param value The value to write
+ * @param buffer The buffer to write to
+ */
+ public static void writeVarlong(long value, ByteBuffer buffer) {
+ long v = (value << 1) ^ (value >> 63);
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ byte b = (byte) ((v & 0x7f) | 0x80);
+ buffer.put(b);
+ v >>>= 7;
+ }
+ buffer.put((byte) v);
+ }
+
+ /**
+ * Number of bytes needed to encode an integer in variable-length format.
+ *
+ * @param value The signed value
+ */
+ public static int sizeOfVarint(int value) {
+ int v = (value << 1) ^ (value >> 31);
+ int bytes = 1;
+ while ((v & 0xffffff80) != 0L) {
+ bytes += 1;
+ v >>>= 7;
+ }
+ return bytes;
+ }
+
+ /**
+ * Number of bytes needed to encode a long in variable-length format.
+ *
+ * @param value The signed value
+ */
+ public static int sizeOfVarlong(long value) {
+ long v = (value << 1) ^ (value >> 63);
+ int bytes = 1;
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ bytes += 1;
+ v >>>= 7;
+ }
+ return bytes;
+ }
+
+ private static IllegalArgumentException illegalVarintException(int value) {
+ throw new IllegalArgumentException("Varint is too long, the most significant bit in the 5th byte is set, " +
+ "converted value: " + Integer.toHexString(value));
+ }
+
+ private static IllegalArgumentException illegalVarlongException(long value) {
+ throw new IllegalArgumentException("Varlong is too long, most significant bit in the 10th byte is set, " +
+ "converted value: " + Long.toHexString(value));
+ }
+}
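
Since the varint helpers round-trip through zig-zag encoding, a small self-check makes the encoding concrete; a sketch (names illustrative) using only the `ByteUtils` methods defined above:

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.utils.ByteUtils

object VarintRoundTrip extends App {
  // zig-zag maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ... so values near
  // zero, negative or positive, encode into the fewest bytes
  for (v <- Seq(0, -1, 1, 300, -300, Int.MaxValue, Int.MinValue)) {
    val buf = ByteBuffer.allocate(5) // a 32-bit varint occupies at most 5 bytes
    ByteUtils.writeVarint(v, buf)
    assert(buf.position() == ByteUtils.sizeOfVarint(v))
    buf.flip()
    assert(ByteUtils.readVarint(buf) == v)
  }
}
```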
diff --git a/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala b/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
index d691ac6..8275703 100644
--- a/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
+++ b/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
@@ -87,6 +87,7 @@ object MessageSetCodec {
compressionType match {
case Compression.GZIP => GZipCompression.inflate(v) flatMap decodeCompressed(Compression.GZIP)
case Compression.Snappy => SnappyCompression.inflate(v) flatMap decodeCompressed(Compression.Snappy)
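+ // old clients wrote V0 LZ4 frames with a broken descriptor checksum, so only V1 message sets are inflated here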
+ case Compression.LZ4 if version == MessageVersion.V1 => LZ4Compression.inflate(v) flatMap decodeCompressed(Compression.LZ4)
case Compression.LZ4 => Attempt.failure(Err("LZ4 Compression not yet supported"))
}
}
@@ -117,7 +118,7 @@ object MessageSetCodec {
cm.compression match {
case Compression.GZIP => encodeCompressed(cm.messages).flatMap(GZipCompression.deflate)
case Compression.Snappy => encodeCompressed(cm.messages).flatMap(SnappyCompression.deflate)
- case Compression.LZ4 => Attempt.failure(Err("LZ4 Compression not yet supported"))
+ case Compression.LZ4 => encodeCompressed(cm.messages).flatMap(LZ4Compression.deflate)
}
val (timeFlag, time) = mkTime(cm.timeStamp)
diff --git a/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala b/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
index 68972c9..1f96121 100644
--- a/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
+++ b/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
@@ -5,6 +5,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, Output
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
import scodec.Attempt
import scodec.bits.ByteVector
@@ -36,6 +37,16 @@ object SnappyCompression {
}
+object LZ4Compression {
+
+  /** deflates (compresses) uncompressed bytes **/
+  def deflate(bv:ByteVector):Attempt[ByteVector] =
+    StreamCompression.deflate(bv)(new KafkaLZ4BlockOutputStream(_))
+
+  /** inflates (decompresses) LZ4-framed bytes **/
+  def inflate(bv:ByteVector):Attempt[ByteVector] =
+    StreamCompression.inflate(bv)(new KafkaLZ4BlockInputStream(_, false))
+
+}
object StreamCompression {
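
With that wiring, `deflate` followed by `inflate` should be the identity on the payload; a minimal sketch, assuming `LZ4Compression` lives in `spinoco.protocol.kafka.codec` as the file path suggests:

```scala
import scodec.bits.ByteVector
import spinoco.protocol.kafka.codec.LZ4Compression

object Lz4CompressionCheck extends App {
  val payload = ByteVector.view("hello, kafka".getBytes("UTF-8"))

  // deflate then inflate should yield Attempt.successful(payload)
  val roundTrip = LZ4Compression.deflate(payload).flatMap(LZ4Compression.inflate)
  assert(roundTrip.toOption.contains(payload))
}
```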
diff --git a/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala b/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
index 712af94..6be0bb9 100644
--- a/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
+++ b/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
@@ -3,7 +3,7 @@ package spinoco.protocol.kafka.codec
import java.util.Date
-import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, MessageAndOffset, SnappyCompressionCodec, Message => KMessage}
+import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, MessageAndOffset, SnappyCompressionCodec, LZ4CompressionCodec, Message => KMessage}
import spinoco.protocol.common.ProtocolSpec
import scodec.{Attempt, DecodeResult}
import scodec.bits.{BitVector, ByteVector}
@@ -94,6 +94,15 @@ class MessageSetCodecSpec extends ProtocolSpec {
}
+ // "and fails when messages are compressed (LZ4) " in {
+ // val buff = new ByteBufferMessageSet(LZ4CompressionCodec, kMessages0: _*)
+
+ // val r = MessageSetCodec.messageSetCodec.decode(ByteVector(buff.getBuffer).bits)
+
+ // r shouldBe Attempt.failure(Err("0/Message Entry/Message/V0: LZ4 Compression not yet supported"))
+
+ // }
+
}
"deserializes V1" - {
@@ -153,6 +162,28 @@ class MessageSetCodecSpec extends ProtocolSpec {
}
+ "when messages are compressed (LZ4)" in {
+ val buff = new ByteBufferMessageSet(LZ4CompressionCodec, kMessages1: _*)
+
+ val r = MessageSetCodec.messageSetCodec.decode(ByteVector(buff.getBuffer).bits)
+
+ r shouldBe Attempt.successful {
+ DecodeResult(
+ Vector(
+ CompressedMessages(
+ offset = 2
+ , version = MessageVersion.V1
+ , compression = Compression.LZ4
+ , timeStamp = Some(CreateTime(now))
+ , messages = sMessages1
+ )
+ )
+ , BitVector.empty
+ )
+ }
+
+ }
+
}
"serializes V0" - {
@@ -181,6 +212,14 @@ class MessageSetCodecSpec extends ProtocolSpec {
kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o) } shouldBe sMessages0
}
+ "when messages are compressed (LZ4)" in {
+ val compressed = CompressedMessages(10,MessageVersion.V0,Compression.LZ4, None, sMessages0)
+ val encoded = MessageSetCodec.messageSetCodec.encode(Vector(compressed))
+ val kafkaSet = new ByteBufferMessageSet(encoded.map(_.toByteBuffer).getOrElse(fail("Failed to encode")))
+
+ kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o) } shouldBe sMessages0
+ }
+
}
@@ -211,6 +250,14 @@ class MessageSetCodecSpec extends ProtocolSpec {
kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o-8) } shouldBe sMessages1
}
+ "when messages are compressed (LZ4)" in {
+ val compressed = CompressedMessages(10,MessageVersion.V1,Compression.LZ4, Some(CreateTime(now)), sMessages1)
+ val encoded = MessageSetCodec.messageSetCodec.encode(Vector(compressed))
+ val kafkaSet = new ByteBufferMessageSet(encoded.map(_.toByteBuffer).getOrElse(fail("Failed to encode")))
+
+ kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o-8) } shouldBe sMessages1
+ }
+
}