Skip to content

Commit 93c2fe8

Browse files
authored
Add compression cache (#64)
* Implement caching for compression * Handle cache type and some improvements * Update more docstrings * Add tests and some improvements * Adapt stream_file_serializer and test * Use local cache and add note * Wrap compression cache and use unordered_map * Use tuple as key to cache and add TODO note
1 parent 659f2ce commit 93c2fe8

16 files changed

+510
-162
lines changed

include/sparrow_ipc/chunk_memory_serializer.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ namespace sparrow_ipc
165165
std::vector<uint8_t> buffer;
166166
memory_output_stream stream(buffer);
167167
any_output_stream astream(stream);
168-
serialize_record_batch(rb, astream, m_compression);
168+
CompressionCache compressed_buffers_cache;
169+
serialize_record_batch(rb, astream, m_compression, compressed_buffers_cache);
169170
m_pstream->write(std::move(buffer));
170171
}
171172
}

include/sparrow_ipc/compression.hpp

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <optional>
45
#include <span>
56
#include <variant>
67
#include <vector>
@@ -15,9 +16,41 @@ namespace sparrow_ipc
1516
ZSTD
1617
};
1718

18-
[[nodiscard]] SPARROW_IPC_API std::vector<std::uint8_t> compress(
19+
class CompressionCacheImpl;
20+
21+
class SPARROW_IPC_API CompressionCache
22+
{
23+
public:
24+
CompressionCache();
25+
~CompressionCache();
26+
27+
CompressionCache(CompressionCache&&) noexcept;
28+
CompressionCache& operator=(CompressionCache&&) noexcept;
29+
30+
CompressionCache(const CompressionCache&) = delete;
31+
CompressionCache& operator=(const CompressionCache&) = delete;
32+
33+
std::optional<std::span<const std::uint8_t>> find(const void* data_ptr, const size_t data_size);
34+
std::span<const std::uint8_t> store(const void* data_ptr, const size_t data_size, std::vector<std::uint8_t>&& data);
35+
36+
size_t size() const;
37+
size_t count(const void* data_ptr, const size_t data_size) const;
38+
bool empty() const;
39+
void clear();
40+
41+
private:
42+
std::unique_ptr<CompressionCacheImpl> m_pimpl;
43+
};
44+
45+
[[nodiscard]] SPARROW_IPC_API std::span<const std::uint8_t> compress(
1946
const CompressionType compression_type,
20-
std::span<const std::uint8_t> data);
47+
const std::span<const std::uint8_t>& data,
48+
CompressionCache& cache);
49+
50+
[[nodiscard]] SPARROW_IPC_API size_t get_compressed_size(
51+
const CompressionType compression_type,
52+
const std::span<const std::uint8_t>& data,
53+
CompressionCache& cache);
2154

2255
[[nodiscard]] SPARROW_IPC_API std::variant<std::vector<std::uint8_t>, std::span<const std::uint8_t>> decompress(
2356
const CompressionType compression_type,

include/sparrow_ipc/deserialize.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

33
#include <optional>
4-
#include <string>
4+
#include <span>
55
#include <vector>
66

77
#include <sparrow/record_batch.hpp>
@@ -63,4 +63,4 @@ namespace sparrow_ipc
6363
*/
6464
[[nodiscard]] SPARROW_IPC_API std::vector<sparrow::record_batch>
6565
deserialize_stream(std::span<const uint8_t> data);
66-
}
66+
}

include/sparrow_ipc/flatbuffer_utils.hpp

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,14 @@ namespace sparrow_ipc
255255
* @param flatbuf_compressed_buffers A vector to store the resulting compressed buffer metadata.
256256
* @param offset The current offset in the buffer layout, which will be updated by the function.
257257
* @param compression_type The compression algorithm to use.
258+
* @param cache A cache to store compressed buffers and avoid recompression.
258259
*/
259260
void fill_compressed_buffers(
260261
const sparrow::arrow_proxy& arrow_proxy,
261262
std::vector<org::apache::arrow::flatbuf::Buffer>& flatbuf_compressed_buffers,
262263
int64_t& offset,
263-
const CompressionType compression_type
264+
const CompressionType compression_type,
265+
CompressionCache& cache
264266
);
265267

