Skip to content

Commit 93994d3

Browse files
committed
Add interval array
1 parent f630a81 commit 93994d3

File tree

4 files changed

+127
-6
lines changed

4 files changed

+127
-6
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,11 @@ set(SPARROW_IPC_HEADERS
120120
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp
121121
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_output_stream.hpp
122122
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_serializer.hpp
123+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
123124
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
124125
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
125-
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
126126
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
127+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_interval_array.hpp
127128
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
128129
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
129130
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
3+
#include <optional>
4+
#include <vector>
5+
6+
#include <sparrow/arrow_interface/arrow_array_schema_proxy.hpp>
7+
#include <sparrow/interval_array.hpp>
8+
9+
#include "Message_generated.h"
10+
#include "sparrow_ipc/arrow_interface/arrow_array.hpp"
11+
#include "sparrow_ipc/arrow_interface/arrow_schema.hpp"
12+
#include "sparrow_ipc/deserialize_utils.hpp"
13+
14+
namespace sparrow_ipc
15+
{
16+
template <typename T>
17+
[[nodiscard]] sparrow::interval_array<T> deserialize_non_owning_interval_array(
18+
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
19+
std::span<const uint8_t> body,
20+
std::string_view name,
21+
const std::optional<std::vector<sparrow::metadata_pair>>& metadata,
22+
size_t& buffer_index
23+
)
24+
{
25+
const std::string_view format = data_type_to_format(
26+
sparrow::detail::get_data_type_from_array<sparrow::interval_array<T>>::get()
27+
);
28+
ArrowSchema schema = make_non_owning_arrow_schema(
29+
format,
30+
name.data(),
31+
metadata,
32+
std::nullopt,
33+
0,
34+
nullptr,
35+
nullptr
36+
);
37+
38+
const auto compression = record_batch.compression();
39+
std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
40+
41+
auto validity_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
42+
auto data_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
43+
44+
if (compression)
45+
{
46+
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
47+
buffers.push_back(utils::get_decompressed_buffer(data_buffer_span, compression));
48+
}
49+
else
50+
{
51+
buffers.emplace_back(validity_buffer_span);
52+
buffers.emplace_back(data_buffer_span);
53+
}
54+
55+
// TODO bitmap_ptr is not used anymore... Leave it for now, and remove later if no need confirmed
56+
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
57+
58+
ArrowArray array = make_arrow_array<arrow_array_private_data>(
59+
record_batch.length(),
60+
null_count,
61+
0,
62+
0,
63+
nullptr,
64+
nullptr,
65+
std::move(buffers)
66+
);
67+
68+
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
69+
return sparrow::interval_array<T>{std::move(ap)};
70+
}
71+
}

src/deserialize.cpp

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <sparrow/types/data_type.hpp>
44

55
#include "sparrow_ipc/deserialize_fixedsizebinary_array.hpp"
6+
#include "sparrow_ipc/deserialize_interval_array.hpp"
67
#include "sparrow_ipc/deserialize_primitive_array.hpp"
78
#include "sparrow_ipc/deserialize_variable_size_binary_array.hpp"
89
#include "sparrow_ipc/encapsulated_message.hpp"
@@ -200,6 +201,50 @@ namespace sparrow_ipc
200201
)
201202
);
202203
break;
204+
case org::apache::arrow::flatbuf::Type::Interval:
205+
{
206+
const auto interval_type = field->type_as_Interval();
207+
org::apache::arrow::flatbuf::IntervalUnit interval_unit = interval_type->unit();
208+
switch (interval_unit)
209+
{
210+
case org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH:
211+
arrays.emplace_back(
212+
deserialize_non_owning_interval_array<sparrow::chrono::months>(
213+
record_batch,
214+
encapsulated_message.body(),
215+
name,
216+
metadata,
217+
buffer_index
218+
)
219+
);
220+
break;
221+
case org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME:
222+
arrays.emplace_back(
223+
deserialize_non_owning_interval_array<sparrow::days_time_interval>(
224+
record_batch,
225+
encapsulated_message.body(),
226+
name,
227+
metadata,
228+
buffer_index
229+
)
230+
);
231+
break;
232+
case org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO:
233+
arrays.emplace_back(
234+
deserialize_non_owning_interval_array<sparrow::month_day_nanoseconds_interval>(
235+
record_batch,
236+
encapsulated_message.body(),
237+
name,
238+
metadata,
239+
buffer_index
240+
)
241+
);
242+
break;
243+
default:
244+
throw std::runtime_error("Unsupported interval unit.");
245+
}
246+
}
247+
break;
203248
default:
204249
throw std::runtime_error("Unsupported type.");
205250
}
@@ -217,7 +262,8 @@ namespace sparrow_ipc
217262
std::vector<std::optional<std::vector<sparrow::metadata_pair>>> fields_metadata;
218263
do
219264
{
220-
// Check for end-of-stream marker here as data could contain only that (if no record batches present/written)
265+
// Check for end-of-stream marker here as data could contain only that (if no record batches
266+
// present/written)
221267
if (data.size() >= 8 && is_end_of_stream(data.subspan(0, 8)))
222268
{
223269
break;
@@ -243,11 +289,12 @@ namespace sparrow_ipc
243289

244290
for (const auto field : *(schema->fields()))
245291
{
246-
if(field != nullptr && field->name() != nullptr)
292+
if (field != nullptr && field->name() != nullptr)
247293
{
248-
field_names.emplace_back(field->name()->str());
294+
field_names.emplace_back(field->name()->str());
249295
}
250-
else {
296+
else
297+
{
251298
field_names.emplace_back("_unnamed_");
252299
}
253300
fields_nullable.push_back(field->nullable());
@@ -278,7 +325,8 @@ namespace sparrow_ipc
278325
encapsulated_message,
279326
fields_metadata
280327
);
281-
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed
328+
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of
329+
// record_batch is fixed
282330
sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays));
283331
record_batches.emplace_back(std::move(sp_record_batch));
284332
}

tests/test_de_serialization_with_files.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const std::vector<std::filesystem::path> files_paths_to_test = {
3333
tests_resources_files_path / "generated_large_binary",
3434
tests_resources_files_path / "generated_binary_zerolength",
3535
tests_resources_files_path / "generated_binary_no_batches",
36+
tests_resources_files_path / "generated_interval",
3637
};
3738

3839
const std::vector<std::filesystem::path> files_paths_to_test_with_lz4_compression = {

0 commit comments

Comments
 (0)