diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index c473bfd63..da1e92931 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -167,6 +167,7 @@ add_library(
   src/cuda_event.cpp
   src/integrations/cudf/partition.cpp
   src/integrations/cudf/utils.cpp
+  src/nvcomp.cpp
   src/pausable_thread_loop.cpp
   src/progress_thread.cpp
   src/rmm_resource_adaptor.cpp
diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index da5bd7be3..01ec5528a 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -61,6 +61,36 @@ install(
   EXCLUDE_FROM_ALL
 )
 
+# ----------------------------------------------------------------------------
+# nvCOMP compression + Communicator benchmark
+find_package(nvcomp CONFIG REQUIRED)
+add_executable(bench_comp_comm "bench_comp_comm.cpp")
+set_target_properties(
+  bench_comp_comm
+  PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$"
+             CXX_STANDARD 20
+             CXX_STANDARD_REQUIRED ON
+             # For std:: support of __int128_t. Can be removed once using cuda::std
+             CXX_EXTENSIONS ON
+             CUDA_STANDARD 20
+             CUDA_STANDARD_REQUIRED ON
+             LINK_FLAGS "-Wl,--allow-shlib-undefined"
+)
+target_compile_options(
+  bench_comp_comm PRIVATE "$<$<COMPILE_LANGUAGE:CXX>:${RAPIDSMPF_CXX_FLAGS}>"
+                          "$<$<COMPILE_LANGUAGE:CUDA>:${RAPIDSMPF_CUDA_FLAGS}>"
+)
+target_link_libraries(
+  bench_comp_comm PRIVATE rapidsmpf::rapidsmpf ucxx::ucxx $ nvcomp::nvcomp $
+                          maybe_asan bench_utils
+)
+install(
+  TARGETS bench_comp_comm
+  COMPONENT benchmarking
+  DESTINATION bin/benchmarks/librapidsmpf
+  EXCLUDE_FROM_ALL
+)
+
 # Find or install GoogleBench
 include(${rapids-cmake-dir}/cpm/gbench.cmake)
 rapids_cpm_gbench(BUILD_STATIC)
diff --git a/cpp/benchmarks/bench_comp_comm.cpp b/cpp/benchmarks/bench_comp_comm.cpp
new file mode 100644
index 000000000..7fd1af243
--- /dev/null
+++ b/cpp/benchmarks/bench_comp_comm.cpp
@@ -0,0 +1,968 @@
+/**
+ * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef RAPIDSMPF_HAVE_CUPTI +#include +#endif + +#include "utils/misc.hpp" +#include "utils/rmm_stack.hpp" + +using namespace rapidsmpf; + +namespace { + +enum class PackMode { + Table, + Column +}; + +struct Args { + std::string comm_type{"mpi"}; + std::uint64_t num_runs{1}; + std::uint64_t num_warmups{0}; + std::string rmm_mr{"pool"}; + std::string file_pattern; // required + PackMode pack_mode{PackMode::Table}; + Algo algo{Algo::Cascaded}; + KvParams params{}; + std::uint64_t num_ops{1}; + bool enable_cupti_monitoring{false}; + std::string cupti_csv_prefix; + bool data_only{false}; +}; + +struct ArgumentParser { + ArgumentParser(int argc, char* const* argv, bool use_mpi) { + int rank = 0; + if (use_mpi) { + RAPIDSMPF_EXPECTS(mpi::is_initialized() == true, "MPI is not initialized"); + RAPIDSMPF_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); + } + try { + int opt; + // C: comm, r: runs, w: warmups, m: rmm, F: files, P: pack mode, A: algo, K: + // kv, p: ops, M: cupti, D: data-only, h: help + while ((opt = getopt(argc, argv, "C:r:w:m:F:P:A:K:p:M:Dh")) != -1) { + switch (opt) { + case 'C': + args_.comm_type = std::string{optarg}; + break; + case 'r': + parse_integer(args_.num_runs, optarg); + break; + case 'w': + parse_integer(args_.num_warmups, optarg); + break; + case 'm': + args_.rmm_mr = std::string{optarg}; + break; + case 'F': + args_.file_pattern = std::string{optarg}; + break; + case 'P': + { + std::string v{optarg}; + if (v == "table") + args_.pack_mode = PackMode::Table; + else if (v == "column") + args_.pack_mode = PackMode::Column; + else + RAPIDSMPF_FAIL( + "-P must be one of {table, column}", std::invalid_argument + ); + break; + } + case 'A': + { + std::string v{optarg}; + if (v == "cascaded") + args_.algo = Algo::Cascaded; + else if (v == "lz4") + args_.algo = Algo::LZ4; + else if (v == "zstd") + args_.algo = Algo::Zstd; + else if (v == "snappy") + args_.algo = Algo::Snappy; + else + RAPIDSMPF_FAIL( + "-A must be one of {cascaded, lz4, zstd, snappy}", + std::invalid_argument + ); + break; + } + case 'K': + args_.params = parse_kv_params(std::string{optarg}); + break; + case 'p': + parse_integer(args_.num_ops, optarg); + break; + case 'M': + args_.enable_cupti_monitoring = true; + args_.cupti_csv_prefix = std::string{optarg}; + break; + case 'D': + args_.data_only = true; + break; + case 'h': + default: + { + std::stringstream ss; + ss << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " -C {mpi, ucxx} (default: mpi)\n" + << " -r Number of runs (default: 1)\n" + << " -w Number of warmup runs (default: 0)\n" + << " -m RMM MR {cuda, pool, async, managed} " + "(default: pool)\n" + << " -F Parquet file glob/pattern (required)\n" + << " -P Packing mode {table, column} (default: " + "table)\n" + << " -A {cascaded, lz4, zstd, snappy} (default: " + "cascaded)\n" + << " -K Algo params, e.g. 
" + "chunk_size=1MiB,delta=1,rle=1,bitpack=1\n" + << " -p Number of concurrent ops (default: 1)\n" + << " -M CUPTI CSV path prefix (enable CUPTI)\n" + << " -D Data-only mode (compress/transfer data " + "buffers only)\n" + << " -h Show this help\n"; + if (rank == 0) + std::cerr << ss.str(); + if (use_mpi) + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, 0)); + else + std::exit(0); + } + } + } + } catch (std::exception const& e) { + std::cerr << "Error parsing arguments: " << e.what() << std::endl; + if (use_mpi) + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + else + std::exit(-1); + } + if (args_.file_pattern.empty()) { + std::cerr << "-F is required" << std::endl; + if (use_mpi) + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + else + std::exit(-1); + } + if (args_.rmm_mr == "cuda") { + if (rank == 0) { + std::cout << "WARNING: using the default cuda memory resource (-m cuda) " + "might leak memory!" + << std::endl; + } + } + } + + Args const& get() const { + return args_; + } + + static std::size_t parse_nbytes(std::string const& s) { + // Simple parser: supports suffixes KiB, MiB, GiB, KB, MB, GB, or no suffix. + auto to_lower = [](char c) { return static_cast(std::tolower(c)); }; + std::string v; + v.reserve(s.size()); + for (char c : s) + v.push_back(to_lower(c)); + + std::size_t mult = 1; + if (v.ends_with("kib")) { + mult = 1ull << 10; + v = v.substr(0, v.size() - 3); + } else if (v.ends_with("mib")) { + mult = 1ull << 20; + v = v.substr(0, v.size() - 3); + } else if (v.ends_with("gib")) { + mult = 1ull << 30; + v = v.substr(0, v.size() - 3); + } else if (v.ends_with("kb")) { + mult = 1000ull; + v = v.substr(0, v.size() - 2); + } else if (v.ends_with("mb")) { + mult = 1000ull * 1000ull; + v = v.substr(0, v.size() - 2); + } else if (v.ends_with("gb")) { + mult = 1000ull * 1000ull * 1000ull; + v = v.substr(0, v.size() - 2); + } + + return static_cast(std::stoll(v)) * mult; + } + + static KvParams parse_kv_params(std::string const& kv) { + KvParams p{}; + if (kv.empty()) + return p; + std::size_t start = 0; + while (start < kv.size()) { + auto comma = kv.find(',', start); + auto part = kv.substr( + start, comma == std::string::npos ? std::string::npos : comma - start + ); + auto eq = part.find('='); + if (eq != std::string::npos) { + std::string key = part.substr(0, eq); + std::string val = part.substr(eq + 1); + if (key == "chunk_size") + p.chunk_size = parse_nbytes(val); + else if (key == "delta") + p.cascaded_delta = std::stoi(val); + else if (key == "rle") + p.cascaded_rle = std::stoi(val); + else if (key == "bitpack") + p.cascaded_bitpack = std::stoi(val); + } + if (comma == std::string::npos) + break; + start = comma + 1; + } + return p; + } + + private: + Args args_{}; +}; + +struct PackedItem { + // Ownership: we store size and buffer pointer for the packed payload + std::unique_ptr packed; // original packed cudf table/column + // Data-only mode: directly owned GPU buffer containing column/table data + std::unique_ptr raw_data; +}; + +struct BuffersToSend { + // For each op, we will send these items + std::vector items; + // Uncompressed bytes that represent the actual payload we transmit (device data only) + std::size_t total_uncompressed_bytes{0}; + // Convenience: device payload bytes (same as total_uncompressed_bytes here) + std::size_t total_payload_bytes{0}; +}; + +static inline Buffer& item_data_buffer(PackedItem& it) { + return it.packed ? *it.packed->data : *it.raw_data; +} + +static inline Buffer const& item_data_buffer(PackedItem const& it) { + return it.packed ? 
*it.packed->data : *it.raw_data; +} + +struct Timings { + double pack_s{0.0}; + double compress_s{0.0}; + double decompress_s{0.0}; + // Round-trip totals measured at initiator + double rt_nocomp_s{0.0}; + double rt_comp_s{0.0}; +}; + +struct Counters { + std::size_t logical_uncompressed_bytes{0}; + std::size_t logical_compressed_bytes{0}; +}; + +struct RunResult { + Timings times; + Counters counts; +}; + +std::vector expand_glob(std::string const& pattern) { + std::vector files; + glob_t glob_result{}; + int rc = glob(pattern.c_str(), GLOB_TILDE, nullptr, &glob_result); + if (rc == 0) { + for (std::size_t i = 0; i < glob_result.gl_pathc; ++i) { + files.emplace_back(glob_result.gl_pathv[i]); + } + } + globfree(&glob_result); + return files; +} + +static inline void ensure_ready(Buffer& buf) { + if (!buf.is_latest_write_done()) { + buf.stream().synchronize(); + } +} + +static std::unique_ptr pack_table_to_packed( + cudf::table_view tv, rmm::cuda_stream_view stream, BufferResource* br +) { + auto packed = cudf::pack(tv, stream, br->device_mr()); + auto ret = std::make_unique( + std::move(packed.metadata), br->move(std::move(packed.gpu_data), stream) + ); + return ret; +} + +BuffersToSend make_packed_items( + cudf::table const& table, + PackMode mode, + rmm::cuda_stream_view stream, + BufferResource* br +) { + BuffersToSend ret{}; + if (mode == PackMode::Table) { + auto item = PackedItem{}; + item.packed = pack_table_to_packed(table.view(), stream, br); + ensure_ready(*item.packed->data); + ret.total_uncompressed_bytes += item.packed->data->size; + ret.total_payload_bytes += item.packed->data->size; + ret.items.emplace_back(std::move(item)); + } else { + auto tv = table.view(); + for (cudf::size_type i = 0; i < tv.num_columns(); ++i) { + cudf::table_view col_tv{std::vector{tv.column(i)}}; + auto item = PackedItem{}; + item.packed = pack_table_to_packed(col_tv, stream, br); + ret.total_uncompressed_bytes += item.packed->data->size; + ret.total_payload_bytes += item.packed->data->size; + ret.items.emplace_back(std::move(item)); + } + for (std::size_t i = 0; i < ret.items.size(); ++i) { + ensure_ready(*ret.items[i].packed->data); + } + } + return ret; +} + +// Collect leaf data buffers recursively (device memory ranges) from a column. +static void collect_leaf_data_buffers( + cudf::column_view const& col, + std::vector>& buffers +) { + for (auto it = col.child_begin(); it != col.child_end(); ++it) { + collect_leaf_data_buffers(*it, buffers); + } + if (col.num_children() == 0 && col.size() > 0) { + auto const elem_size = static_cast(cudf::size_of(col.type())); + if (elem_size == 0) { + return; + } + auto const* base = col.head(); + if (base == nullptr) { + return; + } + auto const byte_offset = static_cast(col.offset()) * elem_size; + auto const num_bytes = static_cast(col.size()) * elem_size; + buffers.emplace_back(static_cast(base + byte_offset), num_bytes); + } +} + +static std::vector> collect_table_data_buffers( + cudf::table_view const& tv +) { + std::vector> out; + for (auto const& col : tv) { + collect_leaf_data_buffers(col, out); + } + return out; +} + +// Data-only path: build items consisting solely of copies of each leaf device buffer. 
+BuffersToSend make_data_only_items( + cudf::table const& table, rmm::cuda_stream_view stream, BufferResource* br +) { + BuffersToSend ret{}; + auto leaves = collect_table_data_buffers(table.view()); + ret.items.reserve(leaves.size()); + for (auto const& [src_ptr, nbytes] : leaves) { + if (nbytes == 0) { + continue; + } + auto reservation = br->reserve_or_fail(nbytes, MemoryType::DEVICE); + auto buf = br->allocate(nbytes, stream, reservation); + buf->write_access([&](std::byte* dst, rmm::cuda_stream_view s) { + RAPIDSMPF_CUDA_TRY_ALLOC( + cudaMemcpyAsync(dst, src_ptr, nbytes, cudaMemcpyDefault, s) + ); + }); + ensure_ready(*buf); + PackedItem item{}; + item.raw_data = std::move(buf); + ret.total_uncompressed_bytes += nbytes; + ret.total_payload_bytes += nbytes; + ret.items.emplace_back(std::move(item)); + } + return ret; +} + +static inline std::unique_ptr alloc_device( + BufferResource* br, rmm::cuda_stream_view stream, std::size_t size +) { + auto res = br->reserve_or_fail(size, MemoryType::DEVICE); + return br->allocate(size, stream, res); +} + +static inline std::unique_ptr alloc_and_copy_device( + BufferResource* br, rmm::cuda_stream_view stream, Buffer const& src +) { + auto out = alloc_device(br, stream, src.size); + buffer_copy(*out, src, src.size); + return out; +} + +static inline std::uint64_t exchange_u64_header( + std::shared_ptr const& comm, + BufferResource* br, + rmm::cuda_stream_view stream, + Rank peer, + Tag send_tag, + Tag recv_tag, + std::uint64_t send_value +) { + // Post header send + auto send_hdr_res = br->reserve_or_fail(sizeof(std::uint64_t), MemoryType::HOST); + auto send_hdr = br->allocate(sizeof(std::uint64_t), stream, send_hdr_res); + send_hdr->write_access([&](std::byte* p, rmm::cuda_stream_view) { + std::memcpy(p, &send_value, sizeof(std::uint64_t)); + }); + ensure_ready(*send_hdr); + auto send_hdr_fut = comm->send(std::move(send_hdr), peer, send_tag); + // Post header recv + auto recv_hdr_res = br->reserve_or_fail(sizeof(std::uint64_t), MemoryType::HOST); + auto recv_hdr = br->allocate(sizeof(std::uint64_t), stream, recv_hdr_res); + ensure_ready(*recv_hdr); + auto recv_hdr_fut = comm->recv(peer, recv_tag, std::move(recv_hdr)); + // Wait recv, read value, then ensure send completion + auto recv_hdr_buf = comm->wait(std::move(recv_hdr_fut)); + std::uint64_t recv_value = 0; + { + auto* p = recv_hdr_buf->exclusive_data_access(); + std::memcpy(&recv_value, p, sizeof(std::uint64_t)); + recv_hdr_buf->unlock(); + } + std::ignore = comm->wait(std::move(send_hdr_fut)); + return recv_value; +} + +static inline void exchange_payload( + std::shared_ptr const& comm, + BufferResource* br, + rmm::cuda_stream_view stream, + Rank peer, + Tag send_tag, + Tag recv_tag, + std::unique_ptr send_buf, + std::size_t recv_size +) { + std::unique_ptr data_send_fut; + std::unique_ptr data_recv_fut; + if (recv_size > 0) { + auto recv_buf = alloc_device(br, stream, recv_size); + ensure_ready(*recv_buf); + data_recv_fut = comm->recv(peer, recv_tag, std::move(recv_buf)); + } + if (send_buf && send_buf->size > 0) { + ensure_ready(*send_buf); + data_send_fut = comm->send(std::move(send_buf), peer, send_tag); + } + if (data_recv_fut) { + std::ignore = comm->wait(std::move(data_recv_fut)); + } + if (data_send_fut) { + std::ignore = comm->wait(std::move(data_send_fut)); + } +} + +RunResult run_once( + std::shared_ptr const& comm, + Args const& args, + rmm::cuda_stream_view stream, + BufferResource* br, + cudf::table const& table, + PackMode pack_mode, + NvcompCodec& codec, + std::uint64_t 
run_index +) { + auto const nranks = comm->nranks(); + auto const rank = comm->rank(); + auto const dst = static_cast((rank + 1) % nranks); + auto const src = static_cast((rank - 1 + nranks) % nranks); + + Tag tag_ping_nc{10, 0}; + Tag tag_pong_nc{10, 1}; + Tag tag_ping_c{11, 0}; + Tag tag_pong_c{11, 1}; + + // Pack data per iteration + auto p0 = Clock::now(); + auto packed = args.data_only ? make_data_only_items(table, stream, br) + : make_packed_items(table, pack_mode, stream, br); + auto p1 = Clock::now(); + + // Clone packed items into raw device buffers for repeated ops + std::vector> nocomp_payloads; + nocomp_payloads.reserve(packed.items.size()); + for (auto const& it : packed.items) { + // Copy metadata + data into a contiguous device buffer for pure send path? + // For pure send/recv, we only send the device payload; metadata isn't needed for + // metrics. We'll send the packed->data buffer. + auto const& src_buf = item_data_buffer(it); + auto reservation = br->reserve_or_fail(src_buf.size, MemoryType::DEVICE); + auto buf = br->allocate(src_buf.size, stream, reservation); + buffer_copy(*buf, src_buf, src_buf.size); + nocomp_payloads.emplace_back(std::move(buf)); + } + + // Pre-allocate compression outputs for each item + std::vector> comp_outputs; + std::vector comp_output_sizes(packed.items.size()); + comp_outputs.reserve(packed.items.size()); + for (std::size_t i = 0; i < packed.items.size(); ++i) { + auto const in_bytes = item_data_buffer(packed.items[i]).size; + std::size_t const max_out = + (in_bytes == 0) ? 1 : codec.get_max_compressed_bytes(in_bytes, stream); + auto reservation = br->reserve_or_fail(max_out, MemoryType::DEVICE); + comp_outputs.emplace_back(br->allocate(max_out, stream, reservation)); + } + + RAPIDSMPF_CUDA_TRY(cudaDeviceSynchronize()); + auto t0 = Clock::now(); + // Compress all items (single batch) on stream + std::vector comp_streams(packed.items.size()); + for (std::size_t i = 0; i < packed.items.size(); ++i) { + auto const in_bytes = item_data_buffer(packed.items[i]).size; + if (in_bytes == 0) { + comp_output_sizes[i] = 0; + continue; + } + // Launch compression on the output buffer's stream and record an event after + comp_outputs[i]->write_access( + [&codec, &packed, i, in_bytes, &comp_output_sizes, &comp_streams, br]( + std::byte* out_ptr, rmm::cuda_stream_view out_stream + ) { + // Lock input for raw pointer access + auto* in_raw = item_data_buffer(packed.items[i]).exclusive_data_access(); + codec.compress( + static_cast(in_raw), + in_bytes, + static_cast(out_ptr), + &comp_output_sizes[i], + out_stream, + br + ); + comp_streams[i] = out_stream; + } + ); + } + // Synchronize streams and unlock inputs + for (std::size_t i = 0; i < packed.items.size(); ++i) { + auto const in_bytes = item_data_buffer(packed.items[i]).size; + if (in_bytes == 0) { + continue; + } + RAPIDSMPF_CUDA_TRY(cudaStreamSynchronize(comp_streams[i].value())); + item_data_buffer(packed.items[i]).unlock(); + } + auto t1 = Clock::now(); + + // Phase A (RTT no compression): ping-pong per op (sequential per item) + Duration rt_nc_total{0}; + for (std::uint64_t op = 0; op < args.num_ops; ++op) { + bool initiator = + ((static_cast(rank) + op + run_index) % 2ull) == 0ull; + auto rt_start = Clock::now(); + Rank peer = initiator ? dst : src; + Tag send_tag_nc = initiator ? tag_ping_nc : tag_pong_nc; + Tag recv_tag_nc = initiator ? 
tag_pong_nc : tag_ping_nc; + for (std::size_t i = 0; i < nocomp_payloads.size(); ++i) { + // Exchange payload sizes + std::uint64_t local_sz = static_cast(nocomp_payloads[i]->size); + std::uint64_t remote_sz = exchange_u64_header( + comm, br, stream, peer, send_tag_nc, recv_tag_nc, local_sz + ); + std::unique_ptr send_buf; + if (local_sz > 0) { + send_buf = alloc_and_copy_device(br, stream, *nocomp_payloads[i]); + } + exchange_payload( + comm, + br, + stream, + peer, + send_tag_nc, + recv_tag_nc, + std::move(send_buf), + static_cast(remote_sz) + ); + } + auto rt_end = Clock::now(); + // Each rank measures its own RTT locally + rt_nc_total += (rt_end - rt_start); + } + + // Phase B (RTT compressed payload only): ping-pong with size headers per item + Duration rt_c_total{0}; + for (std::uint64_t op = 0; op < args.num_ops; ++op) { + bool initiator = + ((static_cast(rank) + op + run_index) % 2ull) == 0ull; + auto rt_start = Clock::now(); + Rank peer = initiator ? dst : src; + Tag send_tag_c = initiator ? tag_ping_c : tag_pong_c; + Tag recv_tag_c = initiator ? tag_pong_c : tag_ping_c; + for (std::size_t i = 0; i < packed.items.size(); ++i) { + // Header exchange: send our size, receive peer size + std::uint64_t local_sz = static_cast(comp_output_sizes[i]); + std::uint64_t remote_sz = exchange_u64_header( + comm, br, stream, peer, send_tag_c, recv_tag_c, local_sz + ); + // Prepare send buffer if needed + std::unique_ptr send_buf; + if (local_sz > 0) { + send_buf = alloc_device(br, stream, local_sz); + if (comp_output_sizes[i] > 0) + buffer_copy(*send_buf, *comp_outputs[i], comp_output_sizes[i]); + } + // Payload exchange using the same tags + exchange_payload( + comm, + br, + stream, + peer, + send_tag_c, + recv_tag_c, + std::move(send_buf), + static_cast(remote_sz) + ); + } + auto rt_end = Clock::now(); + rt_c_total += (rt_end - rt_start); + } + + // Decompress received buffers (simulate by decompressing our own produced outputs) + auto c0 = Clock::now(); + std::vector decomp_streams(packed.items.size()); + for (std::size_t i = 0; i < packed.items.size(); ++i) { + auto const out_bytes = item_data_buffer(packed.items[i]).size; + if (out_bytes == 0) { + continue; + } + auto res = br->reserve_or_fail(out_bytes, MemoryType::DEVICE); + auto out = br->allocate(out_bytes, stream, res); + out->write_access( + [&codec, &comp_outputs, &comp_output_sizes, &decomp_streams, i, out_bytes]( + std::byte* out_ptr, rmm::cuda_stream_view out_stream + ) { + auto* in_raw = comp_outputs[i]->exclusive_data_access(); + codec.decompress( + static_cast(in_raw), + comp_output_sizes[i], + static_cast(out_ptr), + out_bytes, + out_stream + ); + decomp_streams[i] = out_stream; + } + ); + } + // Synchronize each decomp stream and then unlock the corresponding input + for (std::size_t i = 0; i < packed.items.size(); ++i) { + auto const out_bytes = item_data_buffer(packed.items[i]).size; + if (out_bytes == 0) { + continue; + } + RAPIDSMPF_CUDA_TRY(cudaStreamSynchronize(decomp_streams[i].value())); + comp_outputs[i]->unlock(); + } + auto c1 = Clock::now(); + + RunResult result{}; + result.times.pack_s = std::chrono::duration(p1 - p0).count(); + result.times.compress_s = std::chrono::duration(t1 - t0).count(); + result.times.rt_nocomp_s = rt_nc_total.count(); + result.times.rt_comp_s = rt_c_total.count(); + result.times.decompress_s = std::chrono::duration(c1 - c0).count(); + + // Use payload bytes as the logical uncompressed size for throughput + result.counts.logical_uncompressed_bytes = packed.total_payload_bytes * 
args.num_ops; + result.counts.logical_compressed_bytes = + std::accumulate( + comp_output_sizes.begin(), comp_output_sizes.end(), std::size_t{0} + ) + * args.num_ops; + return result; +} + +} // namespace + +int main(int argc, char** argv) { + // Check if we should use bootstrap mode with rrun + // This is determined by checking for RAPIDSMPF_RANK environment variable + bool use_bootstrap = std::getenv("RAPIDSMPF_RANK") != nullptr; + + int provided = 0; + if (!use_bootstrap) { + // Explicitly initialize MPI with thread support, as this is needed for both mpi + // and ucxx communicators. + RAPIDSMPF_MPI(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)); + RAPIDSMPF_EXPECTS( + provided == MPI_THREAD_MULTIPLE, + "didn't get the requested thread level support: MPI_THREAD_MULTIPLE" + ); + } + + ArgumentParser parser{argc, argv, !use_bootstrap}; + Args const& args = parser.get(); + + // Initialize configuration options from environment variables. + rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()}; + + std::shared_ptr comm; + if (args.comm_type == "mpi") { + if (use_bootstrap) { + std::cerr << "Error: MPI communicator requires MPI initialization. Don't use " + "with rrun or unset RAPIDSMPF_RANK." + << std::endl; + return 1; + } + mpi::init(&argc, &argv); + comm = std::make_shared(MPI_COMM_WORLD, options); + } else if (args.comm_type == "ucxx") { + if (use_bootstrap) { + // Launched with rrun - use bootstrap backend + comm = rapidsmpf::bootstrap::create_ucxx_comm( + rapidsmpf::bootstrap::Backend::AUTO, options + ); + } else { + // Launched with mpirun - use MPI bootstrap + comm = rapidsmpf::ucxx::init_using_mpi(MPI_COMM_WORLD, options); + } + } else { + std::cerr << "Unknown communicator: " << args.comm_type << std::endl; + return 1; + } + + auto& log = comm->logger(); + rmm::cuda_stream_view stream = cudf::get_default_stream(); + + auto const mr_stack = set_current_rmm_stack(args.rmm_mr); + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref(); + BufferResource br{mr}; + + // Print benchmark/hardware info. 
+ { + std::stringstream ss; + auto const cur_dev = rmm::get_current_cuda_device().value(); + std::string pci_bus_id(16, '\0'); + RAPIDSMPF_CUDA_TRY( + cudaDeviceGetPCIBusId(pci_bus_id.data(), pci_bus_id.size(), cur_dev) + ); + cudaDeviceProp properties; + RAPIDSMPF_CUDA_TRY(cudaGetDeviceProperties(&properties, 0)); + ss << "Hardware setup: \n"; + ss << " GPU (" << properties.name << "): \n"; + ss << " Device number: " << cur_dev << "\n"; + ss << " PCI Bus ID: " << pci_bus_id.substr(0, pci_bus_id.find('\0')) << "\n"; + ss << " Total Memory: " << format_nbytes(properties.totalGlobalMem, 0) << "\n"; + ss << " Comm: " << *comm << "\n"; + log.print(ss.str()); + } + +#ifdef RAPIDSMPF_HAVE_CUPTI + // Create CUPTI monitor if enabled + std::unique_ptr cupti_monitor; + if (args.enable_cupti_monitoring) { + cupti_monitor = std::make_unique(); + cupti_monitor->start_monitoring(); + log.print("CUPTI memory monitoring enabled"); + } +#endif + + // File selection per rank + auto files = expand_glob(args.file_pattern); + if (files.empty()) { + if (comm->rank() == 0) + log.print("No files matched pattern: " + args.file_pattern); + if (!use_bootstrap) + RAPIDSMPF_MPI(MPI_Finalize()); + return 1; + } + auto my_file = files[static_cast(comm->rank()) % files.size()]; + if (comm->rank() == 0) + log.print( + "Using file pattern: " + args.file_pattern + ", first file: " + files.front() + ); + log.print("Rank " + std::to_string(comm->rank()) + " reading: " + my_file); + + cudf::io::parquet_reader_options reader_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{my_file}); + auto table_with_md = cudf::io::read_parquet(reader_opts); + auto& table = table_with_md.tbl; + + auto codec = make_codec(args.algo, args.params); + + std::vector pack_t, compress_t, decompress_t, rt_nc_t, rt_c_t; + pack_t.reserve(args.num_runs); + compress_t.reserve(args.num_runs); + decompress_t.reserve(args.num_runs); + rt_nc_t.reserve(args.num_runs); + rt_c_t.reserve(args.num_runs); + + std::size_t logical_bytes = 0; + std::size_t logical_compressed_bytes_last = 0; + + for (std::uint64_t i = 0; i < args.num_warmups + args.num_runs; ++i) { + auto rr = run_once(comm, args, stream, &br, *table, args.pack_mode, *codec, i); + + logical_bytes = rr.counts.logical_uncompressed_bytes; + double pBps = + static_cast(rr.counts.logical_uncompressed_bytes) / rr.times.pack_s; + double cBps = static_cast(rr.counts.logical_uncompressed_bytes) + / rr.times.compress_s; + double dBps = static_cast(rr.counts.logical_uncompressed_bytes) + / rr.times.decompress_s; + logical_compressed_bytes_last = rr.counts.logical_compressed_bytes; + double ratio = (rr.counts.logical_compressed_bytes > 0) + ? static_cast(rr.counts.logical_uncompressed_bytes) + / static_cast(rr.counts.logical_compressed_bytes) + : 0.0; + // Round-trip one-way throughput: 2 * bytes_one_way / RTT + double rt_nc_Bps = + rr.times.rt_nocomp_s > 0.0 + ? (2.0 * static_cast(rr.counts.logical_uncompressed_bytes)) + / rr.times.rt_nocomp_s + : 0.0; + double const rt_comp_total_s = + rr.times.compress_s + rr.times.rt_comp_s + rr.times.decompress_s; + double rt_c_Bps = + rt_comp_total_s > 0.0 + ? 
(2.0 * static_cast(rr.counts.logical_uncompressed_bytes)) + / rt_comp_total_s + : 0.0; + + std::stringstream ss; + ss << "pack: " << format_nbytes(pBps) << "/s | compress: " << format_nbytes(cBps) + << "/s | decompress: " << format_nbytes(dBps) + << "/s | rt(nocomp): " << format_nbytes(rt_nc_Bps) + << "/s | rt(comp): " << format_nbytes(rt_c_Bps) << "/s" + << " | comp ratio: " << std::fixed << std::setprecision(2) << ratio << "x"; + if (i < args.num_warmups) + ss << " (warmup run)"; + log.print(ss.str()); + + if (i >= args.num_warmups) { + pack_t.push_back( + static_cast(rr.counts.logical_uncompressed_bytes) / pBps + ); + compress_t.push_back( + static_cast(rr.counts.logical_uncompressed_bytes) / cBps + ); + decompress_t.push_back( + static_cast(rr.counts.logical_uncompressed_bytes) / dBps + ); + rt_nc_t.push_back(rr.times.rt_nocomp_s); + rt_c_t.push_back(rt_comp_total_s); + } + } + + auto harmonic_mean = [](std::vector const& v) { + double denom_sum = 0.0; + for (auto x : v) + denom_sum += 1.0 / x; + return static_cast(v.size()) / denom_sum; + }; + + if (!compress_t.empty()) { + double mean_elapsed_p = harmonic_mean(pack_t); + double mean_elapsed_c = harmonic_mean(compress_t); + double mean_elapsed_d = harmonic_mean(decompress_t); + double mean_rt_nc = harmonic_mean(rt_nc_t); + double mean_rt_c = harmonic_mean(rt_c_t); + + std::stringstream ss; + ss << "means: pack: " << format_nbytes(logical_bytes / mean_elapsed_p) << "/s" + << " | compress: " << format_nbytes(logical_bytes / mean_elapsed_c) << "/s" + << " | decompress: " << format_nbytes(logical_bytes / mean_elapsed_d) << "/s" + << " | rt(nocomp): " + << format_nbytes((2.0 * static_cast(logical_bytes)) / mean_rt_nc) + << "/s | rt(comp): " + << format_nbytes((2.0 * static_cast(logical_bytes)) / mean_rt_c) + << "/s"; + if (logical_compressed_bytes_last > 0) { + double mean_ratio = static_cast(logical_bytes) + / static_cast(logical_compressed_bytes_last); + ss << " | comp ratio: " << std::fixed << std::setprecision(2) << mean_ratio + << "x"; + } else { + ss << " | comp ratio: n/a"; + } + log.print(ss.str()); + } + +#ifdef RAPIDSMPF_HAVE_CUPTI + if (args.enable_cupti_monitoring && cupti_monitor) { + cupti_monitor->stop_monitoring(); + std::string csv_filename = + args.cupti_csv_prefix + std::to_string(comm->rank()) + ".csv"; + try { + cupti_monitor->write_csv(csv_filename); + log.print( + "CUPTI memory data written to " + csv_filename + " (" + + std::to_string(cupti_monitor->get_sample_count()) + " samples, " + + std::to_string(cupti_monitor->get_total_callback_count()) + + " callbacks)" + ); + if (comm->rank() == 0) { + log.print( + "CUPTI Callback Summary:\n" + cupti_monitor->get_callback_summary() + ); + } + } catch (std::exception const& e) { + log.print("Failed to write CUPTI CSV file: " + std::string(e.what())); + } + } +#endif + + if (!use_bootstrap) { + RAPIDSMPF_MPI(MPI_Finalize()); + } + return 0; +} diff --git a/cpp/include/rapidsmpf/nvcomp.h b/cpp/include/rapidsmpf/nvcomp.h new file mode 100644 index 000000000..5b4038348 --- /dev/null +++ b/cpp/include/rapidsmpf/nvcomp.h @@ -0,0 +1,116 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +#include + +namespace rapidsmpf { + +class BufferResource; // forward declaration + +enum class Algo { + Cascaded, + LZ4, + Zstd, + Snappy +}; + +/** + * @brief Parameters for nvCOMP codec configuration + * + * Holds configuration parameters for both generic and algorithm-specific compression + * settings. + */ +struct KvParams { + /// Chunk size for compression operations (default: 1 MiB) + std::size_t chunk_size{1 << 20}; + + /// Number of run-length encoding passes in Cascaded codec (must be non-negative, + /// default: 1) + int cascaded_rle{1}; + + /// Number of delta encoding passes in Cascaded codec (must be non-negative, default: + /// 1) + int cascaded_delta{1}; + + /// Enable bitpacking in Cascaded codec (default: enabled) + bool cascaded_bitpack{true}; +}; + +/** + * @brief Abstract base class for nvCOMP codec implementations + * + * Provides a unified interface for different compression algorithms (LZ4, Cascaded, etc.) + * to perform compression and decompression operations on GPU device memory. + */ +class NvcompCodec { + public: + virtual ~NvcompCodec() = default; + + /** + * @brief Calculate the maximum compressed size for the given input size + * + * @param uncompressed_bytes Size of the uncompressed data in bytes + * @param stream CUDA stream for operations + * @return Maximum possible compressed size in bytes + */ + virtual std::size_t get_max_compressed_bytes( + std::size_t uncompressed_bytes, rmm::cuda_stream_view stream + ) = 0; + + /** + * @brief Compress data on the GPU + * + * @param d_in Pointer to uncompressed data on device + * @param in_bytes Size of uncompressed data in bytes + * @param d_out Pointer to output buffer on device for compressed data + * @param out_bytes Pointer to output variable that will be set to actual compressed + * size + * @param stream CUDA stream for operations + * @param br Optional buffer resource used for temporary allocations (e.g., to capture + * compressed size on device and copy back to host). + */ + virtual void compress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br = nullptr + ) = 0; + + /** + * @brief Decompress data on the GPU + * + * @param d_in Pointer to compressed data on device + * @param in_bytes Size of compressed data in bytes + * @param d_out Pointer to output buffer on device for decompressed data + * @param out_bytes Expected size of decompressed data in bytes + * @param stream CUDA stream for operations + */ + virtual void decompress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t out_bytes, + rmm::cuda_stream_view stream + ) = 0; +}; + +/** + * @brief Create an nvCOMP codec instance + * + * @param algo The compression algorithm to use + * @param p Parameters for the codec + * @return A unique pointer to an NvcompCodec instance + */ +std::unique_ptr make_codec(Algo algo, KvParams const& p); + +} // namespace rapidsmpf diff --git a/cpp/src/nvcomp.cpp b/cpp/src/nvcomp.cpp new file mode 100644 index 000000000..9abad0310 --- /dev/null +++ b/cpp/src/nvcomp.cpp @@ -0,0 +1,332 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include + +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include + +namespace rapidsmpf { + +namespace { +template +void invoke_compress_with_device_size_buffer( + InvokeCompressFn&& invoke_compress, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br +) { + if (br != nullptr) { + auto reservation = br->reserve_or_fail(sizeof(size_t), MemoryType::DEVICE); + auto size_buf = br->allocate(sizeof(size_t), stream, reservation); + size_buf->write_access([&](std::byte* sz_ptr, rmm::cuda_stream_view s) { + invoke_compress(reinterpret_cast(sz_ptr)); + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + out_bytes, sz_ptr, sizeof(size_t), cudaMemcpyDeviceToHost, s + )); + }); + RAPIDSMPF_CUDA_TRY(cudaStreamSynchronize(stream.value())); + } else { + size_t* d_size = nullptr; + RAPIDSMPF_CUDA_TRY(cudaMallocAsync( + reinterpret_cast(&d_size), sizeof(size_t), stream.value() + )); + invoke_compress(d_size); + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + out_bytes, d_size, sizeof(size_t), cudaMemcpyDeviceToHost, stream.value() + )); + RAPIDSMPF_CUDA_TRY(cudaStreamSynchronize(stream.value())); + RAPIDSMPF_CUDA_TRY(cudaFreeAsync(d_size, stream.value())); + } +} +} // namespace + +class LZ4Codec final : public NvcompCodec { + public: + explicit LZ4Codec(std::size_t chunk_size) : chunk_size_{chunk_size} {} + + std::size_t get_max_compressed_bytes( + std::size_t in_bytes, rmm::cuda_stream_view stream + ) override { + nvcompBatchedLZ4CompressOpts_t copts = nvcompBatchedLZ4CompressDefaultOpts; + nvcompBatchedLZ4DecompressOpts_t dopts = nvcompBatchedLZ4DecompressDefaultOpts; + nvcomp::LZ4Manager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + return cfg.max_compressed_buffer_size; + } + + void compress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br + ) override { + nvcompBatchedLZ4CompressOpts_t copts = nvcompBatchedLZ4CompressDefaultOpts; + nvcompBatchedLZ4DecompressOpts_t dopts = nvcompBatchedLZ4DecompressDefaultOpts; + nvcomp::LZ4Manager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + invoke_compress_with_device_size_buffer( + [&](size_t* sz_ptr) { + mgr.compress( + static_cast(d_in), + static_cast(d_out), + cfg, + sz_ptr + ); + }, + out_bytes, + stream, + br + ); + } + + void decompress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t out_bytes, + rmm::cuda_stream_view stream + ) override { + (void)out_bytes; + nvcompBatchedLZ4CompressOpts_t copts = nvcompBatchedLZ4CompressDefaultOpts; + nvcompBatchedLZ4DecompressOpts_t dopts = nvcompBatchedLZ4DecompressDefaultOpts; + nvcomp::LZ4Manager mgr{chunk_size_, copts, dopts, stream.value()}; + const uint8_t* in_ptrs[1] = {static_cast(d_in)}; + size_t in_sizes[1] = {in_bytes}; + auto cfgs = mgr.configure_decompression(in_ptrs, 1, in_sizes); + uint8_t* out_ptrs[1] = {static_cast(d_out)}; + mgr.decompress(out_ptrs, in_ptrs, cfgs, nullptr); + } + + private: + std::size_t chunk_size_; +}; + +class CascadedCodec final : public NvcompCodec { + public: + CascadedCodec(std::size_t chunk_size, int rle, int delta, bool bitpack) + : chunk_size_{chunk_size} { + copts_ = nvcompBatchedCascadedCompressDefaultOpts; + copts_.num_RLEs = rle; + copts_.num_deltas = delta; + copts_.use_bp = bitpack ? 
1 : 0; + dopts_ = nvcompBatchedCascadedDecompressDefaultOpts; + } + + std::size_t get_max_compressed_bytes( + std::size_t in_bytes, rmm::cuda_stream_view stream + ) override { + nvcomp::CascadedManager mgr{chunk_size_, copts_, dopts_, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + return cfg.max_compressed_buffer_size; + } + + void compress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br + ) override { + nvcomp::CascadedManager mgr{chunk_size_, copts_, dopts_, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + invoke_compress_with_device_size_buffer( + [&](size_t* sz_ptr) { + mgr.compress( + static_cast(d_in), + static_cast(d_out), + cfg, + sz_ptr + ); + }, + out_bytes, + stream, + br + ); + } + + void decompress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t out_bytes, + rmm::cuda_stream_view stream + ) override { + (void)out_bytes; + nvcomp::CascadedManager mgr{chunk_size_, copts_, dopts_, stream.value()}; + const uint8_t* in_ptrs[1] = {static_cast(d_in)}; + size_t in_sizes[1] = {in_bytes}; + auto cfgs = mgr.configure_decompression(in_ptrs, 1, in_sizes); + uint8_t* out_ptrs[1] = {static_cast(d_out)}; + mgr.decompress(out_ptrs, in_ptrs, cfgs, nullptr); + } + + private: + std::size_t chunk_size_{}; + nvcompBatchedCascadedCompressOpts_t copts_{}; + nvcompBatchedCascadedDecompressOpts_t dopts_{}; +}; + +class SnappyCodec final : public NvcompCodec { + public: + explicit SnappyCodec(std::size_t chunk_size) : chunk_size_{chunk_size} {} + + std::size_t get_max_compressed_bytes( + std::size_t in_bytes, rmm::cuda_stream_view stream + ) override { + nvcompBatchedSnappyCompressOpts_t copts = nvcompBatchedSnappyCompressDefaultOpts; + nvcompBatchedSnappyDecompressOpts_t dopts = + nvcompBatchedSnappyDecompressDefaultOpts; + nvcomp::SnappyManager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + return cfg.max_compressed_buffer_size; + } + + void compress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br + ) override { + nvcompBatchedSnappyCompressOpts_t copts = nvcompBatchedSnappyCompressDefaultOpts; + nvcompBatchedSnappyDecompressOpts_t dopts = + nvcompBatchedSnappyDecompressDefaultOpts; + nvcomp::SnappyManager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + invoke_compress_with_device_size_buffer( + [&](size_t* sz_ptr) { + mgr.compress( + static_cast(d_in), + static_cast(d_out), + cfg, + sz_ptr + ); + }, + out_bytes, + stream, + br + ); + } + + void decompress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t out_bytes, + rmm::cuda_stream_view stream + ) override { + (void)out_bytes; + nvcompBatchedSnappyCompressOpts_t copts = nvcompBatchedSnappyCompressDefaultOpts; + nvcompBatchedSnappyDecompressOpts_t dopts = + nvcompBatchedSnappyDecompressDefaultOpts; + nvcomp::SnappyManager mgr{chunk_size_, copts, dopts, stream.value()}; + const uint8_t* in_ptrs[1] = {static_cast(d_in)}; + size_t in_sizes[1] = {in_bytes}; + auto cfgs = mgr.configure_decompression(in_ptrs, 1, in_sizes); + uint8_t* out_ptrs[1] = {static_cast(d_out)}; + mgr.decompress(out_ptrs, in_ptrs, cfgs, nullptr); + } + + private: + std::size_t chunk_size_; +}; + +class ZstdCodec final : public NvcompCodec { + public: + explicit ZstdCodec(std::size_t chunk_size) : 
chunk_size_{chunk_size} {} + + std::size_t get_max_compressed_bytes( + std::size_t in_bytes, rmm::cuda_stream_view stream + ) override { + nvcompBatchedZstdCompressOpts_t copts = nvcompBatchedZstdCompressDefaultOpts; + nvcompBatchedZstdDecompressOpts_t dopts = nvcompBatchedZstdDecompressDefaultOpts; + nvcomp::ZstdManager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + return cfg.max_compressed_buffer_size; + } + + void compress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t* out_bytes, + rmm::cuda_stream_view stream, + BufferResource* br + ) override { + nvcompBatchedZstdCompressOpts_t copts = nvcompBatchedZstdCompressDefaultOpts; + nvcompBatchedZstdDecompressOpts_t dopts = nvcompBatchedZstdDecompressDefaultOpts; + nvcomp::ZstdManager mgr{chunk_size_, copts, dopts, stream.value()}; + auto cfg = mgr.configure_compression(in_bytes); + invoke_compress_with_device_size_buffer( + [&](size_t* sz_ptr) { + mgr.compress( + static_cast(d_in), + static_cast(d_out), + cfg, + sz_ptr + ); + }, + out_bytes, + stream, + br + ); + } + + void decompress( + void const* d_in, + std::size_t in_bytes, + void* d_out, + std::size_t out_bytes, + rmm::cuda_stream_view stream + ) override { + (void)out_bytes; + nvcompBatchedZstdCompressOpts_t copts = nvcompBatchedZstdCompressDefaultOpts; + nvcompBatchedZstdDecompressOpts_t dopts = nvcompBatchedZstdDecompressDefaultOpts; + nvcomp::ZstdManager mgr{chunk_size_, copts, dopts, stream.value()}; + const uint8_t* in_ptrs[1] = {static_cast(d_in)}; + size_t in_sizes[1] = {in_bytes}; + auto cfgs = mgr.configure_decompression(in_ptrs, 1, in_sizes); + uint8_t* out_ptrs[1] = {static_cast(d_out)}; + mgr.decompress(out_ptrs, in_ptrs, cfgs, nullptr); + } + + private: + std::size_t chunk_size_; +}; + +std::unique_ptr make_codec(Algo algo, KvParams const& p) { + switch (algo) { + case Algo::LZ4: + return std::make_unique(p.chunk_size); + case Algo::Zstd: + return std::make_unique(p.chunk_size); + case Algo::Snappy: + return std::make_unique(p.chunk_size); + case Algo::Cascaded: + default: + return std::make_unique( + p.chunk_size, p.cascaded_rle, p.cascaded_delta, p.cascaded_bitpack + ); + } +} + +} // namespace rapidsmpf
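
A minimal usage sketch of the NvcompCodec interface added in rapidsmpf/nvcomp.h. It assumes the header is consumed as <rapidsmpf/nvcomp.h>; the buffer sizes, the rmm::device_buffer allocations, and the helper name roundtrip_example are illustrative only and are not part of the patch:

// Compress and decompress a device buffer through the codec factory.
// Sizes and data contents are placeholders; only the call sequence matters.
#include <cstddef>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <rapidsmpf/nvcomp.h>

void roundtrip_example(rmm::cuda_stream_view stream) {
    using namespace rapidsmpf;

    // Cascaded codec with the KvParams defaults (1 MiB chunks, 1 RLE, 1 delta, bitpack on).
    KvParams params{};
    auto codec = make_codec(Algo::Cascaded, params);

    // Some uncompressed device bytes; contents do not matter for the sketch.
    std::size_t const in_bytes = std::size_t{4} << 20;
    rmm::device_buffer d_in{in_bytes, stream};

    // Worst-case output size, then compress. out_bytes receives the actual
    // compressed size; with br == nullptr the codec stages the size through
    // cudaMallocAsync and synchronizes the stream before returning.
    std::size_t const max_out = codec->get_max_compressed_bytes(in_bytes, stream);
    rmm::device_buffer d_comp{max_out, stream};
    std::size_t out_bytes = 0;
    codec->compress(d_in.data(), in_bytes, d_comp.data(), &out_bytes, stream);

    // Decompress back into a buffer of the original (known) size.
    rmm::device_buffer d_out{in_bytes, stream};
    codec->decompress(d_comp.data(), out_bytes, d_out.data(), in_bytes, stream);
    stream.synchronize();
}

Passing a BufferResource* as the last argument of compress() routes the device-side size scratch allocation through that resource instead of cudaMallocAsync; both paths synchronize the stream before the compressed size is read back, as implemented in invoke_compress_with_device_size_buffer above.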
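On launching the benchmark (summarizing the option parser above): bench_comp_comm is MPI-launched by default (-C mpi), or bootstrapped via rrun when RAPIDSMPF_RANK is set together with -C ucxx. The -F file pattern is required; each rank picks one parquet file from the glob (round-robin by rank) and ping-pongs the packed or data-only payloads with its neighbouring ranks. A typical invocation might look like the following, with the data path purely illustrative:

  mpirun -np 2 bench_comp_comm -F "/data/lineitem_*.parquet" -A zstd -K chunk_size=1MiB -P table -p 4 -w 1 -r 5

i.e. one warmup plus five timed runs, four concurrent ops per run, Zstd compression with 1 MiB chunks, and whole-table packing.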