Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions src/v/datalake/record_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,22 +382,19 @@ record_multiplexer::finish(
_error = res.error();
continue;
}
if (_result) {
auto& files = res.value();
vlog(
_log.trace, "writer finished: files_created={})", files.size());
std::move(
files.begin(),
files.end(),
std::back_inserter(finished_files.data_files));
}
auto& files = res.value();
vlog(_log.trace, "writer finished: files_created={})", files.size());
std::move(
files.begin(),
files.end(),
std::back_inserter(finished_files.data_files));
}
if (_invalid_record_writer) {
auto writer = std::move(_invalid_record_writer);
auto res = co_await std::move(*writer).finish();
if (res.has_error()) {
_error = res.error();
} else if (_result) {
} else {
auto& files = res.value();
vlog(
_log.trace,
Expand Down
68 changes: 68 additions & 0 deletions src/v/datalake/tests/translation_task_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,36 @@ class TranslateTaskTest
return model::make_memory_record_batch_reader(std::move(batches));
}

/// Returns a memory tracker that allows for `n` successful reservations.
/// After `n` reservations `out_of_memory` is returned.
/// Builds a memory tracker with a fixed reservation budget: the first `n`
/// calls to `reserve_bytes` succeed, every call after that reports
/// `out_of_memory`. Frees and releases are no-ops; disk tracking is
/// delegated to a noop tracker.
std::unique_ptr<datalake::writer_mem_tracker>
make_memory_tracker(size_t n) {
    class counting_tracker : public datalake::writer_mem_tracker {
    public:
        explicit counting_tracker(size_t budget)
          : _remaining(budget) {}

        ss::future<reservation_error>
        reserve_bytes(size_t, ss::abort_source&) noexcept override {
            // Grant reservations while budget remains; fail afterwards.
            if (_remaining > 0) {
                --_remaining;
                co_return reservation_error::ok;
            }
            co_return reservation_error::out_of_memory;
        }

        ss::future<> free_bytes(size_t, ss::abort_source&) override {
            // Nothing to return to the budget in this test double.
            return ss::now();
        }

        void release() override {}

        writer_disk_tracker& disk() override { return _disk.disk(); }

    private:
        size_t _remaining;
        datalake::noop_mem_tracker _disk;
    };

    return std::make_unique<counting_tracker>(n);
}

std::unique_ptr<datalake::parquet_file_writer_factory>
get_writer_factory() {
return std::make_unique<datalake::local_parquet_file_writer_factory>(
Expand Down Expand Up @@ -275,6 +305,44 @@ TEST_F(TranslateTaskTest, TestUploadError) {
// check no data files are left behind
ASSERT_THAT(list_data_files().get(), IsEmpty());
}
// Verifies that a translation task cleans up partially written data files
// when a writer fails with an out-of-memory reservation error mid-translation.
TEST_F(TranslateTaskTest, TestCleanupAfterOOMError) {
// Create a mem tracker that allows only a single successful reservation,
// i.e. only one writer can be created; any further reservation returns
// out_of_memory.
auto mem_tracker = make_memory_tracker(1);
auto writer_factory
= std::make_unique<datalake::local_parquet_file_writer_factory>(
datalake::local_path(tmp_dir.get_path()),
"test-prefix",
ss::make_shared<datalake::serde_parquet_writer_factory>(),
*mem_tracker);

// NOTE(review): `mem_tracker` must outlive `task` — the factory holds a
// reference to it, not ownership.
datalake::translation_task task(
ntp,
model::revision_id{123},
std::move(writer_factory),
cloud_io,
&features,
*schema_mgr,
*schema_resolver,
*translator,
*t_creator,
model::iceberg_invalid_record_action::dlq_table,
location_provider,
probe);

// Translate a batch set large enough to require more than the single
// permitted reservation, triggering the OOM path.
task.translate_once(make_batches(10, 16), kafka::offset{0}, as).get();
auto result = std::move(task)
.finish(
translation_task::custom_partitioning_enabled::yes,
test_rcn,
as)
.get();

// The OOM failure is surfaced as no_data from finish() — presumably no
// writer produced a complete file.
ASSERT_TRUE(result.has_error());
ASSERT_EQ(result.error(), datalake::translation_task::errc::no_data);

// check no data files are left behind
ASSERT_THAT(list_data_files().get(), IsEmpty());
}

TEST_F(TranslateTaskTest, TestCleanupAfterTransientError) {
datalake::translation_task task(
Expand Down