266268
/**
@@ -273,11 +275,14 @@ namespace sparrow_ipc
273275
*
274276
* @param record_batch The record batch whose buffers' compressed metadata is to be retrieved.
275277
* @param compression_type The compression algorithm that would be applied (e.g., LZ4_FRAME, ZSTD).
278+
* @param cache A cache to store compressed buffers and avoid recompression.
276279
* @return A vector of FlatBuffer Buffer objects, each describing the offset and
277280
* size of a corresponding compressed buffer within a larger message body.
278281
*/
279282
[[nodiscard]] std::vector<org::apache::arrow::flatbuf::Buffer>
280-
get_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type);
283+
get_compressed_buffers(const sparrow::record_batch& record_batch,
284+
const CompressionType compression_type,
285+
CompressionCache& cache);
281286

282287
/**
283288
* @brief Calculates the total aligned size in bytes of all buffers in an Arrow array structure.
@@ -286,11 +291,16 @@ namespace sparrow_ipc
286291
* in an Arrow array structure, including buffers from child arrays. Each
287292
* buffer size is aligned to 8-byte boundaries as required by the Arrow format.
288293
*
289-
* @param arrow_proxy The Arrow array proxy containing buffers and child arrays
290-
* @param compression The compression type to use when serializing
291-
* @return int64_t The total aligned size in bytes of all buffers in the array hierarchy
294+
* @param arrow_proxy The Arrow array proxy containing buffers and child arrays.
295+
* @param compression Optional: The compression type to use when serializing.
296+
* @param cache Optional: A cache to store and retrieve compressed buffer sizes, avoiding recompression.
297+
* If compression is given, cache should be set as well.
298+
* @return int64_t The total aligned size in bytes of all buffers in the array hierarchy.
299+
* @throws std::invalid_argument if compression is given but not cache.
292300
*/
293-
[[nodiscard]] int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression = std::nullopt);
301+
[[nodiscard]] int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy,
302+
std::optional<CompressionType> compression = std::nullopt,
303+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
294304

