diff --git a/src/v/datalake/record_multiplexer.cc b/src/v/datalake/record_multiplexer.cc index 550d920ffe1b0..45c2943bb480c 100644 --- a/src/v/datalake/record_multiplexer.cc +++ b/src/v/datalake/record_multiplexer.cc @@ -382,22 +382,19 @@ record_multiplexer::finish( _error = res.error(); continue; } - if (_result) { - auto& files = res.value(); - vlog( - _log.trace, "writer finished: files_created={})", files.size()); - std::move( - files.begin(), - files.end(), - std::back_inserter(finished_files.data_files)); - } + auto& files = res.value(); + vlog(_log.trace, "writer finished: files_created={})", files.size()); + std::move( + files.begin(), + files.end(), + std::back_inserter(finished_files.data_files)); } if (_invalid_record_writer) { auto writer = std::move(_invalid_record_writer); auto res = co_await std::move(*writer).finish(); if (res.has_error()) { _error = res.error(); - } else if (_result) { + } else { auto& files = res.value(); vlog( _log.trace, diff --git a/src/v/datalake/tests/translation_task_test.cc b/src/v/datalake/tests/translation_task_test.cc index f620ba7531e3a..a2cc11722fbaf 100644 --- a/src/v/datalake/tests/translation_task_test.cc +++ b/src/v/datalake/tests/translation_task_test.cc @@ -89,6 +89,36 @@ class TranslateTaskTest return model::make_memory_record_batch_reader(std::move(batches)); } + /// Returns a memory tracker that allows for `n` successful reservations. + /// After `n` reservations `out_of_memory` is returned. 
+    std::unique_ptr<datalake::writer_mem_tracker>
+    make_memory_tracker(size_t n) {
+        class mem_tracker : public datalake::writer_mem_tracker {
+        public:
+            explicit mem_tracker(size_t num_left)
+              : _num_left(num_left) {}
+            ss::future<reservation_error>
+            reserve_bytes(size_t, ss::abort_source&) noexcept override {
+                if (_num_left == 0) {
+                    co_return reservation_error::out_of_memory;
+                }
+                --_num_left; // one fewer successful reservation remaining
+                co_return reservation_error::ok;
+            }
+            ss::future<> free_bytes(size_t, ss::abort_source&) override {
+                return ss::now();
+            }
+            void release() override {}
+            writer_disk_tracker& disk() override { return _disk.disk(); }
+
+        private:
+            size_t _num_left;
+            datalake::noop_mem_tracker _disk; // only used to back disk()
+        };
+
+        return std::make_unique<mem_tracker>(n);
+    }
+
     std::unique_ptr
     get_writer_factory() {
         return std::make_unique(
@@ -275,6 +305,47 @@ TEST_F(TranslateTaskTest, TestUploadError) {
     // check no data files are left behind
     ASSERT_THAT(list_data_files().get(), IsEmpty());
 }
+
+TEST_F(TranslateTaskTest, TestCleanupAfterOOMError) {
+    // Create a mem tracker that allows 1 reservation, to make a single writer.
+    auto mem_tracker = make_memory_tracker(1);
+    auto writer_factory
+      = std::make_unique(
+        datalake::local_path(tmp_dir.get_path()),
+        "test-prefix",
+        ss::make_shared(),
+        *mem_tracker);
+
+    datalake::translation_task task(
+      ntp,
+      model::revision_id{123},
+      std::move(writer_factory),
+      cloud_io,
+      &features,
+      *schema_mgr,
+      *schema_resolver,
+      *translator,
+      *t_creator,
+      model::iceberg_invalid_record_action::dlq_table,
+      location_provider,
+      probe);
+
+    // The translate_once() outcome is not inspected; the failure is asserted
+    // on the result of finish() below.
+    task.translate_once(make_batches(10, 16), kafka::offset{0}, as).get();
+    auto result = std::move(task)
+                    .finish(
+                      translation_task::custom_partitioning_enabled::yes,
+                      test_rcn,
+                      as)
+                    .get();
+
+    ASSERT_TRUE(result.has_error());
+    ASSERT_EQ(result.error(), datalake::translation_task::errc::no_data);
+
+    // check no data files are left behind
+    ASSERT_THAT(list_data_files().get(), IsEmpty());
+}
 
 TEST_F(TranslateTaskTest, TestCleanupAfterTransientError) {
     datalake::translation_task task(