Merged · Changes from all commits
2 changes: 1 addition & 1 deletion MODULE.bazel
@@ -13,7 +13,7 @@
 # limitations under the License.

 # TODO(fchern): automate version string alignment with setup.py
-VERSION = "0.8.3"
+VERSION = "0.8.4"

 module(
     name = "array_record",
6 changes: 6 additions & 0 deletions cpp/BUILD
@@ -276,8 +276,14 @@ cc_test(

 cc_test(
     name = "array_record_reader_test",
+    size = "large",
     srcs = ["array_record_reader_test.cc"],
+    shard_count = 4,
+    # Timeout on *san tests
+    tags = [
+        "noasan",
+        "nomsan",
+    ],
     deps = [
         ":array_record_reader",
         ":array_record_writer",
23 changes: 13 additions & 10 deletions cpp/array_record_reader_test.cc
@@ -38,7 +38,7 @@ limitations under the License.
 #include "riegeli/bytes/string_reader.h"
 #include "riegeli/bytes/string_writer.h"

-constexpr uint32_t kDatasetSize = 10050;
+constexpr uint32_t kDatasetSize = 3210;

 namespace array_record {
 namespace {
@@ -203,52 +203,55 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
       std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize);
   EXPECT_EQ(reader.RecordGroupSize(), group_size);

-  std::vector<bool> read_all_records(kDatasetSize, false);
+  // std::vector<bool> is packed as a bit string, which is not thread safe to write.
+  std::vector<int> read_all_records(kDatasetSize, 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecords(
                       [&](uint64_t record_index,
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[record_index]);
                         EXPECT_FALSE(read_all_records[record_index]);
-                        read_all_records[record_index] = true;
+                        read_all_records[record_index] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_all_records) {
+  for (auto record_was_read : read_all_records) {
     EXPECT_TRUE(record_was_read);
   }

   std::vector<uint64_t> indices = {0, 3, 5, 7, 101, 2000};
-  std::vector<bool> read_indexed_records(indices.size(), false);
+  // std::vector<bool> is packed as a bit string, which is not thread safe to write.
+  std::vector<int> read_indexed_records(indices.size(), 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecordsWithIndices(
                       indices,
                       [&](uint64_t indices_idx,
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[indices[indices_idx]]);
                         EXPECT_FALSE(read_indexed_records[indices_idx]);
-                        read_indexed_records[indices_idx] = true;
+                        read_indexed_records[indices_idx] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_indexed_records) {
+  for (auto record_was_read : read_indexed_records) {
     EXPECT_TRUE(record_was_read);
   }

   uint64_t begin = 10, end = 101;
-  std::vector<bool> read_range_records(end - begin, false);
+  // std::vector<bool> is packed as a bit string, which is not thread safe to write.
+  std::vector<int> read_range_records(end - begin, 0);
   ASSERT_TRUE(reader
                   .ParallelReadRecordsInRange(
                       begin, end,
                       [&](uint64_t record_index,
                           absl::string_view result_view) -> absl::Status {
                         EXPECT_EQ(result_view, records[record_index]);
                         EXPECT_FALSE(read_range_records[record_index - begin]);
-                        read_range_records[record_index - begin] = true;
+                        read_range_records[record_index - begin] = 1;
                         return absl::OkStatus();
                       })
                   .ok());
-  for (bool record_was_read : read_range_records) {
+  for (auto record_was_read : read_range_records) {
     EXPECT_TRUE(record_was_read);
   }
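Reviewer note on the std::vector<bool> to std::vector<int> swap above: std::vector<bool> is a packed specialization in which eight elements share one byte, so two threads flipping neighboring flags write to the same byte and race; std::vector<int> gives every flag its own word. A minimal sketch of the hazard (not part of this PR):

#include <thread>
#include <vector>

int main() {
  // Packed specialization: flags[0] and flags[1] live in the same byte, so
  // these two writes from different threads are a data race (TSan flags it).
  std::vector<bool> flags(16, false);
  std::thread a([&] { flags[0] = true; });
  std::thread b([&] { flags[1] = true; });
  a.join();
  b.join();

  // One int per flag: writes to distinct indices touch distinct objects and
  // are race-free, which is why the test switched to std::vector<int>.
  std::vector<int> safe_flags(16, 0);
  std::thread c([&] { safe_flags[0] = 1; });
  std::thread d([&] { safe_flags[1] = 1; });
  c.join();
  d.join();
  return 0;
}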
13 changes: 4 additions & 9 deletions cpp/array_record_writer.cc
@@ -105,10 +105,7 @@ template <typename T, typename H = std::string>
 absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,
                                     absl::Span<const T> span,
                                     std::optional<H> header = std::nullopt) {
-  riegeli::SimpleEncoder encoder(
-      compression_options,
-      riegeli::SimpleEncoder::TuningOptions().set_size_hint(
-          span.size() * sizeof(typename decltype(span)::value_type)));
+  riegeli::SimpleEncoder encoder(compression_options);
   if (header.has_value()) {
     encoder.AddRecord(header.value());
   }
@@ -166,7 +163,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
   // Padding
   options_parser.AddOption(
       "pad_to_block_boundary",
-      ValueParser::Enum({{"", true}, {"true", true}, {"false", false}},
+      ValueParser::Enum({{"", false}, {"true", true}, {"false", false}},
                         &options.pad_to_block_boundary_));
   if (!options_parser.FromString(text)) {
     return options_parser.status();
@@ -400,10 +397,8 @@ std::unique_ptr<riegeli::ChunkEncoder> ArrayRecordWriterBase::CreateEncoder() {
         riegeli::TransposeEncoder::TuningOptions().set_bucket_size(
             options_.transpose_bucket_size()));
   } else {
-    encoder = std::make_unique<riegeli::SimpleEncoder>(
-        options_.compressor_options(),
-        riegeli::SimpleEncoder::TuningOptions().set_size_hint(
-            submit_chunk_callback_->get_last_decoded_data_size()));
+    encoder =
+        std::make_unique<riegeli::SimpleEncoder>(options_.compressor_options());
   }
   if (pool_) {
     return std::make_unique<riegeli::DeferredEncoder>(std::move(encoder));
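Reviewer note on the ValueParser::Enum change above: if the empty string maps the bare key form (no ":true"/":false"), then mapping "" to false means a plain "pad_to_block_boundary" in an options string now parses as false, and padding must be requested explicitly. A hedged sketch of the behavior difference, assuming FromString returns a StatusOr-like value as the surrounding code suggests (not part of this PR):

// Hypothetical illustration; identifiers as in the diff.
auto bare = ArrayRecordWriterBase::Options::FromString("pad_to_block_boundary");
// Before this PR: bare->pad_to_block_boundary() == true  ("" mapped to true).
// After this PR:  bare->pad_to_block_boundary() == false ("" maps to false).

auto explicit_opt =
    ArrayRecordWriterBase::Options::FromString("pad_to_block_boundary:true");
// Unchanged: explicit_opt->pad_to_block_boundary() == true in both versions.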
4 changes: 2 additions & 2 deletions cpp/array_record_writer.h
@@ -108,7 +108,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
   //   "window_log" : window_log |
   //   "pad_to_block_boundary" (":" ("true" | "false"))?
   //   group_size ::= positive integer which specifies number of records to be
-  //     grouped into a chunk before compression. (default 65536)
+  //     grouped into a chunk before compression. (default 1)
   //   saturation_delay_ms ::= positive integer which specifies a delay in
   //     milliseconds when the parallel writing queue is saturated.
   //   max_parallelism ::= `auto` or positive integers which specifies
@@ -126,7 +126,7 @@
   //
   // The larger the value, the denser the file, at the cost of more expensive
   // random accessing.
-  static constexpr uint32_t kDefaultGroupSize = 65536;
+  static constexpr uint32_t kDefaultGroupSize = 1;
   Options& set_group_size(uint32_t group_size) {
     group_size_ = group_size;
     return *this;
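Since the default group size drops from 65536 to 1, each record becomes its own chunk by default: the fastest random access, but the least dense file. Callers that want the old packing must set it back, either through the builder or the options text documented above. A minimal sketch, assuming the Options API shown in this diff:

// Restore the previous default explicitly (hypothetical usage, not from the PR).
ArrayRecordWriterBase::Options options;
options.set_group_size(65536);  // pack 65536 records per chunk, as before

// Or via the options string grammar documented above; FromString appears to
// return a StatusOr-like value, so check it before use.
auto parsed = ArrayRecordWriterBase::Options::FromString(
    "group_size:65536,zstd:3,pad_to_block_boundary:false");

The python/array_record_data_source_test.py change below does the same from Python by passing "group_size:65536" as the options string.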
61 changes: 57 additions & 4 deletions cpp/array_record_writer_test.cc
@@ -220,6 +220,59 @@ TEST_P(ArrayRecordWriterTest, RandomDatasetTest) {
   ASSERT_TRUE(reader.Close());
 }

+TEST_P(ArrayRecordWriterTest, CompressionRatioTest) {
+  float expected_ratio = 1.0;
+  auto options = GetOptions();
+  if (options.pad_to_block_boundary()) {
+    GTEST_SKIP()
+        << "Padded boundaries are known to have a bad compression ratio.";
+  }
+  // We use a uniform int distribution for values that fit within a byte.
+  // Therefore a regular compression algorithm should easily compress it by
+  // at least 2x and near 4x. However, snappy compression doesn't work that
+  // well, so we set a higher threshold.
+  switch (options.compression_type()) {
+    case riegeli::CompressionType::kNone:
+      GTEST_SKIP() << "No need to verify compression ratio for uncompressed.";
+    case riegeli::CompressionType::kBrotli:
+      expected_ratio = 0.4;
+      break;
+    case riegeli::CompressionType::kZstd:
+      expected_ratio = 0.4;
+      break;
+    case riegeli::CompressionType::kSnappy:
+      expected_ratio = 0.6;
+      break;
+  }
+  options.set_group_size(128);
+  std::mt19937 bitgen;
+  std::uniform_int_distribution<uint32_t> dist(0, 128);
+  constexpr uint32_t num_records = 32768;
+  constexpr uint32_t dim = 256;
+  std::vector<uint32_t> records(num_records * dim);
+
+  for (auto i : Seq(num_records * dim)) {
+    records[i] = dist(bitgen);
+  }
+  std::string encoded;
+
+  ARThreadPool* pool = nullptr;
+  if (std::get<3>(GetParam())) {
+    pool = ArrayRecordGlobalPool();
+  }
+
+  auto writer = ArrayRecordWriter(
+      riegeli::Maker<riegeli::StringWriter>(&encoded), options, pool);
+
+  for (auto i : Seq(num_records)) {
+    EXPECT_TRUE(
+        writer.WriteRecord(records.data() + dim * i, dim * sizeof(uint32_t)));
+  }
+  ASSERT_TRUE(writer.Close());
+  EXPECT_LE(encoded.size(),
+            num_records * dim * sizeof(uint32_t) * expected_ratio);
+}
+
 INSTANTIATE_TEST_SUITE_P(
     ParamTest, ArrayRecordWriterTest,
     testing::Combine(testing::Values(CompressionType::kUncompressed,
@@ -253,7 +306,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_FALSE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:false,"
             "zstd:3,"
@@ -274,7 +327,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_FALSE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:false,"
             "zstd:3,"
@@ -362,7 +415,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_TRUE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:true,"
             "uncompressed");
@@ -382,7 +435,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
   EXPECT_TRUE(option.pad_to_block_boundary());

   EXPECT_EQ(option.ToString(),
-            "group_size:65536,"
+            "group_size:1,"
             "transpose:false,"
             "pad_to_block_boundary:true,"
             "snappy");
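Reviewer note on the thresholds in CompressionRatioTest: each element is drawn uniformly from {0, ..., 128} but stored in 32 bits, so the data's entropy rate is roughly H = log2(129) ≈ 7.01 bits per 32-bit word, a best-case ratio of about 7.01/32 ≈ 0.22 (about 4.5x). The brotli/zstd bound of 0.4 (about 2.5x) and the snappy bound of 0.6 (about 1.7x) therefore leave comfortable slack below what an ideal entropy coder could reach, which matches the "at least 2x and near 4x" expectation in the test comment.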
2 changes: 1 addition & 1 deletion python/array_record_data_source_test.py
@@ -47,7 +47,7 @@ def setUp(self):

   def test_check_default_group_size(self):
     filename = os.path.join(FLAGS.test_tmpdir, "test.array_record")
-    writer = array_record_module.ArrayRecordWriter(filename)
+    writer = array_record_module.ArrayRecordWriter(filename, "group_size:65536")
     writer.write(b"foobar")
     writer.close()
     reader = array_record_module.ArrayRecordReader(filename)
2 changes: 1 addition & 1 deletion setup.py
@@ -31,7 +31,7 @@ def has_ext_modules(self):

 setup(
     name='array_record',
-    version='0.8.3',
+    version='0.8.4',
     description='A file format that achieves a new frontier of IO efficiency',
     author='ArrayRecord team',
     author_email='no-reply@google.com',