From f028ebe01d4386b5d86bf6074bbcb1f32a86decd Mon Sep 17 00:00:00 2001
From: Felix Chern
Date: Mon, 9 Jun 2025 08:37:50 -0700
Subject: [PATCH] Add a new option to reduce the memory footprint of the
 record/chunk indices.

This trades off speed for smaller memory usage.

PiperOrigin-RevId: 769158954
---
 cpp/array_record_reader.cc      | 205 ++++++++++++++++++++++++--------
 cpp/array_record_reader.h       |  31 ++++-
 cpp/array_record_reader_test.cc |  12 +-
 3 files changed, 194 insertions(+), 54 deletions(-)
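
Usage sketch for the new option (illustrative, not part of the diff; the
Options API shown is added by this patch, while the include path and the
array_record namespace come from the existing library):

    #include "cpp/array_record_reader.h"

    using Options = array_record::ArrayRecordReaderBase::Options;

    // Programmatic form, via the setter added in array_record_reader.h:
    Options opts;
    opts.set_index_storage_option(Options::IndexStorageOption::kOffloaded);

    // Text form, via the parser key added in array_record_reader.cc:
    absl::StatusOr<Options> parsed =
        Options::FromString("index_storage_option:offloaded");
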
diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc
index cd7a39a..cf0f5c0 100644
--- a/cpp/array_record_reader.cc
+++ b/cpp/array_record_reader.cc
@@ -75,6 +75,84 @@ T CeilOfRatio(T x, T d) {
   return (x + d - 1) / d;
 }
 
+// Abstract class for different index storage options.
+class ChunkOffset {
+ public:
+  virtual ~ChunkOffset() {}
+  virtual uint64_t operator[](size_t idx) const = 0;
+  virtual uint64_t size() const = 0;
+  bool empty() const { return size() == 0; }
+};
+
+class ArrayChunkOffset : public ChunkOffset {
+ public:
+  static absl::StatusOr<std::unique_ptr<ArrayChunkOffset>> Build(
+      ChunkDecoder& footer_decoder, uint64_t num_chunks) {
+    std::vector<uint64_t> chunk_offsets;
+    chunk_offsets.reserve(num_chunks);
+    footer_decoder.SetIndex(1);
+    ArrayRecordFooter footer;
+    for (size_t i = 0; i < num_chunks; ++i) {
+      if (!footer_decoder.ReadRecord(footer)) {
+        return Annotate(footer_decoder.status(),
+                        absl::StrCat("Failed to read at footer record: ", i,
+                                     ". Total num chunks: ", num_chunks));
+      }
+      if (!footer.has_chunk_offset()) {
+        return InvalidArgumentError("Invalid footer at index: %d", i);
+      }
+      chunk_offsets.push_back(footer.chunk_offset());
+    }
+    return std::unique_ptr<ArrayChunkOffset>(
+        new ArrayChunkOffset(std::move(chunk_offsets)));
+  }
+
+  uint64_t operator[](size_t idx) const override {
+    return chunk_offsets_[idx];
+  }
+
+  uint64_t size() const override { return chunk_offsets_.size(); }
+
+ private:
+  ArrayChunkOffset(std::vector<uint64_t> chunk_offsets)
+      : chunk_offsets_(std::move(chunk_offsets)) {}
+
+  std::vector<uint64_t> chunk_offsets_;
+};
+
+class OffloadedChunkOffset : public ChunkOffset {
+ public:
+  OffloadedChunkOffset(ArrayRecordReaderBase* base_reader,
+                       uint64_t footer_offset, uint64_t num_chunks)
+      : base_reader_(base_reader),
+        footer_offset_(footer_offset),
+        num_chunks_(num_chunks) {}
+
+  uint64_t operator[](size_t idx) const override {
+    auto backing_reader = base_reader_->get_backing_reader();
+    auto reader = backing_reader->NewReader(footer_offset_);
+    auto chunk_reader = riegeli::DefaultChunkReader<>(reader.get());
+    Chunk chunk;
+    ChunkDecoder footer_decoder;
+    chunk_reader.ReadChunk(chunk);
+    footer_decoder.Decode(chunk);
+    // First item is the footer_metadata, so we want to skip it.
+    footer_decoder.SetIndex(idx + 1);
+    ArrayRecordFooter footer;
+    footer_decoder.ReadRecord(footer);
+    return footer.chunk_offset();
+  }
+
+  uint64_t size() const override { return num_chunks_; }
+
+  void update_base_reader(ArrayRecordReaderBase* base_reader) {
+    base_reader_ = base_reader;
+  }
+
+ private:
+  ArrayRecordReaderBase* base_reader_;
+  uint64_t footer_offset_;
+  uint64_t num_chunks_;
+};
+
 absl::StatusOr<ArrayRecordReaderBase::Options>
 ArrayRecordReaderBase::Options::FromString(absl::string_view text) {
   ArrayRecordReaderBase::Options options;
@@ -94,6 +172,13 @@ ArrayRecordReaderBase::Options::FromString(absl::string_view text) {
                              &options.readahead_buffer_size_),
           ValueParser::Bytes(0, std::numeric_limits<uint64_t>::max(),
                              &options.readahead_buffer_size_)));
+  // Index storage option
+  options_parser.AddOption(
+      "index_storage_option",
+      ValueParser::Enum({{"", IndexStorageOption::kInMemory},
+                         {"in_memory", IndexStorageOption::kInMemory},
+                         {"offloaded", IndexStorageOption::kOffloaded}},
+                        &options.index_storage_option_));
   if (!options_parser.FromString(text)) {
     return options_parser.status();
   }
@@ -115,7 +200,7 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
   uint64_t record_group_size = 0;
   uint64_t chunk_group_size = 0;
   uint64_t footer_offset = 0;
-  std::vector<uint64_t> chunk_offsets;
+  std::unique_ptr<ChunkOffset> chunk_offsets;
 
   // Objects for managing sequential reads
   uint64_t record_idx = 0;
@@ -128,10 +213,10 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
   std::optional<std::string> writer_options = std::nullopt;
 
   uint64_t ChunkEndOffset(uint64_t chunk_idx) const {
-    if (chunk_idx == chunk_offsets.size() - 1) {
+    if (chunk_idx == chunk_offsets->size() - 1) {
       return footer_offset;
     }
-    return chunk_offsets[chunk_idx + 1];
+    return (*chunk_offsets)[chunk_idx + 1];
   }
 };
 
@@ -144,6 +229,12 @@ ArrayRecordReaderBase::~ArrayRecordReaderBase() = default;
 ArrayRecordReaderBase::ArrayRecordReaderBase(
     ArrayRecordReaderBase&& other) noexcept
     : riegeli::Object(std::move(other)), state_(std::move(other.state_)) {
+  if (state_->options.index_storage_option() ==
+      Options::IndexStorageOption::kOffloaded) {
+    OffloadedChunkOffset* chunk_offsets_impl =
+        dynamic_cast<OffloadedChunkOffset*>(state_->chunk_offsets.get());
+    chunk_offsets_impl->update_base_reader(this);
+  }
   other.Reset(riegeli::kClosed);  // NOLINT(bugprone-use-after-move)
 }
 
@@ -153,6 +244,13 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=(
   riegeli::Object::operator=(static_cast<riegeli::Object&&>(other));
   // Move self
   state_ = std::move(other.state_);
+
+  if (state_->options.index_storage_option() ==
+      Options::IndexStorageOption::kOffloaded) {
+    OffloadedChunkOffset* chunk_offsets_impl =
+        dynamic_cast<OffloadedChunkOffset*>(state_->chunk_offsets.get());
+    chunk_offsets_impl->update_base_reader(this);
+  }
   // Close
   other.Reset(riegeli::kClosed);
   return *this;
@@ -247,10 +345,10 @@ void ArrayRecordReaderBase::Initialize() {
                       postscript.DebugString()));
       return;
     }
-    state_->footer_offset = postscript.footer_offset();
-    footer_decoder =
-        ReadChunk(*reader, postscript.footer_offset(),
-                  size - kRiegeliBlockSize - postscript.footer_offset());
+    auto footer_offset = postscript.footer_offset();
+    state_->footer_offset = footer_offset;
+    footer_decoder = ReadChunk(*reader, footer_offset,
+                               size - kRiegeliBlockSize - footer_offset);
     if (!footer_decoder.ReadRecord(footer_metadata)) {
       Fail(Annotate(footer_decoder.status(),
@@ -277,48 +375,61 @@ void ArrayRecordReaderBase::Initialize() {
   }
   {
     AR_ENDO_SCOPE("Reading footer body");
-    auto num_chunks = footer_metadata.array_record_metadata().num_chunks();
-    state_->chunk_offsets.reserve(num_chunks);
-    for (int i = 0; i < num_chunks; ++i) {
+
+    // Reads the group size from the first footer. This should match the
+    // group_size in writer options but writer_options is not always available.
+    if (footer_decoder.num_records() > 1) {
       ArrayRecordFooter footer;
+      footer_decoder.SetIndex(1);
       if (!footer_decoder.ReadRecord(footer)) {
         Fail(Annotate(footer_decoder.status(),
                       "Failed to read ArrayRecordFooter"));
         return;
       }
-      if (!footer.has_chunk_offset()) {
-        Fail(InvalidArgumentError("Invalid footer"));
-        return;
-      }
-      if (i == 0) {
-        // Detect group size from first footer. This should match the group_size
-        // in writer options but writer_options is not always available.
-        state_->record_group_size = footer.num_records();
-      }
-      state_->chunk_offsets.push_back(footer.chunk_offset());
+      state_->record_group_size = footer.num_records();
+      // Reset the index
+      footer_decoder.SetIndex(1);
+    }
+
+    auto num_chunks = footer_metadata.array_record_metadata().num_chunks();
+    switch (state_->options.index_storage_option()) {
+      case Options::IndexStorageOption::kInMemory: {
+        auto status_or_offsets =
+            ArrayChunkOffset::Build(footer_decoder, num_chunks);
+        if (!status_or_offsets.ok()) {
+          Fail(status_or_offsets.status());
+          return;
+        }
+        state_->chunk_offsets = *std::move(status_or_offsets);
+      } break;
+      case Options::IndexStorageOption::kOffloaded:
+        state_->chunk_offsets = std::make_unique<OffloadedChunkOffset>(
+            this, state_->footer_offset, num_chunks);
+        break;
     }
-    if (!state_->chunk_offsets.empty()) {
+
+    if (!state_->chunk_offsets->empty()) {
       // Finds minimal chunk_group_size that is larger equals to the readahead
       // buffer. A chunk_group corresponds to a PRead call. Smaller
       // chunk_group_size is better for random access, the converse is better
       // for sequential reads.
-      for (auto i : Seq(state_->chunk_offsets.size())) {
-        uint64_t buf_size =
-            state_->ChunkEndOffset(i) - state_->chunk_offsets.front();
+      uint64_t chunk_start_offset = (*state_->chunk_offsets)[0];
+      for (auto i : Seq(state_->chunk_offsets->size())) {
+        uint64_t buf_size = state_->ChunkEndOffset(i) - chunk_start_offset;
         if (buf_size >= state_->options.readahead_buffer_size()) {
           state_->chunk_group_size = i + 1;
           break;
         }
       }
       if (!state_->chunk_group_size) {
-        state_->chunk_group_size = state_->chunk_offsets.size();
+        state_->chunk_group_size = state_->chunk_offsets->size();
       }
     }
   }
 }
 
 uint64_t ArrayRecordReaderBase::ChunkStartOffset(uint64_t chunk_idx) const {
-  return state_->chunk_offsets[chunk_idx];
+  return (*state_->chunk_offsets)[chunk_idx];
 }
 
 uint64_t ArrayRecordReaderBase::ChunkEndOffset(uint64_t chunk_idx) const {
@@ -331,11 +442,11 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords(
   if (!ok()) {
     return status();
   }
-  if (state_->chunk_offsets.empty()) {
+  if (state_->chunk_offsets->empty()) {
     return absl::OkStatus();
   }
   uint64_t num_chunk_groups = CeilOfRatio(
-      state_->chunk_offsets.size(), state_->chunk_group_size);
+      state_->chunk_offsets->size(), state_->chunk_group_size);
   auto reader = get_backing_reader();
   auto status = ParallelForWithStatus<1>(
       Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status {
@@ -343,9 +454,9 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords(
         // inclusive index, not the conventional exclusive index.
         uint64_t last_chunk_idx =
             std::min((buf_idx + 1) * state_->chunk_group_size - 1,
-                     state_->chunk_offsets.size() - 1);
+                     state_->chunk_offsets->size() - 1);
         uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
-                           state_->chunk_offsets[chunk_idx_start];
+                           (*state_->chunk_offsets)[chunk_idx_start];
         AR_ENDO_JOB(
             "ArrayRecordReaderBase::ParallelReadRecords",
             absl::StrCat("buffer_idx: ", buf_idx, " buffer_len: ", buf_len));
@@ -354,13 +465,13 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords(
         {
           AR_ENDO_SCOPE("MaskedReader");
           masked_reader = MaskedReader(
-              reader->NewReader(state_->chunk_offsets[chunk_idx_start]),
+              reader->NewReader((*state_->chunk_offsets)[chunk_idx_start]),
               buf_len);
         }
         for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx;
              ++chunk_idx) {
           AR_ENDO_SCOPE("ChunkReader+ChunkDecoder");
-          masked_reader.Seek(state_->chunk_offsets[chunk_idx]);
+          masked_reader.Seek((*state_->chunk_offsets)[chunk_idx]);
           riegeli::DefaultChunkReader<> chunk_reader(&masked_reader);
           Chunk chunk;
           if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) {
@@ -400,7 +511,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange(
   if (!ok()) {
     return status();
   }
-  if (state_->chunk_offsets.empty()) {
+  if (state_->chunk_offsets->empty()) {
     return absl::OkStatus();
   }
   if (end > NumRecords() || begin >= end) {
@@ -424,7 +535,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange(
             chunk_idx_begin + (buf_idx + 1) * state_->chunk_group_size - 1,
             chunk_idx_end - 1);
         uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
-                           state_->chunk_offsets[chunk_idx_start];
+                           (*state_->chunk_offsets)[chunk_idx_start];
         AR_ENDO_JOB(
             "ArrayRecordReaderBase::ParallelReadRecordsWithRange",
             absl::StrCat("buffer_idx: ", buf_idx, " buffer_len: ", buf_len));
@@ -433,13 +544,13 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange(
         {
           AR_ENDO_SCOPE("MaskedReader");
           masked_reader = MaskedReader(
-              reader->NewReader(state_->chunk_offsets[chunk_idx_start]),
+              reader->NewReader((*state_->chunk_offsets)[chunk_idx_start]),
               buf_len);
         }
         for (uint64_t chunk_idx = chunk_idx_start; chunk_idx <= last_chunk_idx;
              ++chunk_idx) {
           AR_ENDO_SCOPE("ChunkReader+ChunkDecoder");
-          masked_reader.Seek(state_->chunk_offsets[chunk_idx]);
+          masked_reader.Seek((*state_->chunk_offsets)[chunk_idx]);
           riegeli::DefaultChunkReader<> chunk_reader(&masked_reader);
           Chunk chunk;
           if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) {
@@ -489,7 +600,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices(
   if (!ok()) {
     return status();
   }
-  if (state_->chunk_offsets.empty()) {
+  if (state_->chunk_offsets->empty()) {
     return absl::OkStatus();
   }
 
@@ -505,7 +616,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices(
   };
   std::vector<std::vector<IndexPair>> per_chunk_indices(
-      state_->chunk_offsets.size());
+      state_->chunk_offsets->size());
   std::vector<std::vector<uint64_t>> chunk_indices_per_buffer;
   for (auto [indices_idx, record_idx] : Enumerate(indices)) {
     if (record_idx >= state_->num_records) {
@@ -517,7 +628,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices(
     per_chunk_indices[chunk_idx].emplace_back(local_idx, indices_idx);
   }
   bool in_buffer = false;
-  for (auto i : Seq(state_->chunk_offsets.size())) {
+  for (auto i : Seq(state_->chunk_offsets->size())) {
     // Find the first chunk containing indices
     if (!in_buffer) {
       if (!per_chunk_indices[i].empty()) {
@@ -544,7 +655,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices(
         auto buffer_chunks =
             absl::MakeConstSpan(chunk_indices_per_buffer[buf_idx]);
         uint64_t buf_len = state_->ChunkEndOffset(buffer_chunks.back()) -
-                           state_->chunk_offsets[buffer_chunks[0]];
+                           (*state_->chunk_offsets)[buffer_chunks[0]];
         AR_ENDO_JOB(
             "ArrayRecordReaderBase::ParallelReadRecordsWithIndices",
             absl::StrCat("buffer_idx: ", buf_idx, " buffer_len: ", buf_len));
@@ -552,12 +663,12 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsWithIndices(
         {
           AR_ENDO_SCOPE("MaskedReader");
           masked_reader = MaskedReader(
-              reader->NewReader(state_->chunk_offsets[buffer_chunks[0]]),
+              reader->NewReader((*state_->chunk_offsets)[buffer_chunks[0]]),
               buf_len);
         }
         for (auto chunk_idx : buffer_chunks) {
           AR_ENDO_SCOPE("ChunkReader+ChunkDecoder");
-          masked_reader.Seek(state_->chunk_offsets[chunk_idx]);
+          masked_reader.Seek((*state_->chunk_offsets)[chunk_idx]);
           riegeli::DefaultChunkReader<> chunk_reader(&masked_reader);
           Chunk chunk;
           if (ABSL_PREDICT_FALSE(!chunk_reader.ReadChunk(chunk))) {
@@ -667,11 +778,11 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
     decoders.reserve(state_->chunk_group_size);
     uint64_t chunk_start = buffer_idx * state_->chunk_group_size;
     uint64_t chunk_end =
-        std::min(state_->chunk_offsets.size(),
+        std::min(state_->chunk_offsets->size(),
                  (buffer_idx + 1) * state_->chunk_group_size);
     auto reader = get_backing_reader();
     for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
-      uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
+      uint64_t chunk_offset = (*state_->chunk_offsets)[chunk_idx];
       uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx);
       decoders.push_back(
           ReadChunk(*reader, chunk_offset, chunk_end_offset - chunk_offset));
@@ -692,7 +803,7 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
   while (state_->future_decoders.size() < max_parallelism) {
     uint64_t buffer_to_add = buffer_idx + state_->future_decoders.size();
     if (buffer_to_add * state_->chunk_group_size >=
-        state_->chunk_offsets.size()) {
+        state_->chunk_offsets->size()) {
       break;
     }
     // Although our internal ThreadPool takes absl::AnyInvocable which is
@@ -708,10 +819,10 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
     chunk_offsets.reserve(state_->chunk_group_size);
     uint64_t chunk_start = buffer_to_add * state_->chunk_group_size;
     uint64_t chunk_end =
-        std::min(state_->chunk_offsets.size(),
+        std::min(state_->chunk_offsets->size(),
                  (buffer_to_add + 1) * state_->chunk_group_size);
     for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
-      chunk_offsets.push_back(state_->chunk_offsets[chunk_idx]);
+      chunk_offsets.push_back((*state_->chunk_offsets)[chunk_idx]);
     }
     uint64_t buffer_len =
         state_->ChunkEndOffset(chunk_end - 1) - chunk_offsets[0];
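
Back-of-envelope on the trade-off between the two ChunkOffset implementations
above (a sketch; the numbers are illustrative, the member layout comes from
the classes in this diff):

    // ArrayChunkOffset keeps one uint64_t offset per chunk:
    //   resident index size ~= num_chunks * sizeof(uint64_t)
    //   e.g. 10'000'000 chunks * 8 bytes ~= 80 MB
    // OffloadedChunkOffset keeps only {base_reader_, footer_offset_,
    // num_chunks_}, but every operator[] call re-reads and re-decodes the
    // footer chunk from the backing reader, so each index lookup costs one
    // extra disk read.
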
diff --git a/cpp/array_record_reader.h b/cpp/array_record_reader.h
index dd7fb1c..ce8e34b 100644
--- a/cpp/array_record_reader.h
+++ b/cpp/array_record_reader.h
@@ -64,20 +64,31 @@ class ArrayRecordReaderBase : public riegeli::Object {
   public:
    Options() {}
 
+    enum class IndexStorageOption {
+      // Keeps all the record/chunk indices in memory, trading memory usage
+      // for speed.
+      kInMemory = 0,
+      // Does not keep the index in memory; reads it from disk on every
+      // access. Uses a much smaller memory footprint.
+      kOffloaded = 1,
+    };
+
    // Parses options from text:
    // ```
    //   options ::= option? ("," option?)*
    //   option ::=
    //     "readahead_buffer_size" ":" readahead_buffer_size |
    //     "max_parallelism" ":" max_parallelism
+    //     "index_storage_option" ":" index_storage_option
    //   readahead_buffer_size ::= non-negative integer expressed as real with
    //     optional suffix [BkKMGTPE]. (Default 16MB). Set to 0 optimizes random
    //     access performance.
-    //   max_parallelism ::= `auto` or non-negative integer. Each parallel
-    //   thread
-    //     owns its readhaed buffer with the size `readahead_buffer_size`.
-    //     (Default thread pool size) Set to 0 optimizes random access
-    //     performance.
+    //   max_parallelism ::= `auto` or non-negative integer. Each parallel
+    //     thread owns its readahead buffer with the size
+    //     `readahead_buffer_size`. (Default thread pool size) Set to 0
+    //     optimizes random access performance.
+    //   index_storage_option ::= `in_memory` or `offloaded`. Defaults to
+    //     `in_memory`.
    // ```
    static absl::StatusOr<Options> FromString(absl::string_view text);
@@ -99,9 +110,19 @@ class ArrayRecordReaderBase : public riegeli::Object {
    }
    std::optional<uint32_t> max_parallelism() const { return max_parallelism_; }
 
+    // Specifies the index storage option.
+    Options& set_index_storage_option(IndexStorageOption storage_option) {
+      index_storage_option_ = storage_option;
+      return *this;
+    }
+    IndexStorageOption index_storage_option() const {
+      return index_storage_option_;
+    }
+
   private:
    std::optional<uint32_t> max_parallelism_ = std::nullopt;
    uint64_t readahead_buffer_size_ = kDefaultReadaheadBufferSize;
+    IndexStorageOption index_storage_option_ = IndexStorageOption::kInMemory;
   };
 
   // Reads the entire file in parallel and invokes the callback function of
@@ -303,7 +324,7 @@ class ArrayRecordReaderBase : public riegeli::Object {
   // Holds all the internal state in a variable to simplify the implementation
   // of the "Close after Move" semantic.
   struct ArrayRecordReaderState;
-  friend class ChunkDispatcher;
+  friend class OffloadedChunkOffset;
   std::unique_ptr<ArrayRecordReaderState> state_;
 };
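
Expected behavior of the new parser key, inferred from the ValueParser::Enum
registration in array_record_reader.cc above (a sketch, not an excerpt from
the test below):

    // "index_storage_option:in_memory" -> IndexStorageOption::kInMemory
    // "index_storage_option:offloaded" -> IndexStorageOption::kOffloaded
    // "index_storage_option:"          -> IndexStorageOption::kInMemory
    //   (the empty value is explicitly mapped to kInMemory)
    // Any other value makes Options::FromString return an error status.
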
diff --git a/cpp/array_record_reader_test.cc b/cpp/array_record_reader_test.cc
index acb809e..aa07c5f 100644
--- a/cpp/array_record_reader_test.cc
+++ b/cpp/array_record_reader_test.cc
@@ -45,14 +45,17 @@ namespace {
 
 enum class CompressionType { kUncompressed, kBrotli, kZstd, kSnappy };
 
+using IndexStorageOption = ArrayRecordReaderBase::Options::IndexStorageOption;
+
 // Tuple params
 //   CompressionType
 //   transpose
 //   use_thread_pool
 //   optimize_for_random_access
+//   index_storage_option
 class ArrayRecordReaderTest
     : public testing::TestWithParam<
-          std::tuple<CompressionType, bool, bool, bool>> {
+          std::tuple<CompressionType, bool, bool, bool, IndexStorageOption>> {
  public:
   ARThreadPool* get_pool() { return ArrayRecordGlobalPool(); }
   ArrayRecordWriterBase::Options GetWriterOptions() {
@@ -78,6 +81,7 @@ class ArrayRecordReaderTest
   bool transpose() { return std::get<1>(GetParam()); }
   bool use_thread_pool() { return std::get<2>(GetParam()); }
   bool optimize_for_random_access() { return std::get<3>(GetParam()); }
+  IndexStorageOption index_storage_option() { return std::get<4>(GetParam()); }
 };
 
 TEST_P(ArrayRecordReaderTest, MoveTest) {
@@ -94,6 +98,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
   ASSERT_TRUE(writer.Close());
 
   auto reader_opt = ArrayRecordReaderBase::Options();
+  reader_opt.set_index_storage_option(index_storage_option());
   if (optimize_for_random_access()) {
     reader_opt.set_max_parallelism(0);
     reader_opt.set_readahead_buffer_size(0);
@@ -183,6 +188,7 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
   ASSERT_TRUE(writer.Close());
 
   auto reader_opt = ArrayRecordReaderBase::Options();
+  reader_opt.set_index_storage_option(index_storage_option());
   if (optimize_for_random_access()) {
     reader_opt.set_max_parallelism(0);
     reader_opt.set_readahead_buffer_size(0);
@@ -269,7 +275,9 @@ INSTANTIATE_TEST_SUITE_P(
                          CompressionType::kBrotli,
                          CompressionType::kZstd,
                          CompressionType::kSnappy),
-        testing::Bool(), testing::Bool(), testing::Bool()));
+        testing::Bool(), testing::Bool(), testing::Bool(),
+        testing::Values(IndexStorageOption::kInMemory,
+                        IndexStorageOption::kOffloaded)));
 
 TEST(ArrayRecordReaderOptionTest, ParserTest) {
   {