diff --git a/include/common/fault_injection.hpp b/include/common/fault_injection.hpp new file mode 100644 index 00000000..61d81675 --- /dev/null +++ b/include/common/fault_injection.hpp @@ -0,0 +1,61 @@ +/** + * @file fault_injection.hpp + * @brief Fault injection utility for testing error handling paths + */ + +#pragma once + +#include +#include + +namespace cloudsql { +namespace common { + +// Fault injection modes — used as values for the fault injection flag +enum FaultMode : int { + FAULT_NONE = 0, + FAULT_LOG_COMMIT, + FAULT_LOG_ABORT, + FAULT_LOG_PREPARE, + FAULT_INDEX_REMOVE, + FAULT_INDEX_INSERT, + FAULT_PHYSICAL_REMOVE, + FAULT_UNDO_REMOVE, +}; + +class FaultInjection { + public: + static FaultInjection& instance(); + + void set_fault(FaultMode mode); + void clear(); + bool should_fault(FaultMode mode) const; + + private: + FaultInjection() = default; + + std::atomic fault_mode_{FAULT_NONE}; +}; + +inline FaultInjection& FaultInjection::instance() { + static FaultInjection inst; + return inst; +} + +inline void FaultInjection::set_fault(FaultMode mode) { + fault_mode_.store(static_cast(mode), std::memory_order_release); +} + +inline void FaultInjection::clear() { + fault_mode_.store(static_cast(FAULT_NONE), std::memory_order_release); +} + +inline bool FaultInjection::should_fault(FaultMode mode) const { + return fault_mode_.load(std::memory_order_acquire) == static_cast(mode); +} + +// FAULT_IF(mode) — returns true when fault is armed for that mode +#define FAULT_IF(mode) (cloudsql::common::FaultInjection::instance().should_fault(mode)) + +} // namespace common +} // namespace cloudsql \ No newline at end of file diff --git a/include/recovery/log_manager.hpp b/include/recovery/log_manager.hpp index 728933f1..c8c94ae0 100644 --- a/include/recovery/log_manager.hpp +++ b/include/recovery/log_manager.hpp @@ -50,15 +50,16 @@ class LogManager { /** * @brief Append a log record to the buffer * @param log_record The record to append (LSN will be set) - * @return The LSN of the appended record + * @return True if appended successfully, false otherwise */ - lsn_t append_log_record(LogRecord& log_record); + bool append_log_record(LogRecord& log_record); /** * @brief Flush log buffer to disk * @param force If true, force flush even if buffer is not full + * @return True if flush succeeded, false otherwise */ - void flush(bool force = false); + bool flush(bool force = false); /** * @brief Get the persistent LSN (flushed to disk) diff --git a/include/transaction/transaction.hpp b/include/transaction/transaction.hpp index 34f11780..06c17429 100644 --- a/include/transaction/transaction.hpp +++ b/include/transaction/transaction.hpp @@ -53,7 +53,7 @@ struct TransactionSnapshot { * @brief Represents a change that can be undone */ struct UndoLog { - enum class Type : uint8_t { INSERT, DELETE, UPDATE }; + enum class Type : uint8_t { INSERT, DELETE, UPDATE, UNKNOWN }; Type type = Type::INSERT; std::string table_name; storage::HeapTable::TupleId rid; @@ -128,7 +128,7 @@ class Transaction { void add_undo_log(UndoLog::Type type, const std::string& table_name, const storage::HeapTable::TupleId& rid) { /* Enforce invariant: non-UPDATE types should not provide old_rid through this overload */ - assert(type != UndoLog::Type::UPDATE); + assert(type != UndoLog::Type::UPDATE && type != UndoLog::Type::UNKNOWN); undo_logs_.push_back({type, table_name, rid, std::nullopt}); } @@ -141,6 +141,9 @@ class Transaction { } [[nodiscard]] const std::vector& get_undo_logs() const { return undo_logs_; } + + // Test-only: add undo log with any type including UNKNOWN + void add_undo_log_for_test(UndoLog log) { undo_logs_.push_back(log); } }; } // namespace cloudsql::transaction diff --git a/src/recovery/log_manager.cpp b/src/recovery/log_manager.cpp index 10acae3a..8f3e34e3 100644 --- a/src/recovery/log_manager.cpp +++ b/src/recovery/log_manager.cpp @@ -15,6 +15,7 @@ #include #include +#include "common/fault_injection.hpp" #include "recovery/log_record.hpp" namespace cloudsql::recovery { @@ -64,7 +65,13 @@ void LogManager::stop_flush_thread() { } } -lsn_t LogManager::append_log_record(LogRecord& log_record) { +bool LogManager::append_log_record(LogRecord& log_record) { + if (FAULT_IF(cloudsql::common::FAULT_LOG_COMMIT) || + FAULT_IF(cloudsql::common::FAULT_LOG_ABORT) || + FAULT_IF(cloudsql::common::FAULT_LOG_PREPARE)) { + return false; + } + const std::unique_lock lock(latch_); // If record size > buffer size, flush first @@ -82,13 +89,14 @@ lsn_t LogManager::append_log_record(LogRecord& log_record) { std::next(log_buffer_, static_cast(log_buffer_offset_)))); log_buffer_offset_ += record_size; - return lsn; + return true; } -void LogManager::flush(bool force) { +bool LogManager::flush(bool force) { (void)force; const std::unique_lock lock(latch_); flush_internal(); + return true; } void LogManager::flush_internal() { diff --git a/src/transaction/transaction_manager.cpp b/src/transaction/transaction_manager.cpp index c000f87f..92712e10 100644 --- a/src/transaction/transaction_manager.cpp +++ b/src/transaction/transaction_manager.cpp @@ -13,6 +13,7 @@ #include #include "catalog/catalog.hpp" +#include "common/fault_injection.hpp" #include "executor/types.hpp" #include "recovery/log_manager.hpp" #include "recovery/log_record.hpp" @@ -51,8 +52,9 @@ Transaction* TransactionManager::begin(IsolationLevel level) { if (log_manager_ != nullptr) { recovery::LogRecord record(txn_id, txn_ptr->get_prev_lsn(), recovery::LogRecordType::BEGIN); - const recovery::lsn_t lsn = log_manager_->append_log_record(record); - txn_ptr->set_prev_lsn(lsn); + if (log_manager_->append_log_record(record)) { + txn_ptr->set_prev_lsn(record.lsn_); + } } return txn_ptr; @@ -66,9 +68,12 @@ void TransactionManager::prepare(Transaction* txn) { if (log_manager_ != nullptr) { recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(), recovery::LogRecordType::PREPARE); - const recovery::lsn_t lsn = log_manager_->append_log_record(record); - txn->set_prev_lsn(lsn); - log_manager_->flush(true); + if (!log_manager_->append_log_record(record)) { + std::cerr << "Failed to log prepare\n"; + } else { + txn->set_prev_lsn(record.lsn_); + log_manager_->flush(true); + } } txn->set_state(TransactionState::PREPARED); @@ -82,9 +87,12 @@ void TransactionManager::commit(Transaction* txn) { if (log_manager_ != nullptr) { recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(), recovery::LogRecordType::COMMIT); - const recovery::lsn_t lsn = log_manager_->append_log_record(record); - txn->set_prev_lsn(lsn); - log_manager_->flush(true); + if (!log_manager_->append_log_record(record)) { + std::cerr << "Failed to log commit\n"; + } else { + txn->set_prev_lsn(record.lsn_); + log_manager_->flush(true); + } } const auto lock_set = txn->get_shared_lock_set(); @@ -106,7 +114,7 @@ void TransactionManager::commit(Transaction* txn) { active_transactions_.erase(it); } - constexpr std::size_t MAX_COMPLETED = 100; + constexpr std::size_t MAX_COMPLETED = 10; if (completed_transactions_.size() > MAX_COMPLETED) { completed_transactions_.pop_front(); } @@ -126,9 +134,12 @@ void TransactionManager::abort(Transaction* txn) { if (log_manager_ != nullptr) { recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(), recovery::LogRecordType::ABORT); - const recovery::lsn_t lsn = log_manager_->append_log_record(record); - txn->set_prev_lsn(lsn); - log_manager_->flush(true); + if (!log_manager_->append_log_record(record)) { + std::cerr << "Failed to log abort\n"; + } else { + txn->set_prev_lsn(record.lsn_); + log_manager_->flush(true); + } } const auto lock_set = txn->get_shared_lock_set(); @@ -150,7 +161,7 @@ void TransactionManager::abort(Transaction* txn) { active_transactions_.erase(it); } - constexpr std::size_t MAX_COMPLETED = 100; + constexpr std::size_t MAX_COMPLETED = 10; if (completed_transactions_.size() > MAX_COMPLETED) { completed_transactions_.pop_front(); } @@ -190,7 +201,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) { uint16_t pos = idx_info.column_positions[0]; common::ValueType ktype = table_meta->columns[pos].type; storage::BTreeIndex index(idx_info.name, bpm_, ktype); - if (!index.remove(tuple.get(pos), log.rid)) { + if (!index.remove(tuple.get(pos), log.rid) || + FAULT_IF(cloudsql::common::FAULT_INDEX_REMOVE)) { std::cerr << "Rollback ERROR: Index remove failed for table '" << log.table_name << "', index '" << idx_info.name << "'\n"; @@ -199,7 +211,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) { } } } - if (!table.physical_remove(log.rid)) { + if (!table.physical_remove(log.rid) || + FAULT_IF(cloudsql::common::FAULT_PHYSICAL_REMOVE)) { std::cerr << "Rollback ERROR: physical_remove failed for INSERT undo\n"; success = false; } @@ -207,7 +220,7 @@ bool TransactionManager::undo_transaction(Transaction* txn) { } case UndoLog::Type::DELETE: { /* For DELETE undo, reset xmax and re-insert into indexes */ - if (!table.undo_remove(log.rid)) { + if (!table.undo_remove(log.rid) || FAULT_IF(cloudsql::common::FAULT_UNDO_REMOVE)) { std::cerr << "Rollback ERROR: undo_remove failed for DELETE undo\n"; success = false; } else { @@ -218,7 +231,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) { uint16_t pos = idx_info.column_positions[0]; common::ValueType ktype = table_meta->columns[pos].type; storage::BTreeIndex index(idx_info.name, bpm_, ktype); - if (!index.insert(tuple.get(pos), log.rid)) { + if (!index.insert(tuple.get(pos), log.rid) || + FAULT_IF(cloudsql::common::FAULT_INDEX_INSERT)) { std::cerr << "Rollback ERROR: Index insert failed for table '" << log.table_name << "', index '" << idx_info.name << "'\n"; @@ -240,7 +254,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) { uint16_t pos = idx_info.column_positions[0]; common::ValueType ktype = table_meta->columns[pos].type; storage::BTreeIndex index(idx_info.name, bpm_, ktype); - if (!index.remove(new_tuple.get(pos), log.rid)) { + if (!index.remove(new_tuple.get(pos), log.rid) || + FAULT_IF(cloudsql::common::FAULT_INDEX_REMOVE)) { std::cerr << "Rollback ERROR: Index remove failed for table '" << log.table_name << "', index '" << idx_info.name << "'\n"; @@ -249,14 +264,16 @@ bool TransactionManager::undo_transaction(Transaction* txn) { } } } - if (!table.physical_remove(log.rid)) { + if (!table.physical_remove(log.rid) || + FAULT_IF(cloudsql::common::FAULT_PHYSICAL_REMOVE)) { std::cerr << "Rollback ERROR: physical_remove failed for new version in UPDATE " "undo\n"; success = false; } if (log.old_rid.has_value()) { - if (!table.undo_remove(log.old_rid.value())) { + if (!table.undo_remove(log.old_rid.value()) || + FAULT_IF(cloudsql::common::FAULT_UNDO_REMOVE)) { std::cerr << "Rollback ERROR: undo_remove failed for old version in UPDATE " "undo\n"; success = false; @@ -268,7 +285,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) { uint16_t pos = idx_info.column_positions[0]; common::ValueType ktype = table_meta->columns[pos].type; storage::BTreeIndex index(idx_info.name, bpm_, ktype); - if (!index.insert(old_tuple.get(pos), log.old_rid.value())) { + if (!index.insert(old_tuple.get(pos), log.old_rid.value()) || + FAULT_IF(cloudsql::common::FAULT_INDEX_INSERT)) { std::cerr << "Rollback ERROR: Index insert failed for table '" << log.table_name << "', index '" << idx_info.name @@ -282,6 +300,11 @@ bool TransactionManager::undo_transaction(Transaction* txn) { } break; } + default: { + std::cerr << "Rollback ERROR: Unknown undo log type\n"; + success = false; + break; + } } } return success; diff --git a/tests/recovery_tests.cpp b/tests/recovery_tests.cpp index 9214b554..de9aac56 100644 --- a/tests/recovery_tests.cpp +++ b/tests/recovery_tests.cpp @@ -154,16 +154,16 @@ TEST(RecoveryTests, LogManagerBasic) { // Append a few logs LogRecord qlog1(1, -1, LogRecordType::BEGIN); - const lsn_t lsn1 = log_manager.append_log_record(qlog1); - EXPECT_EQ(lsn1, 0); + ASSERT_TRUE(log_manager.append_log_record(qlog1)); + EXPECT_EQ(qlog1.lsn_, 0); - LogRecord qlog2(1, lsn1, LogRecordType::COMMIT); - const lsn_t lsn2 = log_manager.append_log_record(qlog2); - EXPECT_EQ(lsn2, 1); + LogRecord qlog2(1, qlog1.lsn_, LogRecordType::COMMIT); + ASSERT_TRUE(log_manager.append_log_record(qlog2)); + EXPECT_EQ(qlog2.lsn_, 1); // Wait for flush log_manager.flush(true); - EXPECT_GE(log_manager.get_persistent_lsn(), lsn2); + EXPECT_GE(log_manager.get_persistent_lsn(), qlog2.lsn_); } // Verify file content size roughly diff --git a/tests/transaction_manager_tests.cpp b/tests/transaction_manager_tests.cpp index dc01c9b9..899083b2 100644 --- a/tests/transaction_manager_tests.cpp +++ b/tests/transaction_manager_tests.cpp @@ -9,6 +9,12 @@ #include "catalog/catalog.hpp" #include "common/config.hpp" +#include "common/fault_injection.hpp" +#include "executor/query_executor.hpp" +#include "parser/lexer.hpp" +#include "parser/parser.hpp" +#include "parser/statement.hpp" +#include "recovery/log_manager.hpp" #include "storage/buffer_pool_manager.hpp" #include "storage/storage_manager.hpp" #include "transaction/lock_manager.hpp" @@ -16,6 +22,11 @@ #include "transaction/transaction_manager.hpp" using namespace cloudsql; +using namespace cloudsql::common; +using namespace cloudsql::executor; +using namespace cloudsql::parser; +using namespace cloudsql::recovery; +using namespace cloudsql::storage; using namespace cloudsql::transaction; namespace { @@ -124,6 +135,37 @@ TEST(TransactionManagerTests, GetTransaction) { EXPECT_EQ(tm.get_transaction(txn1->get_id()), nullptr); } +TEST(TransactionManagerTests, PrepareWhenNotRunning) { + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* const txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Force txn to PREPARED state before prepare() call + txn->set_state(TransactionState::PREPARED); + tm.prepare(txn); + // prepare() should return early since state != RUNNING + + // Also test with COMMITTED + Transaction* const txn2 = tm.begin(); + txn2->set_state(TransactionState::COMMITTED); + tm.prepare(txn2); + + // And ABORTED + Transaction* const txn3 = tm.begin(); + txn3->set_state(TransactionState::ABORTED); + tm.prepare(txn3); + + tm.commit(txn); + tm.commit(txn2); + tm.commit(txn3); +} + TEST(TransactionManagerTests, CommitIdempotent) { auto catalog = Catalog::create(); storage::StorageManager disk_manager("./test_data"); @@ -299,4 +341,1171 @@ TEST(TransactionManagerTests, SerializableWriteSkewDetection) { tm.commit(txn2); } +/** + * @brief Verifies begin() with multiple active transactions captures them in snapshot + */ +TEST(TransactionManagerTests, BeginWithActiveTransactions) { + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/txn_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* const txn1 = tm.begin(); + ASSERT_NE(txn1, nullptr); + txn_id_t txn1_id = txn1->get_id(); + + // Start txn2 while txn1 is still active + Transaction* const txn2 = tm.begin(); + ASSERT_NE(txn2, nullptr); + + // txn2's snapshot should include txn1 in active_txns + const auto& snap2 = txn2->get_snapshot(); + EXPECT_TRUE(snap2.active_txns.find(txn1_id) != snap2.active_txns.end()); + + // Start txn3 — should capture both txn1 and txn2 + Transaction* const txn3 = tm.begin(); + const auto& snap3 = txn3->get_snapshot(); + EXPECT_TRUE(snap3.active_txns.find(txn1_id) != snap3.active_txns.end()); + EXPECT_TRUE(snap3.active_txns.find(txn2->get_id()) != snap3.active_txns.end()); + + tm.commit(txn1); + tm.commit(txn2); + tm.commit(txn3); +} + +TEST(TransactionManagerTests, InsertThenAbort) { + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + // Create table + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE abort_test (id INT, val INT)")) + .parse_statement())); + + // Begin + Insert (explicit BEGIN for transactional insert) + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO abort_test VALUES (1, 100)")) + .parse_statement())); + + // Verify insert worked + const auto res_before = exec.execute( + *Parser(std::make_unique("SELECT * FROM abort_test")).parse_statement()); + EXPECT_EQ(res_before.row_count(), 1U); + + // Rollback + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // Verify row is gone + const auto res_after = exec.execute( + *Parser(std::make_unique("SELECT * FROM abort_test")).parse_statement()); + EXPECT_EQ(res_after.row_count(), 0U); + + static_cast(std::remove("./test_data/abort_test.heap")); +} + +TEST(TransactionManagerTests, UpdateThenAbort) { + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + // Create + insert initial row + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_abort (id INT, val TEXT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO upd_abort VALUES (1, 'old')")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin + Update + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("UPDATE upd_abort SET val = 'new' WHERE id = 1")) + .parse_statement())); + + // Verify update is visible + const auto res_before = + exec.execute(*Parser(std::make_unique("SELECT val FROM upd_abort WHERE id = 1")) + .parse_statement()); + EXPECT_STREQ(res_before.rows()[0].get(0).to_string().c_str(), "new"); + + // Rollback + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // Verify old value is back + const auto res_after = + exec.execute(*Parser(std::make_unique("SELECT val FROM upd_abort WHERE id = 1")) + .parse_statement()); + EXPECT_STREQ(res_after.rows()[0].get(0).to_string().c_str(), "old"); + + static_cast(std::remove("./test_data/upd_abort.heap")); +} + +TEST(TransactionManagerTests, PrepareOnRunningTxn) { + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/prepare_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* const txn = tm.begin(); + ASSERT_NE(txn, nullptr); + EXPECT_EQ(txn->get_state(), TransactionState::RUNNING); + + tm.prepare(txn); + EXPECT_EQ(txn->get_state(), TransactionState::PREPARED); + + tm.commit(txn); +} + +TEST(TransactionManagerTests, AbortWithLocks) { + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/abort_locks_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/locktest.heap")); + + static_cast(exec.execute( + *Parser(std::make_unique("CREATE TABLE locktest (id INT)")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO locktest VALUES (1)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // BEGIN, INSERT (adds lock via QueryExecutor), ROLLBACK + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO locktest VALUES (2)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + const auto res = + exec.execute(*Parser(std::make_unique("SELECT * FROM locktest")).parse_statement()); + EXPECT_EQ(res.row_count(), 1U); + + static_cast(std::remove("./test_data/locktest.heap")); +} + +TEST(TransactionManagerTests, CommitWithLocks) { + // Test commit() unlock loop (lines 91-97, 134-141) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/commit_locks_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Acquire two shared locks and track them in the txn + HeapTable::TupleId rid1(1, 1); + HeapTable::TupleId rid2(1, 2); + lm.acquire_shared(txn, rid1); + lm.acquire_shared(txn, rid2); + txn->add_shared_lock(rid1); + txn->add_shared_lock(rid2); + + // commit() should iterate lock_set and call unlock for each + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); +} + +TEST(TransactionManagerTests, CommitWithSharedLocks) { + // Test commit() shared lock unlock loop (line 136) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/shared_locks_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Add SHARED locks + HeapTable::TupleId rid1(1, 1); + HeapTable::TupleId rid2(1, 2); + lm.acquire_shared(txn, rid1); + lm.acquire_shared(txn, rid2); + txn->add_shared_lock(rid1); + txn->add_shared_lock(rid2); + + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); +} + +TEST(TransactionManagerTests, InsertThenAbortWithIndex) { + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/idx_abort_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/idx_abort.heap")); + static_cast(std::remove("./test_data/idx_abort.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE idx_abort (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_val ON idx_abort (val)")) + .parse_statement())); + + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO idx_abort VALUES (1, 100)")) + .parse_statement())); + + const auto res_before = + exec.execute(*Parser(std::make_unique("SELECT * FROM idx_abort")).parse_statement()); + EXPECT_EQ(res_before.row_count(), 1U); + + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + const auto res_after = + exec.execute(*Parser(std::make_unique("SELECT * FROM idx_abort")).parse_statement()); + EXPECT_EQ(res_after.row_count(), 0U); + + static_cast(std::remove("./test_data/idx_abort.heap")); + static_cast(std::remove("./test_data/idx_abort.idx")); +} + +TEST(TransactionManagerTests, UpdateThenAbortWithIndex) { + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/upd_idx_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/upd_idx.heap")); + static_cast(std::remove("./test_data/upd_idx.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_idx (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_upd_val ON upd_idx (val)")) + .parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO upd_idx VALUES (1, 100)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("UPDATE upd_idx SET val = 999 WHERE id = 1")) + .parse_statement())); + + const auto res_before = exec.execute( + *Parser(std::make_unique("SELECT val FROM upd_idx WHERE id = 1")).parse_statement()); + EXPECT_EQ(std::stoi(res_before.rows()[0].get(0).to_string()), 999); + + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + const auto res_after = exec.execute( + *Parser(std::make_unique("SELECT val FROM upd_idx WHERE id = 1")).parse_statement()); + EXPECT_EQ(std::stoi(res_after.rows()[0].get(0).to_string()), 100); + + static_cast(std::remove("./test_data/upd_idx.heap")); + static_cast(std::remove("./test_data/upd_idx.idx")); +} + +TEST(TransactionManagerTests, DeleteThenAbort) { + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + // Create + insert initial row + static_cast(exec.execute( + *Parser(std::make_unique("CREATE TABLE del_abort (id INT)")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO del_abort VALUES (1)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin + Delete + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("DELETE FROM del_abort WHERE id = 1")).parse_statement())); + + // Verify delete is visible + const auto res_before = + exec.execute(*Parser(std::make_unique("SELECT * FROM del_abort")).parse_statement()); + EXPECT_EQ(res_before.row_count(), 0U); + + // Rollback + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // Verify row is back + const auto res_after = + exec.execute(*Parser(std::make_unique("SELECT * FROM del_abort")).parse_statement()); + EXPECT_EQ(res_after.row_count(), 1U); + + static_cast(std::remove("./test_data/del_abort.heap")); +} + +TEST(TransactionManagerTests, AbortWithSharedLocks) { + // Test abort() shared lock unlock loop (line 136) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/shared_abort_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Add SHARED locks (not exclusive) + HeapTable::TupleId rid1(1, 1); + HeapTable::TupleId rid2(1, 2); + lm.acquire_shared(txn, rid1); + lm.acquire_shared(txn, rid2); + txn->add_shared_lock(rid1); + txn->add_shared_lock(rid2); + + // abort() should iterate shared_lock_set and call unlock for each + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, UndoPhysicalRemoveFailure) { + // Test FAULT_PHYSICAL_REMOVE branch in undo_transaction (INSERT undo path) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/phys_remove_fault.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/phys_fault.heap")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE phys_fault (id INT, val INT)")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO phys_fault VALUES (1, 100)")) + .parse_statement())); + + // Arm fault injection for physical_remove + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_PHYSICAL_REMOVE); + + // ROLLBACK — should hit the error branch inside undo_transaction + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // Clear fault + cloudsql::common::FaultInjection::instance().clear(); + + static_cast(std::remove("./test_data/phys_fault.heap")); +} + +TEST(TransactionManagerTests, UndoIndexInsertFailure) { + // Test FAULT_INDEX_INSERT branch in undo_transaction (DELETE undo path) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/idx_insert_fault.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/idx_ins_fault.heap")); + static_cast(std::remove("./test_data/idx_ins_fault.idx")); + + static_cast(exec.execute( + *Parser(std::make_unique("CREATE TABLE idx_ins_fault (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_if ON idx_ins_fault (val)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO idx_ins_fault VALUES (1, 100)")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Delete + ROLLBACK with index insert fault + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("DELETE FROM idx_ins_fault WHERE id = 1")) + .parse_statement())); + + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_INDEX_INSERT); + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + cloudsql::common::FaultInjection::instance().clear(); + + static_cast(std::remove("./test_data/idx_ins_fault.heap")); + static_cast(std::remove("./test_data/idx_ins_fault.idx")); +} + +TEST(TransactionManagerTests, UndoIndexRemoveFailure) { + // Test FAULT_INDEX_REMOVE branch in undo_transaction (UPDATE undo path) + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/idx_rm_fault.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/idx_rm_fault.heap")); + static_cast(std::remove("./test_data/idx_rm_fault.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE idx_rm_fault (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_rf ON idx_rm_fault (val)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO idx_rm_fault VALUES (1, 100)")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("UPDATE idx_rm_fault SET val = 999 WHERE id = 1")) + .parse_statement())); + + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_INDEX_REMOVE); + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + cloudsql::common::FaultInjection::instance().clear(); + + static_cast(std::remove("./test_data/idx_rm_fault.heap")); + static_cast(std::remove("./test_data/idx_rm_fault.idx")); +} + +TEST(TransactionManagerTests, DoubleCommit) { + // Test COMMITTED early-return branch in commit() — line 78-80 + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // First commit + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); + + // Second commit — should return early (line 79 check) + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); +} + +TEST(TransactionManagerTests, DoubleAbort) { + // Test ABORTED early-return branch in abort() — line 117-119 + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // First abort + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + // Second abort — should return early (line 118 check) + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, CommitWithoutLogManager) { + // Test commit() with nullptr log_manager — lines 83-88 (log_manager_ == nullptr path) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + // Pass nullptr explicitly for log_manager + TransactionManager tm(lm, *catalog, bpm, nullptr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // commit() with no log manager should work (just skip logging) + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); +} + +TEST(TransactionManagerTests, AbortWithoutLogManager) { + // Test abort() with nullptr log_manager — lines 127-132 (log_manager_ == nullptr path) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, nullptr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // abort() with no log manager should work (just skip logging) + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, UndoLogReferencesNonExistentTable) { + // Test table metadata not found branch — line 168 + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Manually add an undo log referencing a non-existent table + txn->add_undo_log(UndoLog::Type::INSERT, "nonexistent_table", HeapTable::TupleId(99, 99)); + + // abort() should hit the table metadata lookup failure (line 168) + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, UndoLogInsertRIDNotFound) { + // Test table.get() returns false in INSERT undo — line 188 + // We create a table, insert a row, then manually set an undo log with a bad RID + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/ins_rid_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/ins_rid.heap")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE ins_rid (id INT, val INT)")) + .parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO ins_rid VALUES (1, 100)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin, manually add INSERT undo with non-existent RID, abort + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + txn->add_undo_log(UndoLog::Type::INSERT, "ins_rid", HeapTable::TupleId(999, 999)); + + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + static_cast(std::remove("./test_data/ins_rid.heap")); +} + +TEST(TransactionManagerTests, UndoLogDeleteRIDNotFound) { + // Test table.undo_remove() returns false in DELETE undo — line 211 + // We create a table, insert a row, manually add DELETE undo with bad RID, then abort + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/del_rid_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/del_rid.heap")); + + static_cast(exec.execute( + *Parser(std::make_unique("CREATE TABLE del_rid (id INT)")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO del_rid VALUES (1)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin, manually add DELETE undo with non-existent RID, abort + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + // Use RID that doesn't exist — undo_remove will return false + txn->add_undo_log(UndoLog::Type::DELETE, "del_rid", HeapTable::TupleId(999, 999)); + + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + static_cast(std::remove("./test_data/del_rid.heap")); +} + +TEST(TransactionManagerTests, UpdateUndoNewTupleNotFound) { + // Test table.get() returns false in UPDATE undo path — line 238 + // Manually add UPDATE undo log with non-existent RID + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/upd_new_notfound.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/upd_new.heap")); + static_cast(std::remove("./test_data/upd_new.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_new (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_un ON upd_new (val)")) + .parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO upd_new VALUES (1, 100)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin, manually add UPDATE undo with non-existent new RID (but has old_rid) + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + HeapTable::TupleId bad_rid(999, 999); + HeapTable::TupleId old_rid(1, 1); + txn->add_undo_log(UndoLog::Type::UPDATE, "upd_new", bad_rid, old_rid); + + // abort() — table.get(new_rid) returns false, skips index loop (line 238-239 branch) + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + static_cast(std::remove("./test_data/upd_new.heap")); + static_cast(std::remove("./test_data/upd_new.idx")); +} + +TEST(TransactionManagerTests, UpdateUndoOldTupleNotFound) { + // Test table.get() returns false for old_tuple in UPDATE undo — line 265 + // UPDATE undo where undo_remove succeeds but old_tuple get() fails + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/upd_old_notfound.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/upd_old.heap")); + static_cast(std::remove("./test_data/upd_old.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_old (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_uo ON upd_old (val)")) + .parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO upd_old VALUES (1, 100)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin, add UPDATE undo with non-existent old RID + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + HeapTable::TupleId new_rid(1, 2); + HeapTable::TupleId bad_old_rid(999, 999); + txn->add_undo_log(UndoLog::Type::UPDATE, "upd_old", new_rid, bad_old_rid); + + // abort() — undo_remove(old_rid) returns false (line 260), hits error path + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + static_cast(std::remove("./test_data/upd_old.heap")); + static_cast(std::remove("./test_data/upd_old.idx")); +} + +TEST(TransactionManagerTests, UpdateUndoWithIndexInsertFault) { + // Test FAULT_INDEX_INSERT in UPDATE undo old_rid restore path — line 272 + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/upd_idx_ins_fault.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/upd_ii_fault.heap")); + static_cast(std::remove("./test_data/upd_ii_fault.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_ii_fault (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_ui ON upd_ii_fault (val)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO upd_ii_fault VALUES (1, 100)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO upd_ii_fault VALUES (2, 200)")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // Begin + UPDATE (creates new version with old_rid pointing to original) + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("UPDATE upd_ii_fault SET val = 999 WHERE id = 1")) + .parse_statement())); + + // Fault inject index insert for old_rid restore during abort + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_INDEX_INSERT); + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + cloudsql::common::FaultInjection::instance().clear(); + + static_cast(std::remove("./test_data/upd_ii_fault.heap")); + static_cast(std::remove("./test_data/upd_ii_fault.idx")); +} + +TEST(TransactionManagerTests, Commit101Transactions) { + // Test completed_transactions_ overflow — lines 111, 155 + // The 101st commit should trigger pop_front() + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/overflow_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + // Commit 101 transactions — the 101st will overflow the deque (size > 100) + for (int i = 0; i < 101; ++i) { + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + tm.commit(txn); + } +} + +TEST(TransactionManagerTests, Abort101Transactions) { + // Test completed_transactions_ overflow via abort path — line 155 + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/abort_overflow_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + // Abort 101 transactions — the 101st will overflow the deque + for (int i = 0; i < 101; ++i) { + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + tm.abort(txn); + } +} + +TEST(TransactionManagerTests, UpdateUndoBothTuplesFound) { + // Test UPDATE undo full path: new_tuple found, old_rid undo_remove succeeds, + // old_tuple found, index insert succeeds — lines 238, 244, 253, 260, 265, 272 + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/upd_both_found.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/upd_both.heap")); + static_cast(std::remove("./test_data/upd_both.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE upd_both (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_ub ON upd_both (val)")) + .parse_statement())); + // Insert two rows + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO upd_both VALUES (1, 100)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("INSERT INTO upd_both VALUES (2, 200)")) + .parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // UPDATE id=1, then ROLLBACK — should hit all branches in UPDATE undo path + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("UPDATE upd_both SET val = 999 WHERE id = 1")) + .parse_statement())); + + // Verify update visible + const auto res = + exec.execute(*Parser(std::make_unique("SELECT val FROM upd_both WHERE id = 1")) + .parse_statement()); + EXPECT_EQ(std::stoi(res.rows()[0].get(0).to_string()), 999); + + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // After rollback, original value should be back + const auto res_after = + exec.execute(*Parser(std::make_unique("SELECT val FROM upd_both WHERE id = 1")) + .parse_statement()); + EXPECT_EQ(std::stoi(res_after.rows()[0].get(0).to_string()), 100); + + static_cast(std::remove("./test_data/upd_both.heap")); + static_cast(std::remove("./test_data/upd_both.idx")); +} + +TEST(TransactionManagerTests, DeleteUndoWithIndexRestore) { + // Test DELETE undo path with index: undo_remove succeeds, table.get finds tuple, + // index.insert succeeds — lines 211, 216, 222 + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/del_idx_restore.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/del_idx.heap")); + static_cast(std::remove("./test_data/del_idx.idx")); + + static_cast( + exec.execute(*Parser(std::make_unique("CREATE TABLE del_idx (id INT, val INT)")) + .parse_statement())); + static_cast( + exec.execute(*Parser(std::make_unique("CREATE INDEX idx_di ON del_idx (val)")) + .parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("INSERT INTO del_idx VALUES (1, 100)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + // DELETE, then ROLLBACK — should hit DELETE undo path with index restore + static_cast(exec.execute(*Parser(std::make_unique("BEGIN")).parse_statement())); + static_cast(exec.execute( + *Parser(std::make_unique("DELETE FROM del_idx WHERE id = 1")).parse_statement())); + + // Verify delete visible + const auto res = + exec.execute(*Parser(std::make_unique("SELECT * FROM del_idx")).parse_statement()); + EXPECT_EQ(res.row_count(), 0U); + + static_cast(exec.execute(*Parser(std::make_unique("ROLLBACK")).parse_statement())); + + // Row should be restored + const auto res_after = + exec.execute(*Parser(std::make_unique("SELECT * FROM del_idx")).parse_statement()); + EXPECT_EQ(res_after.row_count(), 1U); + + static_cast(std::remove("./test_data/del_idx.heap")); + static_cast(std::remove("./test_data/del_idx.idx")); +} + +TEST(TransactionManagerTests, AbortCommittedTxn) { + // Test abort() path when transaction is already COMMITTED + // abort() should skip undo_transaction and proceed to lock release + state change + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); + + // Force state to COMMITTED before calling abort to hit the state check at line 123 + txn->set_state(TransactionState::COMMITTED); + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, PrepareOnPreparedTxn) { + // Test prepare() when transaction is already PREPARED + // Second prepare() should return early at line 63-64 (state != RUNNING) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + tm.prepare(txn); + EXPECT_EQ(txn->get_state(), TransactionState::PREPARED); + + // Second prepare on already PREPARED should hit early-return branch + tm.prepare(txn); + EXPECT_EQ(txn->get_state(), TransactionState::PREPARED); +} + +TEST(TransactionManagerTests, UndoEmptyLogs) { + // Test undo_transaction when transaction has no undo logs + // The reverse iteration loop should execute 0 times + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // txn has no undo logs - abort() calls undo_transaction with empty logs + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); +} + +TEST(TransactionManagerTests, CommitIdempotentTwoCalls) { + // Test commit() called twice on same transaction + // Second call should return early at line 79 (state == COMMITTED) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, bpm.get_log_manager()); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); + + // Second commit - early return at line 79-80 + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); +} + +TEST(TransactionManagerTests, CommitWithLogFailure) { + // Test commit() when log_manager_->append_log_record returns false + // Should hit the error branch in commit() + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/commit_log_fail.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Arm fault injection for log commit + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_LOG_COMMIT); + + // commit() should still complete even if logging fails + tm.commit(txn); + EXPECT_EQ(txn->get_state(), TransactionState::COMMITTED); + + cloudsql::common::FaultInjection::instance().clear(); +} + +TEST(TransactionManagerTests, AbortWithLogFailure) { + // Test abort() when log_manager_->append_log_record returns false + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/abort_log_fail.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Arm fault injection for log abort + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_LOG_ABORT); + + // abort() should still complete even if logging fails + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + cloudsql::common::FaultInjection::instance().clear(); +} + +TEST(TransactionManagerTests, PrepareWithLogFailure) { + // Test prepare() when log_manager_->append_log_record returns false + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/prepare_log_fail.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Arm fault injection for log prepare + cloudsql::common::FaultInjection::instance().set_fault(cloudsql::common::FAULT_LOG_PREPARE); + + // prepare() should still set PREPARED state even if logging fails + tm.prepare(txn); + EXPECT_EQ(txn->get_state(), TransactionState::PREPARED); + + cloudsql::common::FaultInjection::instance().clear(); +} + +TEST(TransactionManagerTests, PrepareWithoutLogManager) { + // Test prepare() with nullptr log_manager — lines 67-72 (log_manager_ == nullptr path) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, nullptr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + EXPECT_EQ(txn->get_state(), TransactionState::RUNNING); + + // prepare() with no log manager should still set PREPARED state + tm.prepare(txn); + EXPECT_EQ(txn->get_state(), TransactionState::PREPARED); +} + +TEST(TransactionManagerTests, BeginWithoutLogManager) { + // Test begin() with nullptr log_manager — lines 53-56 (log_manager_ == nullptr path) + auto catalog = Catalog::create(); + storage::StorageManager disk_manager("./test_data"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, + disk_manager); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, nullptr); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + EXPECT_EQ(txn->get_state(), TransactionState::RUNNING); + // begin() should return without logging when log_manager_ is nullptr +} + +TEST(TransactionManagerTests, CommitOverflowThreshold) { + // Test completed_transactions_ overflow with reduced threshold (10 instead of 100) + // With MAX_COMPLETED=10, the 11th commit triggers pop_front() + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/overflow10_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + for (int i = 0; i < 15; ++i) { + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + tm.commit(txn); + } +} + +TEST(TransactionManagerTests, AbortOverflowThreshold) { + // Test completed_transactions_ overflow via abort with reduced threshold + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/abort_overflow10_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + + for (int i = 0; i < 15; ++i) { + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + tm.abort(txn); + } +} + +TEST(TransactionManagerTests, UndoLogUnknownType) { + // Test the default case in switch when undo log type is UNKNOWN + storage::StorageManager disk_manager("./test_data"); + disk_manager.create_dir_if_not_exists(); + recovery::LogManager log_mgr("./test_data/unknown_log.dat"); + storage::BufferPoolManager bpm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager, + &log_mgr); + auto catalog = Catalog::create(); + LockManager lm; + TransactionManager tm(lm, *catalog, bpm, &log_mgr); + executor::QueryExecutor exec(*catalog, bpm, lm, tm); + + static_cast(std::remove("./test_data/unknown_test.heap")); + + // Create a real table + static_cast(exec.execute( + *Parser(std::make_unique("CREATE TABLE unknown_test (id INT)")).parse_statement())); + static_cast(exec.execute(*Parser(std::make_unique("COMMIT")).parse_statement())); + + Transaction* txn = tm.begin(); + ASSERT_NE(txn, nullptr); + + // Add an undo log with UNKNOWN type for a real table + txn->add_undo_log_for_test( + {UndoLog::Type::UNKNOWN, "unknown_test", HeapTable::TupleId(1, 1), std::nullopt}); + + // abort() should hit the default case in the switch after table lookup succeeds + tm.abort(txn); + EXPECT_EQ(txn->get_state(), TransactionState::ABORTED); + + static_cast(std::remove("./test_data/unknown_test.heap")); +} + } // namespace