Skip to content

Commit 7fcdea0

Browse files
committed
Add interval array
1 parent 5ad8ebb commit 7fcdea0

File tree

4 files changed

+127
-6
lines changed

4 files changed

+127
-6
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,11 @@ set(SPARROW_IPC_HEADERS
106106
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp
107107
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_output_stream.hpp
108108
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_serializer.hpp
109+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
109110
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
110111
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
111-
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
112112
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
113+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_interval_array.hpp
113114
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
114115
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
115116
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
3+
#include <optional>
4+
#include <vector>
5+
6+
#include <sparrow/arrow_interface/arrow_array_schema_proxy.hpp>
7+
#include <sparrow/interval_array.hpp>
8+
9+
#include "Message_generated.h"
10+
#include "sparrow_ipc/arrow_interface/arrow_array.hpp"
11+
#include "sparrow_ipc/arrow_interface/arrow_schema.hpp"
12+
#include "sparrow_ipc/deserialize_utils.hpp"
13+
14+
namespace sparrow_ipc
15+
{
16+
/**
 * Builds a sparrow::interval_array<T> for one field of a flatbuffer RecordBatch.
 *
 * Two buffers are fetched in order through utils::get_buffer(record_batch, body,
 * buffer_index): first the validity buffer, then the data buffer. When
 * record_batch.compression() is set, both spans go through
 * utils::get_decompressed_buffer before being stored; otherwise the spans are
 * stored as returned. The resulting ArrowArray and ArrowSchema are moved into a
 * sparrow::arrow_proxy, from which the interval array is constructed.
 *
 * NOTE(review): the function name suggests the result may reference `body`
 * instead of copying it - confirm the lifetime requirements for callers.
 *
 * @tparam T Value type of the interval array; also selects the schema format string.
 * @param record_batch Flatbuffer record batch; supplies length() and compression().
 * @param body Message body passed to utils::get_buffer to locate the buffers.
 * @param name Field name, forwarded to the schema as name.data().
 * @param metadata Optional metadata forwarded to the schema.
 * @param buffer_index Passed by reference to each utils::get_buffer call
 *        (presumably advanced by it - confirm in deserialize_utils.hpp).
 * @return The interval array built from the proxy created here.
 */
template <typename T>
17+
[[nodiscard]] sparrow::interval_array<T> deserialize_non_owning_interval_array(
18+
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
19+
std::span<const uint8_t> body,
20+
std::string_view name,
21+
const std::optional<std::vector<sparrow::metadata_pair>>& metadata,
22+
size_t& buffer_index
23+
)
24+
{
25+
// The format string is derived from the data type associated with interval_array<T>.
const std::string_view format = data_type_to_format(
26+
sparrow::detail::get_data_type_from_array<sparrow::interval_array<T>>::get()
27+
);
28+
ArrowSchema schema = make_non_owning_arrow_schema(
29+
format,
30+
// NOTE(review): data() drops the string_view length; this assumes `name` is
// null-terminated - confirm against the callers.
name.data(),
31+
metadata,
32+
std::nullopt,
33+
0,
34+
nullptr,
35+
nullptr
36+
);
37+
38+
const auto compression = record_batch.compression();
39+
std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
40+
41+
// Order matters: the validity buffer is fetched first, then the data buffer.
auto validity_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
42+
auto data_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
43+
44+
if (compression)
45+
{
46+
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
47+
buffers.push_back(utils::get_decompressed_buffer(data_buffer_span, compression));
48+
}
49+
else
50+
{
51+
buffers.emplace_back(validity_buffer_span);
52+
buffers.emplace_back(data_buffer_span);
53+
}
54+
55+
// TODO: bitmap_ptr is no longer used. Keep it for now; remove once confirmed unnecessary.
// NOTE(review): null_count is computed from validity_buffer_span as fetched from
// `body`, not from the decompressed buffer stored above - confirm this is intended
// when compression is set.
56+
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
57+
58+
ArrowArray array = make_arrow_array<arrow_array_private_data>(
59+
record_batch.length(),
60+
null_count,
61+
0,
62+
0,
63+
nullptr,
64+
nullptr,
65+
std::move(buffers)
66+
);
67+
68+
// The proxy takes both C structures by move; the array is then built from the proxy.
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
69+
return sparrow::interval_array<T>{std::move(ap)};
70+
}
71+
}

src/deserialize.cpp

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <sparrow/types/data_type.hpp>
44

55
#include "sparrow_ipc/deserialize_fixedsizebinary_array.hpp"
6+
#include "sparrow_ipc/deserialize_interval_array.hpp"
67
#include "sparrow_ipc/deserialize_primitive_array.hpp"
78
#include "sparrow_ipc/deserialize_variable_size_binary_array.hpp"
89
#include "sparrow_ipc/magic_values.hpp"
@@ -191,6 +192,50 @@ namespace sparrow_ipc
191192
)
192193
);
193194
break;
195+
case org::apache::arrow::flatbuf::Type::Interval:
196+
{
197+
const auto interval_type = field->type_as_Interval();
198+
org::apache::arrow::flatbuf::IntervalUnit interval_unit = interval_type->unit();
199+
switch (interval_unit)
200+
{
201+
case org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH:
202+
arrays.emplace_back(
203+
deserialize_non_owning_interval_array<sparrow::chrono::months>(
204+
record_batch,
205+
encapsulated_message.body(),
206+
name,
207+
metadata,
208+
buffer_index
209+
)
210+
);
211+
break;
212+
case org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME:
213+
arrays.emplace_back(
214+
deserialize_non_owning_interval_array<sparrow::days_time_interval>(
215+
record_batch,
216+
encapsulated_message.body(),
217+
name,
218+
metadata,
219+
buffer_index
220+
)
221+
);
222+
break;
223+
case org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO:
224+
arrays.emplace_back(
225+
deserialize_non_owning_interval_array<sparrow::month_day_nanoseconds_interval>(
226+
record_batch,
227+
encapsulated_message.body(),
228+
name,
229+
metadata,
230+
buffer_index
231+
)
232+
);
233+
break;
234+
default:
235+
throw std::runtime_error("Unsupported interval unit.");
236+
}
237+
}
238+
break;
194239
default:
195240
throw std::runtime_error("Unsupported type.");
196241
}
@@ -208,7 +253,8 @@ namespace sparrow_ipc
208253
std::vector<std::optional<std::vector<sparrow::metadata_pair>>> fields_metadata;
209254
do
210255
{
211-
// Check for end-of-stream marker here as data could contain only that (if no record batches present/written)
256+
// Check for end-of-stream marker here as data could contain only that (if no record batches
257+
// present/written)
212258
if (data.size() >= 8 && is_end_of_stream(data.subspan(0, 8)))
213259
{
214260
break;
@@ -234,11 +280,12 @@ namespace sparrow_ipc
234280

235281
for (const auto field : *(schema->fields()))
236282
{
237-
if(field != nullptr && field->name() != nullptr)
283+
if (field != nullptr && field->name() != nullptr)
238284
{
239-
field_names.emplace_back(field->name()->str());
285+
field_names.emplace_back(field->name()->str());
240286
}
241-
else {
287+
else
288+
{
242289
field_names.emplace_back("_unnamed_");
243290
}
244291
fields_nullable.push_back(field->nullable());
@@ -269,7 +316,8 @@ namespace sparrow_ipc
269316
encapsulated_message,
270317
fields_metadata
271318
);
272-
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed
319+
auto names_copy = field_names; // TODO: Remove when issue with the to_vector of
320+
// record_batch is fixed
273321
sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays));
274322
record_batches.emplace_back(std::move(sp_record_batch));
275323
}

tests/test_de_serialization_with_files.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const std::vector<std::filesystem::path> files_paths_to_test = {
3333
tests_resources_files_path / "generated_large_binary",
3434
tests_resources_files_path / "generated_binary_zerolength",
3535
tests_resources_files_path / "generated_binary_no_batches",
36+
tests_resources_files_path / "generated_interval",
3637
};
3738

3839
const std::vector<std::filesystem::path> files_paths_to_test_with_compression = {

0 commit comments

Comments
 (0)