295305
/**
296306
* @brief Calculates the total body size of a record batch by summing the body sizes of all its columns.
@@ -299,11 +309,15 @@ namespace sparrow_ipc
299309
* the body size of each column's underlying Arrow array proxy. The body size represents
300310
* the total memory required for the serialized data content of the record batch.
301311
*
302-
* @param record_batch The sparrow record batch containing columns to calculate size for
303-
* @param compression The compression type to use when serializing
304-
* @return int64_t The total body size in bytes of all columns in the record batch
312+
* @param record_batch The sparrow record batch containing columns to calculate size for.
313+
* @param compression Optional: The compression type to use when serializing. If not provided, sizes are for uncompressed buffers.
314+
* @param cache Optional: A cache to store and retrieve compressed buffer sizes, avoiding recompression.
315+
* If compression is given, cache should be set as well.
316+
* @return int64_t The total body size in bytes of all columns in the record batch.
305317
*/
306-
[[nodiscard]] int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
318+
[[nodiscard]] int64_t calculate_body_size(const sparrow::record_batch& record_batch,
319+
std::optional<CompressionType> compression = std::nullopt,
320+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
307321

308322
/**
309323
* @brief Creates a FlatBuffer message containing a serialized Apache Arrow RecordBatch.
@@ -312,15 +326,20 @@ namespace sparrow_ipc
312326
* along with its metadata (field nodes and buffer information) into a FlatBuffer
313327
* format that conforms to the Arrow IPC specification.
314328
*
315-
* @param record_batch The source record batch containing the data to be serialized
316-
* @param compression Optional: The compression algorithm to be used for the message body
329+
* @param record_batch The source record batch containing the data to be serialized.
330+
* @param compression Optional: The compression algorithm to be used for the message body.
331+
* @param cache Optional: A cache for compressed buffers to avoid recompression if compression is enabled.
332+
* If compression is given, cache should be set as well.
317333
* @return A FlatBufferBuilder containing the complete serialized message ready for
318334
* transmission or storage. The builder is finished and ready to be accessed
319335
* via GetBufferPointer() and GetSize().
336+
* @throws std::invalid_argument if compression is given but not cache.
320337
*
321-
* @note The returned message uses Arrow IPC format version V5
322-
* @note Variadic buffer counts is not currently implemented (set to 0)
338+
* @note The returned message uses Arrow IPC format version V5.
339+
* @note Variadic buffer counts is not currently implemented (set to 0).
323340
*/
324341
[[nodiscard]] flatbuffers::FlatBufferBuilder
325-
get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
342+
get_record_batch_message_builder(const sparrow::record_batch& record_batch,
343+
std::optional<CompressionType> compression = std::nullopt,
344+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
326345
}

include/sparrow_ipc/serialize.hpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ namespace sparrow_ipc
2727
* @param record_batches Collection of record batches to serialize. All batches must have identical
2828
* schemas.
2929
* @param stream The output stream where the serialized data will be written.
30-
* @param compression The compression type to use when serializing.
31-
*
30+
* @param compression Optional: The compression type to use when serializing.
31+
* @param cache Optional: A cache to store and retrieve compressed buffers, avoiding recompression.
32+
* If compression is given, cache should be set as well.
3233
* @throws std::invalid_argument If record batches have inconsistent schemas or if the collection
3334
* contains batches that cannot be serialized together.
3435
*
@@ -37,7 +38,9 @@ namespace sparrow_ipc
3738
*/
3839
template <std::ranges::input_range R>
3940
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
40-
void serialize_record_batches_to_ipc_stream(const R& record_batches, any_output_stream& stream, std::optional<CompressionType> compression)
41+
void serialize_record_batches_to_ipc_stream(const R& record_batches, any_output_stream& stream,
42+
std::optional<CompressionType> compression,
43+
std::optional<std::reference_wrapper<CompressionCache>> cache)
4144
{
4245
if (record_batches.empty())
4346
{
@@ -53,7 +56,7 @@ namespace sparrow_ipc
5356
serialize_schema_message(record_batches[0], stream);
5457
for (const auto& rb : record_batches)
5558
{
56-
serialize_record_batch(rb, stream, compression);
59+
serialize_record_batch(rb, stream, compression, cache);
5760
}
5861
stream.write(end_of_stream);
5962
}
@@ -70,14 +73,18 @@ namespace sparrow_ipc
7073
*
7174
* @param record_batch The sparrow record batch to serialize
7275
* @param stream The output stream where the serialized record batch will be written
73-
* @param compression The compression type to use when serializing.
74-
*
76+
* @param compression Optional: The compression type to use when serializing.
77+
* @param cache Optional: A cache to store and retrieve compressed buffers, avoiding recompression.
78+
* @note If compression is given, cache should be set as well.
7579
* @note The output follows Arrow IPC message format with proper alignment and
7680
* includes both metadata and data portions of the record batch
7781
*/
7882

7983
SPARROW_IPC_API void
80-
serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression);
84+
serialize_record_batch(const sparrow::record_batch& record_batch,
85+
any_output_stream& stream,
86+
std::optional<CompressionType> compression,
87+
std::optional<std::reference_wrapper<CompressionCache>> cache);
8188

8289
/**
8390
* @brief Serializes a schema message for a record batch into a byte buffer.

include/sparrow_ipc/serialize_utils.hpp

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,16 @@ namespace sparrow_ipc
3939
* The serialization follows the Arrow IPC stream format where each record batch message
4040
* consists of a metadata section followed by a body section containing the actual data.
4141
*
42-
* @param record_batch The sparrow record batch to be serialized
43-
* @param stream The output stream where the serialized record batch will be written
44-
* @param compression The compression type to use when serializing
42+
* @param record_batch The sparrow record batch to be serialized.
43+
* @param stream The output stream where the serialized record batch will be written.
44+
* @param compression Optional: The compression type to use when serializing.
45+
* @param cache Optional: A cache for compressed buffers to avoid recompression if compression is enabled.
46+
* If compression is given, cache should be set as well.
4547
*/
4648
SPARROW_IPC_API void
47-
serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression);
49+
serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream,
50+
std::optional<CompressionType> compression,
51+
std::optional<std::reference_wrapper<CompressionCache>> cache);
4852

