Skip to content

Commit 89b7963

Browse files
author
dsebban
committed
Importing java lz4 files
1 parent 3f0358b commit 89b7963

File tree

6 files changed

+1155
-3
lines changed

6 files changed

+1155
-3
lines changed

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ lazy val commonSettings = Seq(
2626
"-Ywarn-value-discard"
2727
// "-Ywarn-unused-import"
2828
),
29+
javaHome.in(Compile) := {
30+
Some(file(sys.props("java.home")).getParentFile)
31+
},
2932
scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)},
3033
scalacOptions in (Test, console) <<= (scalacOptions in (Compile, console)),
3134
libraryDependencies ++= Seq(
@@ -190,7 +193,7 @@ lazy val kafka =
190193
, libraryDependencies ++= Seq(
191194
"org.xerial.snappy" % "snappy-java" % "1.1.2.1" // for supporting a Snappy compression of message sets
192195
, "org.lz4" % "lz4-java" % "1.4.1" // for supporting a LZ4 compression of message sets
193-
, "org.apache.kafka" %% "kafka" % "0.10.2.0"
196+
, "org.apache.kafka" %% "kafka" % "0.10.2.0" % "test"
194197
)
195198
).dependsOn(
196199
common
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.common.record;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.ArrayDeque;
22+
import java.util.Deque;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
/**
27+
* Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
28+
* a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
29+
* batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
30+
* iterating over the records in the batch.
31+
*/
32+
public abstract class BufferSupplier implements AutoCloseable {
33+
34+
public static final BufferSupplier NO_CACHING = new BufferSupplier() {
35+
@Override
36+
public ByteBuffer get(int capacity) {
37+
return ByteBuffer.allocate(capacity);
38+
}
39+
40+
@Override
41+
public void release(ByteBuffer buffer) {}
42+
43+
@Override
44+
public void close() {}
45+
};
46+
47+
public static BufferSupplier create() {
48+
return new DefaultSupplier();
49+
}
50+
51+
/**
52+
* Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
53+
*/
54+
public abstract ByteBuffer get(int capacity);
55+
56+
/**
57+
* Return the provided buffer to be reused by a subsequent call to `get`.
58+
*/
59+
public abstract void release(ByteBuffer buffer);
60+
61+
/**
62+
* Release all resources associated with this supplier.
63+
*/
64+
public abstract void close();
65+
66+
private static class DefaultSupplier extends BufferSupplier {
67+
// We currently use a single block size, so optimise for that case
68+
private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
69+
70+
@Override
71+
public ByteBuffer get(int size) {
72+
Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
73+
if (bufferQueue == null || bufferQueue.isEmpty())
74+
return ByteBuffer.allocate(size);
75+
else
76+
return bufferQueue.pollFirst();
77+
}
78+
79+
@Override
80+
public void release(ByteBuffer buffer) {
81+
buffer.clear();
82+
Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
83+
if (bufferQueue == null) {
84+
// We currently keep a single buffer in flight, so optimise for that case
85+
bufferQueue = new ArrayDeque<>(1);
86+
bufferMap.put(buffer.capacity(), bufferQueue);
87+
}
88+
bufferQueue.addLast(buffer);
89+
}
90+
91+
@Override
92+
public void close() {
93+
bufferMap.clear();
94+
}
95+
}
96+
97+
/**
98+
* Simple buffer supplier for single-threaded usage. It caches a single buffer, which grows
99+
* monotonically as needed to fulfill the allocation request.
100+
*/
101+
public static class GrowableBufferSupplier extends BufferSupplier {
102+
private ByteBuffer cachedBuffer;
103+
104+
@Override
105+
public ByteBuffer get(int minCapacity) {
106+
if (cachedBuffer != null && cachedBuffer.capacity() >= minCapacity) {
107+
ByteBuffer res = cachedBuffer;
108+
cachedBuffer = null;
109+
return res;
110+
} else {
111+
cachedBuffer = null;
112+
return ByteBuffer.allocate(minCapacity);
113+
}
114+
}
115+
116+
@Override
117+
public void release(ByteBuffer buffer) {
118+
buffer.clear();
119+
cachedBuffer = buffer;
120+
}
121+
122+
@Override
123+
public void close() {
124+
cachedBuffer = null;
125+
}
126+
}
127+
128+
}

0 commit comments

Comments
 (0)