diff --git a/MODULE.bazel b/MODULE.bazel
index 4af5a79..f91bc78 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -13,7 +13,7 @@
 # limitations under the License.

 # TODO(fchern): automate version string alignment with setup.py
-VERSION = "0.8.3"
+VERSION = "0.8.4"

 module(
     name = "array_record",
diff --git a/cpp/BUILD b/cpp/BUILD
index 2b799c5..a5946ec 100644
--- a/cpp/BUILD
+++ b/cpp/BUILD
@@ -276,8 +276,14 @@ cc_test(

 cc_test(
     name = "array_record_reader_test",
+    size = "large",
     srcs = ["array_record_reader_test.cc"],
     shard_count = 4,
+    # Timeout on *san tests
+    tags = [
+        "noasan",
+        "nomsan",
+    ],
     deps = [
         ":array_record_reader",
         ":array_record_writer",
diff --git a/cpp/array_record_reader_test.cc b/cpp/array_record_reader_test.cc
index aa07c5f..f3efad5 100644
--- a/cpp/array_record_reader_test.cc
+++ b/cpp/array_record_reader_test.cc
@@ -38,7 +38,7 @@ limitations under the License.
 #include "riegeli/bytes/string_reader.h"
 #include "riegeli/bytes/string_writer.h"

-constexpr uint32_t kDatasetSize = 10050;
+constexpr uint32_t kDatasetSize = 3210;

 namespace array_record {
 namespace {
@@ -203,23 +203,25 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
       std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize);
   EXPECT_EQ(reader.RecordGroupSize(), group_size);

-  std::vector<bool> read_all_records(kDatasetSize, false);
+  // vector<bool> is casted as bit string, which is not thread safe to write.
+  std::vector<uint8_t> read_all_records(kDatasetSize, 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecords(
                       [&](uint64_t record_index,
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[record_index]);
                         EXPECT_FALSE(read_all_records[record_index]);
-                        read_all_records[record_index] = true;
+                        read_all_records[record_index] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_all_records) {
+  for (auto record_was_read : read_all_records) {
     EXPECT_TRUE(record_was_read);
   }

   std::vector<uint64_t> indices = {0, 3, 5, 7, 101, 2000};
-  std::vector<bool> read_indexed_records(indices.size(), false);
+  // vector<bool> is casted as bit string, which is not thread safe to write.
+  std::vector<uint8_t> read_indexed_records(indices.size(), 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecordsWithIndices(
                       indices,
@@ -227,16 +229,17 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[indices[indices_idx]]);
                         EXPECT_FALSE(read_indexed_records[indices_idx]);
-                        read_indexed_records[indices_idx] = true;
+                        read_indexed_records[indices_idx] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_indexed_records) {
+  for (auto record_was_read : read_indexed_records) {
     EXPECT_TRUE(record_was_read);
   }

   uint64_t begin = 10, end = 101;
-  std::vector<bool> read_range_records(end - begin, false);
+  // vector<bool> is casted as bit string, which is not thread safe to write.
+  std::vector<uint8_t> read_range_records(end - begin, 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecordsInRange(
                       begin, end,
@@ -244,11 +247,11 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[record_index]);
                         EXPECT_FALSE(read_range_records[record_index - begin]);
-                        read_range_records[record_index - begin] = true;
+                        read_range_records[record_index - begin] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_range_records) {
+  for (auto record_was_read : read_range_records) {
     EXPECT_TRUE(record_was_read);
   }

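Note on the reader-test change above: std::vector<bool> packs eight flags into each byte of storage, so marking different record indices from the parallel read callbacks can write the same byte concurrently. The test therefore switches to one full byte per flag. A minimal standalone sketch of that pattern (hypothetical names, assuming a plain std::vector<uint8_t>; nothing here touches array_record):

    // Why byte-per-flag: with std::vector<bool>, two threads flipping
    // neighboring indices touch the same byte, which is a data race under
    // ThreadSanitizer. One byte per flag keeps writes to distinct indices safe.
    #include <cstdint>
    #include <thread>
    #include <vector>

    int main() {
      constexpr int kN = 1024;
      std::vector<uint8_t> read_flags(kN, 0);  // one byte per record

      // Two workers mark disjoint halves concurrently, mimicking the
      // ParallelReadRecords callbacks marking records as seen.
      std::thread lo([&] { for (int i = 0; i < kN / 2; ++i) read_flags[i] = 1; });
      std::thread hi([&] { for (int i = kN / 2; i < kN; ++i) read_flags[i] = 1; });
      lo.join();
      hi.join();

      // The same pattern on std::vector<bool> would be undefined behavior,
      // since elements i and i + 1 can share a byte of storage.
      return 0;
    }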
diff --git a/cpp/array_record_writer.cc b/cpp/array_record_writer.cc
index ef722af..f1542c0 100644
--- a/cpp/array_record_writer.cc
+++ b/cpp/array_record_writer.cc
@@ -105,10 +105,7 @@ template <typename T>
 absl::StatusOr<riegeli::Chunk> ChunkFromSpan(
     CompressorOptions compression_options, absl::Span<const T> span,
     std::optional<std::string> header = std::nullopt) {
-  riegeli::SimpleEncoder encoder(
-      compression_options,
-      riegeli::SimpleEncoder::TuningOptions().set_size_hint(
-          span.size() * sizeof(typename decltype(span)::value_type)));
+  riegeli::SimpleEncoder encoder(compression_options);
   if (header.has_value()) {
     encoder.AddRecord(header.value());
   }
@@ -166,7 +163,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
   // Padding
   options_parser.AddOption(
       "pad_to_block_boundary",
-      ValueParser::Enum({{"", true}, {"true", true}, {"false", false}},
+      ValueParser::Enum({{"", false}, {"true", true}, {"false", false}},
                         &options.pad_to_block_boundary_));
   if (!options_parser.FromString(text)) {
     return options_parser.status();
   }
@@ -400,10 +397,8 @@ std::unique_ptr<riegeli::ChunkEncoder> ArrayRecordWriterBase::CreateEncoder() {
         riegeli::TransposeEncoder::TuningOptions().set_bucket_size(
             options_.transpose_bucket_size()));
   } else {
-    encoder = std::make_unique<riegeli::SimpleEncoder>(
-        options_.compressor_options(),
-        riegeli::SimpleEncoder::TuningOptions().set_size_hint(
-            submit_chunk_callback_->get_last_decoded_data_size()));
+    encoder =
+        std::make_unique<riegeli::SimpleEncoder>(options_.compressor_options());
   }
   if (pool_) {
     return std::make_unique<riegeli::DeferredEncoder>(std::move(encoder));
   }
diff --git a/cpp/array_record_writer.h b/cpp/array_record_writer.h
index 385313d..d322b33 100644
--- a/cpp/array_record_writer.h
+++ b/cpp/array_record_writer.h
@@ -108,7 +108,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
   //     "window_log" : window_log |
   //     "pad_to_block_boundary" (":" ("true" | "false"))?
   //   group_size ::= positive integer which specifies number of records to be
-  //     grouped into a chunk before compression. (default 65536)
+  //     grouped into a chunk before compression. (default 1)
   //   saturation_delay_ms ::= positive integer which specifies a delay in
   //     milliseconds when the parallel writing queue is saturated.
   //   max_parallelism ::= `auto` or positive integers which specifies
@@ -126,7 +126,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
   //
   // The larger the value, the denser the file, at the cost of more expensive
   // random accessing.
-  static constexpr uint32_t kDefaultGroupSize = 65536;
+  static constexpr uint32_t kDefaultGroupSize = 1;
   Options& set_group_size(uint32_t group_size) {
     group_size_ = group_size;
     return *this;
   }
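Note on the default-group-size change above: with kDefaultGroupSize dropping from 65536 to 1, newly written files trade packing density for cheap random access, so callers that want the old dense layout must request it explicitly via set_group_size() or the "group_size" option string. A minimal sketch under that assumption, reusing the Options::FromString entry point and riegeli::StringWriter destination that appear elsewhere in this patch (the WriteDense helper and include paths are illustrative only):

    #include <string>

    #include "absl/status/statusor.h"
    #include "absl/types/span.h"
    #include "cpp/array_record_writer.h"
    #include "riegeli/base/maker.h"
    #include "riegeli/bytes/string_writer.h"

    // Illustrative helper: write records with the old dense grouping spelled
    // out explicitly instead of inheriting the new group_size default of 1.
    absl::StatusOr<std::string> WriteDense(absl::Span<const std::string> records) {
      auto options = array_record::ArrayRecordWriterBase::Options::FromString(
          "group_size:65536,zstd:3");
      if (!options.ok()) return options.status();

      std::string encoded;
      auto writer = array_record::ArrayRecordWriter(
          riegeli::Maker<riegeli::StringWriter>(&encoded), *options,
          /*pool=*/nullptr);
      for (const std::string& record : records) {
        if (!writer.WriteRecord(record.data(), record.size())) {
          return writer.status();
        }
      }
      if (!writer.Close()) return writer.status();
      return encoded;
    }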
diff --git a/cpp/array_record_writer_test.cc b/cpp/array_record_writer_test.cc
index 1533225..17f3193 100644
--- a/cpp/array_record_writer_test.cc
+++ b/cpp/array_record_writer_test.cc
@@ -220,6 +220,59 @@ TEST_P(ArrayRecordWriterTest, RandomDatasetTest) {
   ASSERT_TRUE(reader.Close());
 }

+TEST_P(ArrayRecordWriterTest, CompressionRatioTest) {
+  float expected_ratio = 1.0;
+  auto options = GetOptions();
+  if (options.pad_to_block_boundary()) {
+    GTEST_SKIP()
+        << "Padded boundaries are known to have bad compression ratio.";
+  }
+  // We use uniform int distribution for values that are within a byte.
+  // Therefore regular compression algorithm should easily compress it
+  // for at least 2x and near 4x. However, the snappy compression doesn't
+  // work that well so we set a higher threshold.
+  switch (options.compression_type()) {
+    case riegeli::CompressionType::kNone:
+      GTEST_SKIP() << "No need to verify compression ratio for uncompressed.";
+    case riegeli::CompressionType::kBrotli:
+      expected_ratio = 0.4;
+      break;
+    case riegeli::CompressionType::kZstd:
+      expected_ratio = 0.4;
+      break;
+    case riegeli::CompressionType::kSnappy:
+      expected_ratio = 0.6;
+      break;
+  }
+  options.set_group_size(128);
+  std::mt19937 bitgen;
+  std::uniform_int_distribution<uint32_t> dist(0, 128);
+  constexpr uint32_t num_records = 32768;
+  constexpr uint32_t dim = 256;
+  std::vector<uint32_t> records(num_records * dim);
+
+  for (auto i : Seq(num_records * dim)) {
+    records[i] = dist(bitgen);
+  }
+  std::string encoded;
+
+  ARThreadPool* pool = nullptr;
+  if (std::get<3>(GetParam())) {
+    pool = ArrayRecordGlobalPool();
+  }
+
+  auto writer = ArrayRecordWriter(
+      riegeli::Maker<riegeli::StringWriter>(&encoded), options, pool);
+
+  for (auto i : Seq(num_records)) {
+    EXPECT_TRUE(
+        writer.WriteRecord(records.data() + dim * i, dim * sizeof(uint32_t)));
+  }
+  ASSERT_TRUE(writer.Close());
+  EXPECT_LE(encoded.size(),
+            num_records * dim * sizeof(uint32_t) * expected_ratio);
+}
+
 INSTANTIATE_TEST_SUITE_P(
     ParamTest, ArrayRecordWriterTest,
     testing::Combine(testing::Values(CompressionType::kUncompressed,
@@ -253,7 +306,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_FALSE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:false,"
             "zstd:3,"
@@ -274,7 +327,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_FALSE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:false,"
             "zstd:3,"
@@ -362,7 +415,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_TRUE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:true,"
             "uncompressed");
@@ -382,7 +435,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_TRUE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:true,"
             "snappy");
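For scale, the bound the new CompressionRatioTest asserts: the raw payload is 32768 records of 256 uint32 values each, and a value drawn uniformly from [0, 128] carries only about 7 bits of entropy per 32-bit word, so roughly 0.22x is the theoretical floor for the ratio. A small standalone sketch of that arithmetic (thresholds copied from the test; nothing here calls array_record):

    #include <cstdint>
    #include <cstdio>

    int main() {
      constexpr uint64_t kNumRecords = 32768;
      constexpr uint64_t kDim = 256;
      // Raw payload the test writes: 32768 * 256 * 4 bytes = 32 MiB.
      constexpr uint64_t kRawBytes = kNumRecords * kDim * sizeof(uint32_t);

      constexpr double kBrotliZstdRatio = 0.4;  // thresholds from the test
      constexpr double kSnappyRatio = 0.6;

      std::printf("raw payload:        %llu bytes\n",
                  static_cast<unsigned long long>(kRawBytes));
      std::printf("brotli/zstd bound:  %.0f bytes\n", kRawBytes * kBrotliZstdRatio);
      std::printf("snappy bound:       %.0f bytes\n", kRawBytes * kSnappyRatio);
      return 0;
    }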
diff --git a/python/array_record_data_source_test.py b/python/array_record_data_source_test.py
index 0c1fbfd..8977a27 100644
--- a/python/array_record_data_source_test.py
+++ b/python/array_record_data_source_test.py
@@ -47,7 +47,7 @@ def setUp(self):

   def test_check_default_group_size(self):
     filename = os.path.join(FLAGS.test_tmpdir, "test.array_record")
-    writer = array_record_module.ArrayRecordWriter(filename)
+    writer = array_record_module.ArrayRecordWriter(filename, "group_size:65536")
     writer.write(b"foobar")
     writer.close()
     reader = array_record_module.ArrayRecordReader(filename)
diff --git a/setup.py b/setup.py
index 168fe26..39d1634 100644
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@ def has_ext_modules(self):

 setup(
     name='array_record',
-    version='0.8.3',
+    version='0.8.4',
     description='A file format that achieves a new frontier of IO efficiency',
     author='ArrayRecord team',
     author_email='no-reply@google.com',