4953
/**
5054
* @brief Calculates the total serialized size of a schema message.
@@ -73,28 +77,36 @@ namespace sparrow_ipc
7377
* - Padding to 8-byte alignment after metadata
7478
* - Body data with 8-byte alignment between buffers
7579
*
76-
* @param record_batch The record batch to be measured
77-
* @param compression The compression type to use when serializing
78-
* @return The total size in bytes that the serialized record batch would occupy
80+
* @param record_batch The record batch to be measured.
81+
* @param compression Optional: The compression type to use when serializing.
82+
* @param cache Optional: A cache to store and retrieve compressed buffer sizes, avoiding recompression.
83+
* If compression is given, cache should be set as well.
84+
* @return The total size in bytes that the serialized record batch would occupy.
7985
*/
8086
[[nodiscard]] SPARROW_IPC_API std::size_t
81-
calculate_record_batch_message_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
87+
calculate_record_batch_message_size(const sparrow::record_batch& record_batch,
88+
std::optional<CompressionType> compression = std::nullopt,
89+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
8290

8391
/**
8492
* @brief Calculates the total serialized size for a collection of record batches.
8593
*
8694
* This function computes the complete size that would be produced by serializing
8795
* a schema message followed by all record batch messages in the collection.
8896
*
89-
* @tparam R Range type containing sparrow::record_batch objects
90-
* @param record_batches Collection of record batches to be measured
91-
* @param compression The compression type to use when serializing
92-
* @return The total size in bytes for the complete serialized output
93-
* @throws std::invalid_argument if record batches have inconsistent schemas
97+
* @tparam R Range type containing sparrow::record_batch objects.
98+
* @param record_batches Collection of record batches to be measured.
99+
* @param compression Optional: The compression type to use when serializing.
100+
* @param cache Optional: A cache to store and retrieve compressed buffer sizes, avoiding recompression.
101+
* If compression is given, cache should be set as well.
102+
* @return The total size in bytes for the complete serialized output.
103+
* @throws std::invalid_argument if record batches have inconsistent schemas.
94104
*/
95105
template <std::ranges::input_range R>
96106
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
97-
[[nodiscard]] std::size_t calculate_total_serialized_size(const R& record_batches, std::optional<CompressionType> compression = std::nullopt)
107+
[[nodiscard]] std::size_t calculate_total_serialized_size(const R& record_batches,
108+
std::optional<CompressionType> compression = std::nullopt,
109+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt)
98110
{
99111
if (record_batches.empty())
100112
{
@@ -113,7 +125,7 @@ namespace sparrow_ipc
113125
// Calculate record batch message sizes
114126
for (const auto& record_batch : record_batches)
115127
{
116-
total_size += calculate_record_batch_message_size(record_batch, compression);
128+
total_size += calculate_record_batch_message_size(record_batch, compression, cache);
117129
}
118130

119131
return total_size;
@@ -131,11 +143,16 @@ namespace sparrow_ipc
131143
* 8-byte boundary, which is typically required for efficient memory access and Arrow
132144
* format compliance.
133145
*
134-
* @param arrow_proxy The arrow proxy containing buffers and potential child proxies to serialize
135-
* @param stream The output stream where the serialized body data will be written
136-
* @param compression The compression type to use when serializing
146+
* @param arrow_proxy The arrow proxy containing buffers and potential child proxies to serialize.
147+
* @param stream The output stream where the serialized body data will be written.
148+
* @param compression Optional: The compression type to use when serializing.
149+
* @param cache Optional: A cache for compressed buffers to avoid recompression if compression is enabled.
150+
* If compression is given, cache should be set as well.
151+
* @throws std::invalid_argument if compression is given but not cache.
137152
*/
138-
SPARROW_IPC_API void fill_body(const sparrow::arrow_proxy& arrow_proxy, any_output_stream& stream, std::optional<CompressionType> compression = std::nullopt);
153+
SPARROW_IPC_API void fill_body(const sparrow::arrow_proxy& arrow_proxy, any_output_stream& stream,
154+
std::optional<CompressionType> compression = std::nullopt,
155+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
139156

140157
/**
141158
* @brief Generates a serialized body from a record batch.
@@ -144,11 +161,15 @@ namespace sparrow_ipc
144161
* extracts their Arrow proxy representations, and serializes them into a
145162
* single byte vector that forms the body of the serialized data.
146163
*
147-
* @param record_batch The record batch containing columns to be serialized
148-
* @param stream The output stream where the serialized body will be written
149-
* @param compression The compression type to use when serializing
164+
* @param record_batch The record batch containing columns to be serialized.
165+
* @param stream The output stream where the serialized body will be written.
166+
* @param compression Optional: The compression type to use when serializing.
167+
* @param cache Optional: A cache for compressed buffers to avoid recompression if compression is enabled.
168+
* If compression is given, cache should be set as well.
150169
*/
151-
SPARROW_IPC_API void generate_body(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression = std::nullopt);
170+
SPARROW_IPC_API void generate_body(const sparrow::record_batch& record_batch, any_output_stream& stream,
171+
std::optional<CompressionType> compression = std::nullopt,
172+
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt);
152173

153174
SPARROW_IPC_API std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
154175
}

0 commit comments

Comments
 (0)