From 7d9457484d94d926861b13bf24677ddd77887fb7 Mon Sep 17 00:00:00 2001 From: Jim Cipar Date: Mon, 13 May 2024 14:13:34 -0400 Subject: [PATCH 1/2] datalake: convert protobuf messages to parquet files Introduces a protobuf_to_arrow converter that accepts messages in protobuf format, parses them, and adds them to an Arrow table. The Arrow table can be used to write a Parquet file. --- install-dependencies.sh | 14 +- licenses/third_party.md | 1 + src/v/CMakeLists.txt | 1 + src/v/README.md | 1 + src/v/datalake/CMakeLists.txt | 20 + src/v/datalake/errors.h | 32 ++ src/v/datalake/logger.h | 19 + src/v/datalake/proto_to_arrow_interface.h | 71 ++++ src/v/datalake/proto_to_arrow_scalar.cc | 99 +++++ src/v/datalake/proto_to_arrow_scalar.h | 65 +++ src/v/datalake/proto_to_arrow_struct.cc | 167 ++++++++ src/v/datalake/proto_to_arrow_struct.h | 53 +++ src/v/datalake/protobuf_to_arrow_converter.cc | 167 ++++++++ src/v/datalake/protobuf_to_arrow_converter.h | 80 ++++ src/v/datalake/tests/CMakeLists.txt | 13 + src/v/datalake/tests/proto_to_arrow_gtest.cc | 186 +++++++++ .../tests/proto_to_arrow_test_utils.h | 373 ++++++++++++++++++ 17 files changed, 1360 insertions(+), 2 deletions(-) create mode 100644 src/v/datalake/CMakeLists.txt create mode 100644 src/v/datalake/errors.h create mode 100644 src/v/datalake/logger.h create mode 100644 src/v/datalake/proto_to_arrow_interface.h create mode 100644 src/v/datalake/proto_to_arrow_scalar.cc create mode 100644 src/v/datalake/proto_to_arrow_scalar.h create mode 100644 src/v/datalake/proto_to_arrow_struct.cc create mode 100644 src/v/datalake/proto_to_arrow_struct.h create mode 100644 src/v/datalake/protobuf_to_arrow_converter.cc create mode 100644 src/v/datalake/protobuf_to_arrow_converter.h create mode 100644 src/v/datalake/tests/CMakeLists.txt create mode 100644 src/v/datalake/tests/proto_to_arrow_gtest.cc create mode 100644 src/v/datalake/tests/proto_to_arrow_test_utils.h diff --git a/install-dependencies.sh b/install-dependencies.sh 
index 432572dd74abc..1bdd3309b117c 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -32,11 +32,13 @@ deb_deps=( cmake git golang + libarrow-dev libboost-all-dev libc-ares-dev libgssapi-krb5-2 libkrb5-dev liblz4-dev + libparquet-dev libprotobuf-dev libprotoc-dev libre2-dev @@ -71,6 +73,7 @@ fedora_deps=( golang hwloc-devel krb5-devel + libarrow-devel libxml2-devel libzstd-devel lksctp-tools-devel @@ -82,6 +85,7 @@ fedora_deps=( numactl-devel openssl openssl-devel + parquet-libs-devel procps protobuf-devel python3 @@ -124,8 +128,14 @@ arch_deps=( case "$ID" in ubuntu | debian | pop) - apt-get update - DEBIAN_FRONTEND=noninteractive apt-get install -y "${deb_deps[@]}" + export DEBIAN_FRONTEND=noninteractive + apt update + apt install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + rm apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + apt update + apt-get install -y "${deb_deps[@]}" if [[ $CLEAN_PKG_CACHE == true ]]; then rm -rf /var/lib/apt/lists/* fi diff --git a/licenses/third_party.md b/licenses/third_party.md index 7b9672fc419fe..db407b630c112 100644 --- a/licenses/third_party.md +++ b/licenses/third_party.md @@ -9,6 +9,7 @@ please keep this up to date with every new library use. 
| :---------- | :------------ | | abseil | Apache License 2 | | ada | Apache License 2 / MIT | +| arrow | Apache License 2 / MIT / Boost / BSD 2 & 3 clause / ZPL / LLVM | | avro | Apache License 2 | | base64 | BSD 2 | | boost libraries | Boost Software License Version 1.0 | diff --git a/src/v/CMakeLists.txt b/src/v/CMakeLists.txt index 1260e825abf95..80d68a43605f7 100644 --- a/src/v/CMakeLists.txt +++ b/src/v/CMakeLists.txt @@ -129,6 +129,7 @@ add_subdirectory(compat) add_subdirectory(rp_util) add_subdirectory(resource_mgmt) add_subdirectory(migrations) +add_subdirectory(datalake) option(ENABLE_GIT_VERSION "Build with Git metadata" OFF) diff --git a/src/v/README.md b/src/v/README.md index 586bbfdde00e2..7bd4433c1b476 100644 --- a/src/v/README.md +++ b/src/v/README.md @@ -14,6 +14,7 @@ compression | utilities supporting compression/decompression of many config | Redpanda cluster level and node level configuration options | container | Generic Redpanda specific containers and data structures | crypto | Middleware library used to perform cryptographic operations | +datalake | Writing Redpanda data to Iceberg | features | Cluster feature flags for rolling upgrades | finjector | Failure injector framework for testing and correctness | hashing | hashing utility adaptors often used in cryptography or checksumming | diff --git a/src/v/datalake/CMakeLists.txt b/src/v/datalake/CMakeLists.txt new file mode 100644 index 0000000000000..6c55ac91dc8dd --- /dev/null +++ b/src/v/datalake/CMakeLists.txt @@ -0,0 +1,20 @@ +find_package(Arrow REQUIRED) +find_package(Parquet REQUIRED) +find_package(Protobuf REQUIRED) + + +v_cc_library( + NAME datalake + SRCS + protobuf_to_arrow_converter.cc + proto_to_arrow_scalar.cc + proto_to_arrow_struct.cc + DEPS + v::storage + Seastar::seastar + Arrow::arrow_shared + Parquet::parquet_shared + protobuf::libprotobuf +) + +add_subdirectory(tests) diff --git a/src/v/datalake/errors.h b/src/v/datalake/errors.h new file mode 100644 index 
0000000000000..2a67ceec281a7 --- /dev/null +++ b/src/v/datalake/errors.h @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include +namespace datalake { + +// TODO: Make an std::error_category instance for this +enum class arrow_converter_status { + ok, + + // User errors + parse_error, + + // System Errors + internal_error, +}; + +class initialization_error : public std::runtime_error { +public: + explicit initialization_error(const std::string& what_arg) + : std::runtime_error(what_arg) {} +}; + +} // namespace datalake diff --git a/src/v/datalake/logger.h b/src/v/datalake/logger.h new file mode 100644 index 0000000000000..f3d23fc007a6e --- /dev/null +++ b/src/v/datalake/logger.h @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "base/seastarx.h" + +#include + +namespace datalake { +inline ss::logger datalake_log("datalake"); +} // namespace datalake diff --git a/src/v/datalake/proto_to_arrow_interface.h b/src/v/datalake/proto_to_arrow_interface.h new file mode 100644 index 0000000000000..23c9df37c84d2 --- /dev/null +++ b/src/v/datalake/proto_to_arrow_interface.h @@ -0,0 +1,71 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include +#include +#include + +#include +#include + +namespace google::protobuf { +class Message; +} + +namespace datalake::detail { + +// This interface is used to convert a set of Protobuf messages to an Arrow +// Array. Proto messages are passed one-by-one along to this interface, and it +// builds internal state representing the Arrow array. Finally, the `finish` +// method returns the completed array. +class proto_to_arrow_interface { +public: + proto_to_arrow_interface(const proto_to_arrow_interface&) = delete; + proto_to_arrow_interface(proto_to_arrow_interface&&) = delete; + proto_to_arrow_interface& operator=(const proto_to_arrow_interface&) + = delete; + proto_to_arrow_interface& operator=(proto_to_arrow_interface&&) = delete; + + proto_to_arrow_interface() = default; + virtual ~proto_to_arrow_interface() = default; + + // Called on a struct message to parse an individual child field. + // We expect the given child field to match the type of the column + // represented by this converter. E.g. a proto_to_arrow_scalar would + // expect the column referred to by field_idx to be an int32 column. + virtual arrow::Status + add_child_value(const google::protobuf::Message*, int field_idx) + = 0; + + /// Return an Arrow field descriptor for this Array. Used for building + /// a schema. + // The Arrow API is built around shared_ptr: the creation functions return + // shared pointers, and others expect them as arguments. + // TODO: investigate if we can change the shared_ptr type in Arrow to use + // ss::shared_ptr + virtual std::shared_ptr field(const std::string& name) = 0; + + /// Return the underlying ArrayBuilder. 
Used when this is a child of another + /// Builder + virtual std::shared_ptr builder() = 0; + + // Methods with defaults + virtual arrow::Status finish_batch() { return arrow::Status::OK(); } + std::shared_ptr finish() { + return std::make_shared(std::move(_values)); + } + +protected: + arrow::Status _arrow_status; + arrow::ArrayVector _values; +}; + +} // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_scalar.cc b/src/v/datalake/proto_to_arrow_scalar.cc new file mode 100644 index 0000000000000..7dc48d407da36 --- /dev/null +++ b/src/v/datalake/proto_to_arrow_scalar.cc @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/proto_to_arrow_scalar.h" + +#include "datalake/logger.h" + +#include +#include +#include + +#include +#include + +namespace datalake::detail { +template<> +void proto_to_arrow_scalar::do_add( + const google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append( + msg->GetReflection()->GetInt32(*msg, desc)); +} + +template<> +void proto_to_arrow_scalar::do_add( + const google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append( + msg->GetReflection()->GetInt64(*msg, desc)); +} + +template<> +void proto_to_arrow_scalar::do_add( + const 
google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append(msg->GetReflection()->GetBool(*msg, desc)); +} + +template<> +void proto_to_arrow_scalar::do_add( + const google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append( + msg->GetReflection()->GetFloat(*msg, desc)); +} + +template<> +void proto_to_arrow_scalar::do_add( + const google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append( + msg->GetReflection()->GetDouble(*msg, desc)); +} + +template<> +void proto_to_arrow_scalar::do_add( + const google::protobuf::Message* msg, int field_idx) { + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return; + } + _arrow_status = _builder->Append( + msg->GetReflection()->GetString(*msg, desc)); +} + +} // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_scalar.h b/src/v/datalake/proto_to_arrow_scalar.h new file mode 100644 index 0000000000000..42097b2f309b8 --- /dev/null +++ b/src/v/datalake/proto_to_arrow_scalar.h @@ -0,0 +1,65 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "datalake/proto_to_arrow_interface.h" + +#include + +namespace datalake::detail { + +template +class proto_to_arrow_scalar : public proto_to_arrow_interface { + using BuilderType = arrow::TypeTraits::BuilderType; + +public: + proto_to_arrow_scalar() + : _builder(std::make_shared()) {} + + arrow::Status add_child_value( + const google::protobuf::Message* msg, int field_idx) override { + if (!_arrow_status.ok()) { + return _arrow_status; + } + do_add(msg, field_idx); + return _arrow_status; + } + + arrow::Status finish_batch() override { + if (!_arrow_status.ok()) { + return _arrow_status; + } + auto builder_result = _builder->Finish(); + _arrow_status = builder_result.status(); + std::shared_ptr array; + if (!_arrow_status.ok()) { + return _arrow_status; + } + + // Safe because we validated the status after calling `Finish` + array = std::move(builder_result).ValueUnsafe(); + _values.push_back(array); + return _arrow_status; + } + + std::shared_ptr field(const std::string& name) override { + return arrow::field( + name, arrow::TypeTraits::type_singleton()); + } + + std::shared_ptr builder() override { return _builder; } + +private: + void do_add(const google::protobuf::Message* msg, int field_idx); + + std::shared_ptr _builder; +}; + +} // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_struct.cc b/src/v/datalake/proto_to_arrow_struct.cc new file mode 100644 index 0000000000000..cf46430f4bac5 --- /dev/null +++ b/src/v/datalake/proto_to_arrow_struct.cc @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/proto_to_arrow_struct.h" + +#include "datalake/logger.h" + +#include +#include +#include +#include + +arrow::Status datalake::detail::proto_to_arrow_struct::finish_batch() { + if (!_arrow_status.ok()) { + return _arrow_status; + } + arrow::Result> builder_result + = _builder->Finish(); + + _arrow_status = builder_result.status(); + if (!_arrow_status.ok()) { + return _arrow_status; + } + + // Safe because we validated the status after calling `Finish` + _values.push_back(std::move(builder_result).ValueUnsafe()); + + return _arrow_status; +} +arrow::Status datalake::detail::proto_to_arrow_struct::add_struct_message( + const google::protobuf::Message* msg) { + if (!_arrow_status.ok()) { + return _arrow_status; + } + for (size_t child_idx = 0; child_idx < _child_converters.size(); + child_idx++) { + _arrow_status = _child_converters[child_idx]->add_child_value( + msg, static_cast(child_idx)); + if (!_arrow_status.ok()) { + return _arrow_status; + } + } + _arrow_status = _builder->Append(); + + return _arrow_status; +} +arrow::Status datalake::detail::proto_to_arrow_struct::add_child_value( + const google::protobuf::Message* msg, int field_idx) { + if (!_arrow_status.ok()) { + return _arrow_status; + } + auto* desc = msg->GetDescriptor()->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); + return _arrow_status; + } + auto child_message = &msg->GetReflection()->GetMessage(*msg, desc); + return add_struct_message(child_message); +} +datalake::detail::proto_to_arrow_struct::proto_to_arrow_struct( + const google::protobuf::Descriptor* 
message_descriptor) { + using namespace detail; + namespace pb = google::protobuf; + + // Set up child arrays + _child_converters.reserve(message_descriptor->field_count()); + for (int field_idx = 0; field_idx < message_descriptor->field_count(); + field_idx++) { + auto* desc = message_descriptor->field(field_idx); + if (!desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid( + "Invalid protobuf field index"); + throw datalake::initialization_error( + "Invalid protobuf field index"); + } + switch (desc->cpp_type()) { + case pb::FieldDescriptor::CPPTYPE_INT32: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_INT64: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_BOOL: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_FLOAT: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_DOUBLE: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_STRING: + _child_converters.push_back( + std::make_unique>()); + break; + case pb::FieldDescriptor::CPPTYPE_MESSAGE: { + auto field_message_descriptor = desc->message_type(); + if (field_message_descriptor == nullptr) { + _arrow_status = arrow::Status::Invalid( + "Invalid protobuf field index"); + throw datalake::initialization_error(fmt::format( + "Can't find schema for nested type : {}", + desc->cpp_type_name())); + } + _child_converters.push_back(std::make_unique( + field_message_descriptor)); + } break; + case pb::FieldDescriptor::CPPTYPE_ENUM: // TODO + case pb::FieldDescriptor::CPPTYPE_UINT32: // not supported by Iceberg + case pb::FieldDescriptor::CPPTYPE_UINT64: // not supported by Iceberg + throw datalake::initialization_error( + fmt::format("Unknown type: {}", desc->cpp_type_name())); + } + } + // Make Arrow data types + + // This 
could be combined into a single loop with the one above, splitting + // them out for readability. + _fields.reserve(message_descriptor->field_count()); + for (int field_idx = 0; field_idx < message_descriptor->field_count(); + field_idx++) { + auto* field_desc = message_descriptor->field(field_idx); + if (!field_desc) { + datalake_log.error("Invalid protobuf field index"); + _arrow_status = arrow::Status::Invalid( + "Invalid protobuf field index"); + throw datalake::initialization_error( + "Invalid protobuf field index"); + } + _fields.push_back( + _child_converters[field_idx]->field(field_desc->name())); + } + _arrow_data_type = arrow::struct_(_fields); + + // Make builder + std::vector> child_builders; + child_builders.reserve(_child_converters.size()); + for (auto& child : _child_converters) { + child_builders.push_back(child->builder()); + } + _builder = std::make_shared( + _arrow_data_type, arrow::default_memory_pool(), child_builders); +} +std::shared_ptr +datalake::detail::proto_to_arrow_struct::builder() { + return _builder; +} +std::shared_ptr +datalake::detail::proto_to_arrow_struct::field(const std::string& name) { + return arrow::field(name, _arrow_data_type); +} +arrow::FieldVector datalake::detail::proto_to_arrow_struct::get_field_vector() { + return _fields; +} diff --git a/src/v/datalake/proto_to_arrow_struct.h b/src/v/datalake/proto_to_arrow_struct.h new file mode 100644 index 0000000000000..caaac0c9e0871 --- /dev/null +++ b/src/v/datalake/proto_to_arrow_struct.h @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "datalake/errors.h" +#include "datalake/proto_to_arrow_interface.h" +#include "datalake/proto_to_arrow_scalar.h" + +#include +#include + +namespace google::protobuf { +class Descriptor; +} + +namespace datalake::detail { + +class proto_to_arrow_struct : public proto_to_arrow_interface { +public: + explicit proto_to_arrow_struct( + const google::protobuf::Descriptor* message_descriptor); + + // Because this is a struct builder, we expect the child referred to by + // field_idx to be a struct itself. + arrow::Status add_child_value( + const google::protobuf::Message* msg, int field_idx) override; + + // Adds the message pointed to by msg. Unlike add_child_value, this adds + // the message itself, and not a child of the message. + arrow::Status add_struct_message(const google::protobuf::Message* msg); + + arrow::Status finish_batch() override; + + std::shared_ptr field(const std::string& name) override; + std::shared_ptr builder() override; + + arrow::FieldVector get_field_vector(); + +private: + std::vector> _child_converters; + std::shared_ptr _arrow_data_type; + std::shared_ptr _builder; + arrow::FieldVector _fields; +}; + +} // namespace datalake::detail diff --git a/src/v/datalake/protobuf_to_arrow_converter.cc b/src/v/datalake/protobuf_to_arrow_converter.cc new file mode 100644 index 0000000000000..ad13807badda8 --- /dev/null +++ b/src/v/datalake/protobuf_to_arrow_converter.cc @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/protobuf_to_arrow_converter.h" + +#include "datalake/errors.h" +#include "datalake/logger.h" +#include "datalake/proto_to_arrow_struct.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace datalake { + +proto_to_arrow_converter::proto_to_arrow_converter(const ss::sstring& schema) { + // TODO: Most of the complex logic should go in a factory method. + initialize_protobuf_schema(schema); + if (!initialize_struct_converter()) { + throw initialization_error("Could not initialize arrow arrays"); + } +} +arrow_converter_status +proto_to_arrow_converter::add_message(const ss::sstring& serialized_message) { + std::unique_ptr message = parse_message( + serialized_message); + if (message == nullptr) { + return arrow_converter_status::parse_error; + } + + if (!_struct_converter->add_struct_message(message.get()).ok()) { + return arrow_converter_status::internal_error; + } + return arrow_converter_status::ok; +} +arrow_converter_status proto_to_arrow_converter::finish_batch() { + if (!_struct_converter->finish_batch().ok()) { + return arrow_converter_status::internal_error; + } + return arrow_converter_status::ok; +} +std::shared_ptr proto_to_arrow_converter::build_table() { + auto table_result = arrow::Table::FromChunkedStructArray( + _struct_converter->finish()); + if (table_result.ok()) { + return table_result.ValueUnsafe(); + } else { + return nullptr; + } +} + +std::shared_ptr proto_to_arrow_converter::build_schema() { + return arrow::schema(_struct_converter->get_field_vector()); +} + +void proto_to_arrow_converter::initialize_protobuf_schema( + const ss::sstring& schema) { + google::protobuf::io::ArrayInputStream proto_input_stream( + schema.c_str(), 
static_cast(schema.size())); + google::protobuf::io::Tokenizer tokenizer( + &proto_input_stream, &error_collector); + + google::protobuf::compiler::Parser parser; + parser.RecordErrorsTo(&error_collector); + if (!parser.Parse(&tokenizer, &_file_descriptor_proto)) { + throw initialization_error("Could not parse protobuf schema"); + } + + if (!_file_descriptor_proto.has_name()) { + _file_descriptor_proto.set_name("DefaultMessageName"); + } + + _file_desc = _protobuf_descriptor_pool.BuildFile(_file_descriptor_proto); + if (_file_desc == nullptr) { + throw initialization_error("Could not build descriptor pool"); + } +} +bool proto_to_arrow_converter::initialize_struct_converter() { + const google::protobuf::Descriptor* message_desc = message_descriptor(); + if (message_desc == nullptr) { + return false; + } + + _struct_converter = std::make_unique( + message_desc); + + return true; +} +std::unique_ptr +proto_to_arrow_converter::parse_message(const ss::sstring& message) { + // Get the message descriptor + const google::protobuf::Descriptor* message_desc = message_descriptor(); + if (message_desc == nullptr) { + return nullptr; + } + + const google::protobuf::Message* prototype_msg = _factory.GetPrototype( + message_desc); + if (prototype_msg == nullptr) { + return nullptr; + } + + std::unique_ptr mutable_msg + = std::unique_ptr(prototype_msg->New()); + if (mutable_msg == nullptr) { + return nullptr; + } + + if (!mutable_msg->ParseFromString(message)) { + datalake_log.debug( + "Failed to parse protobuf message. Protobuf error string: \"{}\"", + mutable_msg->InitializationErrorString()); + return nullptr; + } + return mutable_msg; +} + +const google::protobuf::Descriptor* +datalake::proto_to_arrow_converter::message_descriptor() { + // A Protobuf schema can contain multiple message types. + // This code assumes the first message type defined in the file + // is the main one. 
(In Kafka, this is considered the common case and + // optimized) + // TODO: parse message descriptor path from message + // https://github.com/confluentinc/confluent-kafka-go/blob/b6a3254310f6d9707f283f0b53ef4d3e1ff48e3b/schemaregistry/serde/protobuf/protobuf.go#L437-L456 + int message_type_count = _file_desc->message_type_count(); + if (message_type_count == 0) { + return nullptr; + } + return _file_desc->message_type(0); +} +void error_collector::AddError( + int line, int column, const std::string& message) { + // Warning level because this is an error in the input, not Redpanda + // itself. + datalake_log.warn("Protobuf Error {}:{} {}", line, column, message); +} +void error_collector::AddWarning( + int line, int column, const std::string& message) { + datalake_log.warn("Protobuf Warning {}:{} {}", line, column, message); +} +} // namespace datalake diff --git a/src/v/datalake/protobuf_to_arrow_converter.h b/src/v/datalake/protobuf_to_arrow_converter.h new file mode 100644 index 0000000000000..819dafca047f6 --- /dev/null +++ b/src/v/datalake/protobuf_to_arrow_converter.h @@ -0,0 +1,80 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once +#include "datalake/errors.h" +#include "datalake/logger.h" +#include "datalake/proto_to_arrow_interface.h" +#include "datalake/proto_to_arrow_scalar.h" +#include "datalake/proto_to_arrow_struct.h" + +#include +#include +#include +#include + +#include +#include + +namespace datalake { + +struct error_collector : ::google::protobuf::io::ErrorCollector { + void AddError(int line, int column, const std::string& message) override; + void AddWarning(int line, int column, const std::string& message) override; +}; + +/** Top-level interface for parsing Protobuf messages to an Arrow table + +This class deserializes protobuf messages and passes the deserialized messages +to an instance of proto_to_arrow_struct to recursively parse the structured +message. +*/ +class proto_to_arrow_converter { +public: + explicit proto_to_arrow_converter(const ss::sstring& schema); + + [[nodiscard]] arrow_converter_status + add_message(const ss::sstring& serialized_message); + + [[nodiscard]] arrow_converter_status finish_batch(); + + std::shared_ptr build_table(); + + std::shared_ptr build_schema(); + +private: + FRIEND_TEST(ArrowWriter, EmptyMessageTest); + FRIEND_TEST(ArrowWriter, SimpleMessageTest); + FRIEND_TEST(ArrowWriter, NestedMessageTest); + FRIEND_TEST(ArrowWriter, InvalidMessagetest); + + void initialize_protobuf_schema(const ss::sstring& schema); + + bool initialize_struct_converter(); + + /// Parse the message to a protobuf message. + /// Return nullptr on error. + // TODO: Add a version of this method that takes an iobuf + // and creates a `google::protobuf::io::ZeroCopyInputStream`. 
+ // We can then call ParseFromBoundedZeroCopyStream + std::unique_ptr + parse_message(const ss::sstring& message); + const google::protobuf::Descriptor* message_descriptor(); + +private: + google::protobuf::DescriptorPool _protobuf_descriptor_pool; + google::protobuf::FileDescriptorProto _file_descriptor_proto; + google::protobuf::DynamicMessageFactory _factory; + const google::protobuf::FileDescriptor* _file_desc{}; + + std::unique_ptr _struct_converter; + error_collector error_collector; +}; + +} // namespace datalake diff --git a/src/v/datalake/tests/CMakeLists.txt b/src/v/datalake/tests/CMakeLists.txt new file mode 100644 index 0000000000000..d6d6b6f6c4b59 --- /dev/null +++ b/src/v/datalake/tests/CMakeLists.txt @@ -0,0 +1,13 @@ +rp_test( + UNIT_TEST + GTEST + BINARY_NAME gtest_proto_to_arrow + SOURCES + proto_to_arrow_gtest.cc + LIBRARIES + v::gtest_main + v::datalake + LABELS datalake + ARGS "-- -c 1" + ) + \ No newline at end of file diff --git a/src/v/datalake/tests/proto_to_arrow_gtest.cc b/src/v/datalake/tests/proto_to_arrow_gtest.cc new file mode 100644 index 0000000000000..2c3587fa390f7 --- /dev/null +++ b/src/v/datalake/tests/proto_to_arrow_gtest.cc @@ -0,0 +1,186 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "datalake/errors.h" +#include "datalake/protobuf_to_arrow_converter.h" +#include "datalake/tests/proto_to_arrow_test_utils.h" +#include "test_utils/test.h" + +#include +#include +#include + +#include +#include + +namespace datalake { +TEST(ArrowWriter, InvalidMessagetest) { + using namespace datalake; + test_data test_data; + std::string schema = test_data.empty_schema; + + proto_to_arrow_converter converter(schema); + + std::string serialized_message{"This is not a Protobuf message!"}; + + auto parsed_message = converter.parse_message(serialized_message); + EXPECT_EQ(parsed_message, nullptr); +} + +TEST(ArrowWriter, EmptyMessageTest) { + using namespace datalake; + test_data test_data; + std::string schema = test_data.empty_schema; + + proto_to_arrow_converter converter(schema); + + std::string serialized_message = generate_empty_message(); + + auto parsed_message = converter.parse_message(serialized_message); + EXPECT_NE(parsed_message, nullptr); + EXPECT_EQ(parsed_message->GetTypeName(), "datalake.proto.empty_message"); +} + +TEST(ArrowWriter, SimpleMessageTest) { + using namespace datalake; + std::string serialized_message = generate_simple_message( + "Hello world", 12345); + + test_data test_data; + proto_to_arrow_converter converter(test_data.simple_schema); + auto parsed_message = converter.parse_message(serialized_message); + EXPECT_NE(parsed_message, nullptr); + EXPECT_EQ(parsed_message->GetTypeName(), "datalake.proto.simple_message"); + + EXPECT_EQ( + arrow_converter_status::ok, converter.add_message(serialized_message)); + { + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_simple_message("I", 1))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_simple_message("II", 2))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_simple_message("III", 
3))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_simple_message("IV", 4))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_simple_message("V", 5))); + } + EXPECT_EQ(arrow_converter_status::ok, converter.finish_batch()); + + auto schema = converter.build_schema(); + auto table = converter.build_table(); + EXPECT_EQ( + schema->field_names(), + std::vector( + {"label", + "number", + "big_number", + "float_number", + "double_number", + "true_or_false"})); + std::vector table_field_names; + for (const auto& field : table->fields()) { + table_field_names.push_back(field->name()); + } + EXPECT_EQ(table_field_names, schema->field_names()); + std::vector expected{ + "Hello world", "I", "II", "III", "IV", "V"}; + for (int i = 0; i < expected.size(); i++) { + EXPECT_EQ( + table->GetColumnByName("label")->GetScalar(i)->get()->ToString(), + expected[i]); + } + + expected = {"12345", "1", "2", "3", "4", "5"}; + for (int i = 0; i < expected.size(); i++) { + EXPECT_EQ( + table->GetColumnByName("number")->GetScalar(i)->get()->ToString(), + expected[i]); + } + EXPECT_EQ( + table->ToString(), + "label: string\nnumber: int32\nbig_number: int64\nfloat_number: " + "float\ndouble_number: double\ntrue_or_false: bool\n----\nlabel:\n [\n " + " [\n \"Hello world\",\n \"I\",\n \"II\",\n " + "\"III\",\n \"IV\",\n \"V\"\n ]\n ]\nnumber:\n [\n [\n " + " 12345,\n 1,\n 2,\n 3,\n 4,\n 5\n ]\n " + "]\nbig_number:\n [\n [\n 123450,\n 10,\n 20,\n " + "30,\n 40,\n 50\n ]\n ]\nfloat_number:\n [\n [\n " + "1234.5,\n 0.1,\n 0.2,\n 0.3,\n 0.4,\n 0.5\n " + "]\n ]\ndouble_number:\n [\n [\n 123.45,\n 0.01,\n " + "0.02,\n 0.03,\n 0.04,\n 0.05\n ]\n " + "]\ntrue_or_false:\n [\n [\n false,\n false,\n " + "true,\n false,\n true,\n false\n ]\n ]\n"); +} + +TEST(ArrowWriter, NestedMessageTest) { + using namespace datalake; + std::string serialized_message = generate_nested_message( + "Hello world", 12345); + + test_data test_data; + 
proto_to_arrow_converter converter(test_data.nested_schema); + auto parsed_message = converter.parse_message(serialized_message); + EXPECT_NE(parsed_message, nullptr); + EXPECT_EQ(parsed_message->GetTypeName(), "datalake.proto.nested_message"); + + EXPECT_EQ( + arrow_converter_status::ok, converter.add_message(serialized_message)); + { + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_nested_message("I", 1))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_nested_message("II", 2))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_nested_message("III", 3))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_nested_message("IV", 4))); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_nested_message("V", 5))); + } + EXPECT_EQ(arrow_converter_status::ok, converter.finish_batch()); + + auto schema = converter.build_schema(); + auto table = converter.build_table(); + + EXPECT_EQ( + schema->field_names(), + std::vector({"label", "number", "inner_message"})); + std::vector table_field_names; + for (const auto& field : table->fields()) { + table_field_names.push_back(field->name()); + } + EXPECT_EQ(table_field_names, schema->field_names()); + EXPECT_EQ( + table->ToString(), + "label: string\nnumber: int32\ninner_message: struct\n child 0, inner_label: string\n child " + "1, inner_number: int32\n----\nlabel:\n [\n [\n \"Hello " + "world\",\n \"I\",\n \"II\",\n \"III\",\n \"IV\",\n " + " \"V\"\n ]\n ]\nnumber:\n [\n [\n 12345,\n 1,\n " + " 2,\n 3,\n 4,\n 5\n ]\n ]\ninner_message:\n [\n " + "-- is_valid: all not null\n -- child 0 type: string\n [\n " + " \"inner: Hello world\",\n \"inner: I\",\n \"inner: " + "II\",\n \"inner: III\",\n \"inner: IV\",\n " + "\"inner: V\"\n ]\n -- child 1 type: int32\n [\n " + "-12345,\n -1,\n -2,\n -3,\n -4,\n " + "-5\n ]\n ]\n"); +} +} // namespace datalake diff --git 
a/src/v/datalake/tests/proto_to_arrow_test_utils.h b/src/v/datalake/tests/proto_to_arrow_test_utils.h new file mode 100644 index 0000000000000..6996b6fc79331 --- /dev/null +++ b/src/v/datalake/tests/proto_to_arrow_test_utils.h @@ -0,0 +1,373 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once +#include "model/record.h" +#include "random/generators.h" + +#include + +#include +#include + +#include +#include +#include + +namespace datalake { +// This is here for now to split up the stack of commits into separate PRs +struct schema_info { + std::string schema; +}; +} // namespace datalake + +struct test_data { + std::string empty_schema = (R"schema( + syntax = "proto2"; +package datalake.proto; + +message empty_message {} + )schema"); + + std::string simple_schema = (R"schema( + syntax = "proto2"; +package datalake.proto; + +message simple_message { + optional string label = 1; + optional int32 number = 3; + optional int64 big_number = 4; + optional float float_number = 5; + optional double double_number = 6; + optional bool true_or_false = 7; +} + )schema"); + + std::string nested_schema = (R"schema( +syntax = "proto2"; +package datalake.proto; + +message nested_message { + optional string label = 1; + optional int32 number = 2; + optional inner_message_t inner_message = 3; +} + +message inner_message_t { + optional string inner_label = 1; + optional int32 inner_number = 2; +} + +)schema"); + + std::string combined_schema = (R"schema( +syntax = "proto2"; +package datalake.proto; + +message empty_message {} + +message simple_message { + optional string label = 1; + optional int32 number = 3; + optional int64 big_number = 4; + optional float float_number = 5; + optional 
double double_number = 6; + optional bool true_or_false = 7; +} + +message inner_message_t { + optional string inner_label = 1; + optional int32 inner_number = 2; +} + +message nested_message { + optional string label = 1; + optional int32 number = 2; + optional inner_message_t inner_message = 3; +} +)schema"); + + std::string test_message_name = "simple_message"; +}; + +// TODO: Make this a fixture? +struct test_message_builder { + test_message_builder(test_data data) + : _proto_input_stream{data.combined_schema.c_str(), int(data.combined_schema.size())} + , _tokenizer{&_proto_input_stream, nullptr} { + if (!_parser.Parse(&_tokenizer, &_file_descriptor_proto)) { + exit(-1); + } + + if (!_file_descriptor_proto.has_name()) { + _file_descriptor_proto.set_name("proto_file"); + } + + // Build a descriptor pool + _file_desc = _pool.BuildFile(_file_descriptor_proto); + assert(_file_desc != nullptr); + } + + google::protobuf::Message* generate_unserialized_message_generic( + const std::string& message_type, + const std::function& populate_message) { + // Get the message descriptor + const google::protobuf::Descriptor* message_desc + = _file_desc->FindMessageTypeByName(message_type); + assert(message_desc != nullptr); + + // Parse the actual message + const google::protobuf::Message* prototype_msg = _factory.GetPrototype( + message_desc); + assert(prototype_msg != nullptr); + + google::protobuf::Message* mutable_msg = prototype_msg->New(); + assert(mutable_msg != nullptr); + populate_message(mutable_msg); + + return mutable_msg; + } + + std::string generate_message_generic( + const std::string& message_type, + const std::function& populate_message) { + auto msg = generate_unserialized_message_generic( + message_type, populate_message); + std::string ret = msg->SerializeAsString(); + delete msg; + return ret; + } + + google::protobuf::FileDescriptorProto _file_descriptor_proto; + google::protobuf::compiler::Parser _parser; + google::protobuf::io::ArrayInputStream 
_proto_input_stream; + google::protobuf::io::Tokenizer _tokenizer; + google::protobuf::DescriptorPool _pool; + const google::protobuf::FileDescriptor* _file_desc; + google::protobuf::DynamicMessageFactory _factory; +}; + +inline std::string generate_empty_message() { + test_data test_data; + test_message_builder builder(test_data); + return builder.generate_message_generic( + "empty_message", [](google::protobuf::Message* message) {}); +} + +inline std::string +generate_simple_message(const std::string& label, int32_t number) { + test_data test_data; + test_message_builder builder(test_data); + return builder.generate_message_generic( + "simple_message", [&](google::protobuf::Message* message) { + auto reflection = message->GetReflection(); + // Have to use field indices here because + // message->GetReflection()->ListFields() only returns fields + // that are actually present in the message; + for (int field_idx = 0; + field_idx < message->GetDescriptor()->field_count(); + field_idx++) { + auto field_desc = message->GetDescriptor()->field(field_idx); + if (field_desc->name() == "label") { + reflection->SetString(message, field_desc, label); + } else if (field_desc->name() == "number") { + reflection->SetInt32(message, field_desc, number); + } else if (field_desc->name() == "big_number") { + reflection->SetInt64(message, field_desc, 10 * number); + } else if (field_desc->name() == "float_number") { + reflection->SetFloat(message, field_desc, number / 10.0); + } else if (field_desc->name() == "double_number") { + reflection->SetDouble(message, field_desc, number / 100.0); + } else if (field_desc->name() == "true_or_false") { + reflection->SetBool(message, field_desc, number % 2 == 0); + } + } + }); +} + +inline google::protobuf::Message* generate_inner_message( + test_message_builder& builder, const std::string& label, int32_t number) { + auto ret = builder.generate_unserialized_message_generic( + "inner_message_t", [&](google::protobuf::Message* message) { + auto 
reflection = message->GetReflection(); + // Have to use field indices here because + // message->GetReflection()->ListFields() only returns fields + // that are actually present in the message; + for (int field_idx = 0; + field_idx < message->GetDescriptor()->field_count(); + field_idx++) { + auto field_desc = message->GetDescriptor()->field(field_idx); + if (field_desc->name() == "inner_label") { + reflection->SetString(message, field_desc, label); + } else if (field_desc->name() == "inner_number") { + reflection->SetInt32(message, field_desc, number); + } + } + }); + + return ret; +} + +inline std::string +generate_nested_message(const std::string& label, int32_t number) { + test_data test_data; + test_message_builder builder(test_data); + auto inner = generate_inner_message(builder, "inner: " + label, -number); + + return builder.generate_message_generic( + "nested_message", [&](google::protobuf::Message* message) { + auto reflection = message->GetReflection(); + // Have to use field indices here because + // message->GetReflection()->ListFields() only returns fields + // that are actually present in the message; + + for (int field_idx = 0; + field_idx < message->GetDescriptor()->field_count(); + field_idx++) { + auto field_desc = message->GetDescriptor()->field(field_idx); + if (field_desc->name() == "label") { + reflection->SetString(message, field_desc, label); + } else if (field_desc->name() == "number") { + reflection->SetInt32(message, field_desc, number); + } else if (field_desc->name() == "inner_message") { + reflection->SetAllocatedMessage(message, inner, field_desc); + } + } + }); +} + +inline datalake::schema_info get_test_schema() { + test_data data; + return { + .schema = data.nested_schema, + }; +} + +struct protobuf_random_batches_generator { + ss::circular_buffer + operator()(std::optional base_ts = std::nullopt) { + int count = random_generators::get_int(1, 10); + bool allow_compression = true; + model::offset offset(0); + model::timestamp ts 
= base_ts.value_or(model::timestamp::now()); + + ss::circular_buffer ret; + ret.reserve(count); + for (int i = 0; i < count; i++) { + // TODO: See todo comment in make_random_batches + auto b = make_protobuf_batch(offset, allow_compression, ts); + offset = b.last_offset() + model::offset(1); + b.set_term(model::term_id(0)); + ret.push_back(std::move(b)); + } + return ret; + } + + // Return a batch of records containing protobuf data using the simple + // schema above. + model::record_batch make_protobuf_batch( + model::offset offset, bool allow_compression, model::timestamp ts) { + // Based on make_random_batch, skip the transaction and idempotence + // stuff. + auto num_records = random_generators::get_int(1, 30); + auto max_ts = model::timestamp(ts() + num_records - 1); + auto header = model::record_batch_header{ + .size_bytes = 0, // computed later + .base_offset = offset, + .type = model::record_batch_type::raft_data, + .crc = 0, // we reassign later + .attrs = model::record_batch_attributes( + random_generators::get_int(0, allow_compression ? 
4 : 0)), + .last_offset_delta = num_records - 1, + .first_timestamp = ts, + .max_timestamp = max_ts, + .producer_id = -1, + .producer_epoch = -1, + .base_sequence = 0, + .record_count = num_records}; + + auto size = model::packed_record_batch_header_size; + model::record_batch::records_type records; + auto rs = model::record_batch::uncompressed_records(); + rs.reserve(num_records); + for (int i = 0; i < num_records; i++) { + std::stringstream key_stream; + key_stream << i; + + std::string value = generate_simple_message( + "message # " + key_stream.str(), i); + rs.emplace_back( + make_record(offset() + i, ts() + i, key_stream.str(), value)); + } + + if (header.attrs.compression() != model::compression::none) { + iobuf body; + for (auto& r : rs) { + model::append_record_to_buffer(body, r); + } + rs.clear(); + records = compression::compressor::compress( + body, header.attrs.compression()); + size += std::get(records).size_bytes(); + } else { + for (auto& r : rs) { + size += r.size_bytes(); + size += vint::vint_size(r.size_bytes()); + } + records = std::move(rs); + } + header.ctx = model::record_batch_header::context( + model::term_id(0), ss::this_shard_id()); + header.size_bytes = size; + auto batch = model::record_batch(header, std::move(records)); + batch.header().crc = model::crc_record_batch(batch); + batch.header().header_crc = model::internal_header_only_crc( + batch.header()); + return batch; + } + + model::record make_record( + int ts_delta, + int offset_delta, + std::string key_string, + std::string value_string) { + iobuf key; + iobuf value; + key.append(key_string.data(), key_string.size()); + value.append(value_string.data(), value_string.size()); + // Headers in the storages tests are just random. We can leave them + // empty. 
+ std::vector headers; + + auto size = sizeof(model::record_attributes::type) // attributes + + vint::vint_size(ts_delta) // timestamp delta + + vint::vint_size(offset_delta) // offset delta + + vint::vint_size(key.size_bytes()) // size of key-len + + key.size_bytes() // size of key + + vint::vint_size(value.size_bytes()) // size of value + + value.size_bytes() // size of value (includes lengths) + + vint::vint_size(headers.size()); + for (auto& h : headers) { + size += vint::vint_size(h.key_size()) + h.key().size_bytes() + + vint::vint_size(h.value_size()) + h.value().size_bytes(); + } + auto key_size = static_cast(key.size_bytes()); + auto value_size = static_cast(value.size_bytes()); + return { + static_cast(size), + model::record_attributes(0), + ts_delta, + offset_delta, + key_size, + std::move(key), + value_size, + std::move(value), + std::move(headers)}; + } +}; From a1b51d966ce366b0dd93752dfc739dd6e08d6fea Mon Sep 17 00:00:00 2001 From: Jim Cipar Date: Tue, 30 Jul 2024 17:39:03 -0400 Subject: [PATCH 2/2] datalake: Convert protobuf repeated fields into Arrow lists This change parses Protobuf repeated fields into Arrow lists. 
--- src/v/datalake/CMakeLists.txt | 1 + src/v/datalake/proto_to_arrow_interface.h | 4 +- src/v/datalake/proto_to_arrow_repeated.cc | 90 +++++++++++++ src/v/datalake/proto_to_arrow_repeated.h | 40 ++++++ src/v/datalake/proto_to_arrow_scalar.cc | 89 +++++++++++-- src/v/datalake/proto_to_arrow_struct.cc | 120 +++++++++++------- src/v/datalake/proto_to_arrow_struct.h | 8 +- src/v/datalake/protobuf_to_arrow_converter.h | 1 + src/v/datalake/tests/proto_to_arrow_gtest.cc | 37 ++++++ .../tests/proto_to_arrow_test_utils.h | 32 +++++ 10 files changed, 364 insertions(+), 58 deletions(-) create mode 100644 src/v/datalake/proto_to_arrow_repeated.cc create mode 100644 src/v/datalake/proto_to_arrow_repeated.h diff --git a/src/v/datalake/CMakeLists.txt b/src/v/datalake/CMakeLists.txt index 6c55ac91dc8dd..a699b4fe3258f 100644 --- a/src/v/datalake/CMakeLists.txt +++ b/src/v/datalake/CMakeLists.txt @@ -7,6 +7,7 @@ v_cc_library( NAME datalake SRCS protobuf_to_arrow_converter.cc + proto_to_arrow_repeated.cc proto_to_arrow_scalar.cc proto_to_arrow_struct.cc DEPS diff --git a/src/v/datalake/proto_to_arrow_interface.h b/src/v/datalake/proto_to_arrow_interface.h index 23c9df37c84d2..9f0906cfb4d23 100644 --- a/src/v/datalake/proto_to_arrow_interface.h +++ b/src/v/datalake/proto_to_arrow_interface.h @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -57,8 +58,9 @@ class proto_to_arrow_interface { /// Builder virtual std::shared_ptr builder() = 0; + virtual arrow::Status finish_batch() = 0; + // Methods with defaults - virtual arrow::Status finish_batch() { return arrow::Status::OK(); } std::shared_ptr finish() { return std::make_shared(std::move(_values)); } diff --git a/src/v/datalake/proto_to_arrow_repeated.cc b/src/v/datalake/proto_to_arrow_repeated.cc new file mode 100644 index 0000000000000..d852d14a013ec --- /dev/null +++ b/src/v/datalake/proto_to_arrow_repeated.cc @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "datalake/proto_to_arrow_repeated.h" + +#include "datalake/logger.h" +#include "datalake/proto_to_arrow_struct.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace datalake::detail { +proto_to_arrow_repeated::proto_to_arrow_repeated( + const google::protobuf::FieldDescriptor* desc) { + // The documentation for ListBuilder is not very detailed. From the linked + // StackOverflow and some trial-and-error, it looks like it's used like + // this: + // 1. A ListBuilder is created with a shared pointer to another + // arrow::ArrayBuilder as a child. + // 2. *Before* adding elements to the child, call Append() on the + // ListBuilder. + // 3. Call Append(val) on the child builder in the normal way. + // 4. *Do not* call Finish() on the child builder. + // 5. Call Finish() on the ListBuilder to get the array. 
+ // + // https://stackoverflow.com/questions/78277111/is-there-a-way-to-nest-an-arrowarray-in-apache-arrow + _child_converter = make_converter(desc, true); + _builder = std::make_shared( + arrow::default_memory_pool(), _child_converter->builder()); +} + +arrow::Status proto_to_arrow_repeated::add_child_value( + const google::protobuf::Message* msg, int field_idx) { + if (!_arrow_status.ok()) { + return _arrow_status; + } + _arrow_status = _builder->Append(); + if (!_arrow_status.ok()) { + return _arrow_status; + } + _arrow_status = _child_converter->add_child_value(msg, field_idx); + + return _arrow_status; +} + +std::shared_ptr +proto_to_arrow_repeated::field(const std::string& name) { + std::shared_ptr arrow_type = arrow::list(arrow::int32()); + return arrow::field(name, arrow_type); +}; + +std::shared_ptr proto_to_arrow_repeated::builder() { + return _builder; +}; + +arrow::Status proto_to_arrow_repeated::finish_batch() { + std::cerr << "finish_batch with " << _builder->length() << " items\n"; + + if (!_arrow_status.ok()) { + return _arrow_status; + } + auto builder_result = _builder->Finish(); + _arrow_status = builder_result.status(); + std::shared_ptr array; + if (!_arrow_status.ok()) { + return _arrow_status; + } + + // Safe because we validated the status after calling `Finish` + array = std::move(builder_result).ValueUnsafe(); + std::cerr << "adding array with " << array->length() + << " elements to _values\n"; + _values.push_back(array); + return _arrow_status; +} +} // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_repeated.h b/src/v/datalake/proto_to_arrow_repeated.h new file mode 100644 index 0000000000000..4a6d72a1bd27c --- /dev/null +++ b/src/v/datalake/proto_to_arrow_repeated.h @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "datalake/proto_to_arrow_interface.h" + +#include +#include +#include +#include + +#include + +namespace datalake::detail { + +class proto_to_arrow_repeated : public proto_to_arrow_interface { +public: + proto_to_arrow_repeated(const google::protobuf::FieldDescriptor*); + arrow::Status + add_child_value(const google::protobuf::Message*, int) override; + + std::shared_ptr field(const std::string&) override; + + std::shared_ptr builder() override; + + arrow::Status finish_batch() override; + +private: + std::shared_ptr _builder; + std::unique_ptr _child_converter; +}; + +} // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_scalar.cc b/src/v/datalake/proto_to_arrow_scalar.cc index 7dc48d407da36..b825f19ce3e3c 100644 --- a/src/v/datalake/proto_to_arrow_scalar.cc +++ b/src/v/datalake/proto_to_arrow_scalar.cc @@ -28,8 +28,19 @@ void proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append( - msg->GetReflection()->GetInt32(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedInt32( + *msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetInt32(*msg, desc)); + } } template<> @@ -41,8 +52,19 @@ void proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append( - msg->GetReflection()->GetInt64(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedInt64( + *msg, desc, i); + _arrow_status 
= _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetInt64(*msg, desc)); + } } template<> @@ -54,7 +76,19 @@ void proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append(msg->GetReflection()->GetBool(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedBool( + *msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetBool(*msg, desc)); + } } template<> @@ -66,8 +100,19 @@ void proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append( - msg->GetReflection()->GetFloat(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedFloat( + *msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetFloat(*msg, desc)); + } } template<> @@ -79,8 +124,19 @@ void proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append( - msg->GetReflection()->GetDouble(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedDouble( + *msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetDouble(*msg, desc)); + } } template<> @@ -92,8 +148,19 @@ void 
proto_to_arrow_scalar::do_add( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return; } - _arrow_status = _builder->Append( - msg->GetReflection()->GetString(*msg, desc)); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + const auto val = msg->GetReflection()->GetRepeatedString( + *msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + break; + } + } + } else { + _arrow_status = _builder->Append( + msg->GetReflection()->GetString(*msg, desc)); + } } } // namespace datalake::detail diff --git a/src/v/datalake/proto_to_arrow_struct.cc b/src/v/datalake/proto_to_arrow_struct.cc index cf46430f4bac5..07744154ac9da 100644 --- a/src/v/datalake/proto_to_arrow_struct.cc +++ b/src/v/datalake/proto_to_arrow_struct.cc @@ -10,6 +10,8 @@ #include "datalake/proto_to_arrow_struct.h" #include "datalake/logger.h" +#include "datalake/proto_to_arrow_repeated.h" +#include "datalake/proto_to_arrow_scalar.h" #include #include @@ -61,13 +63,24 @@ arrow::Status datalake::detail::proto_to_arrow_struct::add_child_value( _arrow_status = arrow::Status::Invalid("Invalid protobuf field index"); return _arrow_status; } - auto child_message = &msg->GetReflection()->GetMessage(*msg, desc); - return add_struct_message(child_message); + if (desc->is_repeated()) { + for (int i = 0; i < msg->GetReflection()->FieldSize(*msg, desc); i++) { + auto val = &msg->GetReflection()->GetRepeatedMessage(*msg, desc, i); + _arrow_status = _builder->Append(val); + if (!_arrow_status.ok()) { + return _arrow_status; + } + } + } else { + auto child_message = &msg->GetReflection()->GetMessage(*msg, desc); + _arrow_status = add_struct_message(child_message); + } + return _arrow_status; } + datalake::detail::proto_to_arrow_struct::proto_to_arrow_struct( const google::protobuf::Descriptor* message_descriptor) { using namespace detail; - namespace pb = google::protobuf; // Set up child arrays 
_child_converters.reserve(message_descriptor->field_count()); @@ -81,49 +94,13 @@ datalake::detail::proto_to_arrow_struct::proto_to_arrow_struct( throw datalake::initialization_error( "Invalid protobuf field index"); } - switch (desc->cpp_type()) { - case pb::FieldDescriptor::CPPTYPE_INT32: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_INT64: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_BOOL: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_FLOAT: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_DOUBLE: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_STRING: - _child_converters.push_back( - std::make_unique>()); - break; - case pb::FieldDescriptor::CPPTYPE_MESSAGE: { - auto field_message_descriptor = desc->message_type(); - if (field_message_descriptor == nullptr) { - _arrow_status = arrow::Status::Invalid( - "Invalid protobuf field index"); - throw datalake::initialization_error(fmt::format( - "Can't find schema for nested type : {}", - desc->cpp_type_name())); - } - _child_converters.push_back(std::make_unique( - field_message_descriptor)); - } break; - case pb::FieldDescriptor::CPPTYPE_ENUM: // TODO - case pb::FieldDescriptor::CPPTYPE_UINT32: // not supported by Iceberg - case pb::FieldDescriptor::CPPTYPE_UINT64: // not supported by Iceberg - throw datalake::initialization_error( - fmt::format("Unknown type: {}", desc->cpp_type_name())); + auto converter = make_converter(desc); + if (!converter) { + throw datalake::initialization_error(fmt::format( + "Unable to create protobuf converter for type: {}", + desc->cpp_type_name())); } + _child_converters.push_back(std::move(converter)); } // Make Arrow data types @@ -140,6 +117,12 @@ datalake::detail::proto_to_arrow_struct::proto_to_arrow_struct( throw 
datalake::initialization_error( "Invalid protobuf field index"); } + auto child_field = _child_converters[field_idx]->field( + field_desc->name()); + if (!child_field) { + throw datalake::initialization_error( + "Invalid protobuf child field"); + } _fields.push_back( _child_converters[field_idx]->field(field_desc->name())); } @@ -165,3 +148,50 @@ datalake::detail::proto_to_arrow_struct::field(const std::string& name) { arrow::FieldVector datalake::detail::proto_to_arrow_struct::get_field_vector() { return _fields; } + +std::unique_ptr +datalake::detail::make_converter( + const google::protobuf::FieldDescriptor* desc, bool ignore_repeated) { + if (!desc) { + // TODO logging and error codes + return nullptr; + } + switch (desc->cpp_type()) { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: + if (desc->is_repeated() && !ignore_repeated) { + return std::make_unique(desc); + } else { + return std::make_unique>(); + } + break; + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: + return std::make_unique>(); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: + return std::make_unique>(); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: + return std::make_unique>(); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: + return std::make_unique>(); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: + return std::make_unique>(); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + auto field_message_descriptor = desc->message_type(); + if (field_message_descriptor == nullptr) { + return nullptr; + } + return std::make_unique( + field_message_descriptor); + } break; + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: // TODO + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: // not supported by + // Iceberg + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: // not supported by + // Iceberg + return nullptr; + } +} diff --git a/src/v/datalake/proto_to_arrow_struct.h 
b/src/v/datalake/proto_to_arrow_struct.h index caaac0c9e0871..9be83cc0705bb 100644 --- a/src/v/datalake/proto_to_arrow_struct.h +++ b/src/v/datalake/proto_to_arrow_struct.h @@ -11,13 +11,16 @@ #include "datalake/errors.h" #include "datalake/proto_to_arrow_interface.h" -#include "datalake/proto_to_arrow_scalar.h" + +#include +#include #include #include namespace google::protobuf { class Descriptor; +class FieldDescriptor; } namespace datalake::detail { @@ -50,4 +53,7 @@ class proto_to_arrow_struct : public proto_to_arrow_interface { arrow::FieldVector _fields; }; +std::unique_ptr make_converter( + const google::protobuf::FieldDescriptor* desc, bool ignore_repeated = false); + } // namespace datalake::detail diff --git a/src/v/datalake/protobuf_to_arrow_converter.h b/src/v/datalake/protobuf_to_arrow_converter.h index 819dafca047f6..f8e05d5fb7301 100644 --- a/src/v/datalake/protobuf_to_arrow_converter.h +++ b/src/v/datalake/protobuf_to_arrow_converter.h @@ -53,6 +53,7 @@ class proto_to_arrow_converter { FRIEND_TEST(ArrowWriter, SimpleMessageTest); FRIEND_TEST(ArrowWriter, NestedMessageTest); FRIEND_TEST(ArrowWriter, InvalidMessagetest); + FRIEND_TEST(ArrowWriter, RepeatedFeildTest); void initialize_protobuf_schema(const ss::sstring& schema); diff --git a/src/v/datalake/tests/proto_to_arrow_gtest.cc b/src/v/datalake/tests/proto_to_arrow_gtest.cc index 2c3587fa390f7..4b649e7dc9550 100644 --- a/src/v/datalake/tests/proto_to_arrow_gtest.cc +++ b/src/v/datalake/tests/proto_to_arrow_gtest.cc @@ -183,4 +183,41 @@ TEST(ArrowWriter, NestedMessageTest) { "-12345,\n -1,\n -2,\n -3,\n -4,\n " "-5\n ]\n ]\n"); } + +TEST(ArrowWriter, RepeatedFeildTest) { + using namespace datalake; + test_data test_data; + std::string schema = test_data.repeated_schema; + + proto_to_arrow_converter converter(schema); + + std::string serialized_message = generate_repeated_field_message( + {1, 1, 2, 3, 5, 8, 13}); + + auto parsed_message = converter.parse_message(serialized_message); + 
EXPECT_NE(parsed_message, nullptr); + EXPECT_EQ(parsed_message->GetTypeName(), "datalake.proto.repeated_field"); + EXPECT_EQ( + parsed_message->DebugString(), + "numbers: 1\nnumbers: 1\nnumbers: 2\nnumbers: 3\nnumbers: 5\nnumbers: " + "8\nnumbers: 13\n"); + + EXPECT_EQ( + arrow_converter_status::ok, converter.add_message(serialized_message)); + EXPECT_EQ( + arrow_converter_status::ok, + converter.add_message(generate_repeated_field_message({2, 4, 6, 8}))); + EXPECT_EQ(arrow_converter_status::ok, converter.finish_batch()); + + auto table_schema = converter.build_schema(); + auto table = converter.build_table(); + EXPECT_EQ( + table_schema->field_names(), std::vector({"numbers"})); + EXPECT_EQ( + table->ToString(), + "numbers: list\n child 0, item: int32\n----\nnumbers:\n " + "[\n [\n [\n 1,\n 1,\n 2,\n 3,\n " + " 5,\n 8,\n 13\n ],\n [\n 2,\n " + "4,\n 6,\n 8\n ]\n ]\n ]\n"); +} } // namespace datalake diff --git a/src/v/datalake/tests/proto_to_arrow_test_utils.h b/src/v/datalake/tests/proto_to_arrow_test_utils.h index 6996b6fc79331..5e65a38ab5556 100644 --- a/src/v/datalake/tests/proto_to_arrow_test_utils.h +++ b/src/v/datalake/tests/proto_to_arrow_test_utils.h @@ -66,6 +66,15 @@ message inner_message_t { )schema"); + std::string repeated_schema = (R"schema( +syntax = "proto2"; +package datalake.proto; + +message repeated_field { + repeated int32 numbers = 1; +} + )schema"); + std::string combined_schema = (R"schema( syntax = "proto2"; package datalake.proto; @@ -91,6 +100,10 @@ message nested_message { optional int32 number = 2; optional inner_message_t inner_message = 3; } + +message repeated_field { + repeated int32 numbers = 1; +} )schema"); std::string test_message_name = "simple_message"; @@ -242,6 +255,25 @@ generate_nested_message(const std::string& label, int32_t number) { }); } +inline std::string +generate_repeated_field_message(std::vector numbers) { + test_data test_data; + test_message_builder builder(test_data); + return 
builder.generate_message_generic( + "repeated_field", [&](google::protobuf::Message* message) { + auto reflection = message->GetReflection(); + // Have to use field indices here because + // message->GetReflections()->ListFields() only returns fields + // that are actually present in the message; + assert(message->GetDescriptor()->field_count() == 1); + auto field_desc = message->GetDescriptor()->field(0); + assert(field_desc->name() == "numbers"); + for (const auto num : numbers) { + reflection->AddInt32(message, field_desc, num); + } + }); +} + inline datalake::schema_info get_test_schema() { test_data data; return {