Skip to content
61 changes: 61 additions & 0 deletions include/common/fault_injection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* @file fault_injection.hpp
* @brief Fault injection utility for testing error handling paths
*/

#pragma once

#include <atomic>
#include <cstdint>

namespace cloudsql {
namespace common {

// Fault injection modes — used as values for the fault injection flag
enum FaultMode : int {
FAULT_NONE = 0,
FAULT_LOG_COMMIT,
FAULT_LOG_ABORT,
FAULT_LOG_PREPARE,
FAULT_INDEX_REMOVE,
FAULT_INDEX_INSERT,
FAULT_PHYSICAL_REMOVE,
FAULT_UNDO_REMOVE,
};

class FaultInjection {
public:
static FaultInjection& instance();

void set_fault(FaultMode mode);
void clear();
bool should_fault(FaultMode mode) const;

private:
FaultInjection() = default;

std::atomic<int> fault_mode_{FAULT_NONE};
};

inline FaultInjection& FaultInjection::instance() {
static FaultInjection inst;
return inst;
}

inline void FaultInjection::set_fault(FaultMode mode) {
fault_mode_.store(static_cast<int>(mode), std::memory_order_release);
}

inline void FaultInjection::clear() {
fault_mode_.store(static_cast<int>(FAULT_NONE), std::memory_order_release);
}

inline bool FaultInjection::should_fault(FaultMode mode) const {
return fault_mode_.load(std::memory_order_acquire) == static_cast<int>(mode);
}

// FAULT_IF(mode) — returns true when fault is armed for that mode
#define FAULT_IF(mode) (cloudsql::common::FaultInjection::instance().should_fault(mode))

} // namespace common
} // namespace cloudsql
7 changes: 4 additions & 3 deletions include/recovery/log_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ class LogManager {
/**
* @brief Append a log record to the buffer
* @param log_record The record to append (LSN will be set)
* @return The LSN of the appended record
* @return True if appended successfully, false otherwise
*/
lsn_t append_log_record(LogRecord& log_record);
bool append_log_record(LogRecord& log_record);

/**
* @brief Flush log buffer to disk
* @param force If true, force flush even if buffer is not full
* @return True if flush succeeded, false otherwise
*/
void flush(bool force = false);
bool flush(bool force = false);

/**
* @brief Get the persistent LSN (flushed to disk)
Expand Down
7 changes: 5 additions & 2 deletions include/transaction/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct TransactionSnapshot {
* @brief Represents a change that can be undone
*/
struct UndoLog {
enum class Type : uint8_t { INSERT, DELETE, UPDATE };
enum class Type : uint8_t { INSERT, DELETE, UPDATE, UNKNOWN };
Type type = Type::INSERT;
std::string table_name;
storage::HeapTable::TupleId rid;
Expand Down Expand Up @@ -128,7 +128,7 @@ class Transaction {
void add_undo_log(UndoLog::Type type, const std::string& table_name,
const storage::HeapTable::TupleId& rid) {
/* Enforce invariant: non-UPDATE types should not provide old_rid through this overload */
assert(type != UndoLog::Type::UPDATE);
assert(type != UndoLog::Type::UPDATE && type != UndoLog::Type::UNKNOWN);
undo_logs_.push_back({type, table_name, rid, std::nullopt});
}

Expand All @@ -141,6 +141,9 @@ class Transaction {
}

[[nodiscard]] const std::vector<UndoLog>& get_undo_logs() const { return undo_logs_; }

// Test-only: add undo log with any type including UNKNOWN
void add_undo_log_for_test(UndoLog log) { undo_logs_.push_back(log); }
};

} // namespace cloudsql::transaction
Expand Down
14 changes: 11 additions & 3 deletions src/recovery/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <string>
#include <utility>

#include "common/fault_injection.hpp"
#include "recovery/log_record.hpp"

namespace cloudsql::recovery {
Expand Down Expand Up @@ -64,7 +65,13 @@ void LogManager::stop_flush_thread() {
}
}

lsn_t LogManager::append_log_record(LogRecord& log_record) {
bool LogManager::append_log_record(LogRecord& log_record) {
if (FAULT_IF(cloudsql::common::FAULT_LOG_COMMIT) ||
FAULT_IF(cloudsql::common::FAULT_LOG_ABORT) ||
FAULT_IF(cloudsql::common::FAULT_LOG_PREPARE)) {
return false;
}

const std::unique_lock<std::mutex> lock(latch_);

// If record size > buffer size, flush first
Expand All @@ -82,13 +89,14 @@ lsn_t LogManager::append_log_record(LogRecord& log_record) {
std::next(log_buffer_, static_cast<std::ptrdiff_t>(log_buffer_offset_))));
log_buffer_offset_ += record_size;

return lsn;
return true;
}

void LogManager::flush(bool force) {
bool LogManager::flush(bool force) {
(void)force;
const std::unique_lock<std::mutex> lock(latch_);
flush_internal();
return true;
}

void LogManager::flush_internal() {
Expand Down
65 changes: 44 additions & 21 deletions src/transaction/transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <utility>

#include "catalog/catalog.hpp"
#include "common/fault_injection.hpp"
#include "executor/types.hpp"
#include "recovery/log_manager.hpp"
#include "recovery/log_record.hpp"
Expand Down Expand Up @@ -51,8 +52,9 @@ Transaction* TransactionManager::begin(IsolationLevel level) {

if (log_manager_ != nullptr) {
recovery::LogRecord record(txn_id, txn_ptr->get_prev_lsn(), recovery::LogRecordType::BEGIN);
const recovery::lsn_t lsn = log_manager_->append_log_record(record);
txn_ptr->set_prev_lsn(lsn);
if (log_manager_->append_log_record(record)) {
txn_ptr->set_prev_lsn(record.lsn_);
}
}

return txn_ptr;
Expand All @@ -66,9 +68,12 @@ void TransactionManager::prepare(Transaction* txn) {
if (log_manager_ != nullptr) {
recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(),
recovery::LogRecordType::PREPARE);
const recovery::lsn_t lsn = log_manager_->append_log_record(record);
txn->set_prev_lsn(lsn);
log_manager_->flush(true);
if (!log_manager_->append_log_record(record)) {
std::cerr << "Failed to log prepare\n";
} else {
txn->set_prev_lsn(record.lsn_);
log_manager_->flush(true);
}
}

txn->set_state(TransactionState::PREPARED);
Expand All @@ -82,9 +87,12 @@ void TransactionManager::commit(Transaction* txn) {
if (log_manager_ != nullptr) {
recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(),
recovery::LogRecordType::COMMIT);
const recovery::lsn_t lsn = log_manager_->append_log_record(record);
txn->set_prev_lsn(lsn);
log_manager_->flush(true);
if (!log_manager_->append_log_record(record)) {
std::cerr << "Failed to log commit\n";
} else {
txn->set_prev_lsn(record.lsn_);
log_manager_->flush(true);
}
}

const auto lock_set = txn->get_shared_lock_set();
Expand All @@ -106,7 +114,7 @@ void TransactionManager::commit(Transaction* txn) {
active_transactions_.erase(it);
}

constexpr std::size_t MAX_COMPLETED = 100;
constexpr std::size_t MAX_COMPLETED = 10;
if (completed_transactions_.size() > MAX_COMPLETED) {
completed_transactions_.pop_front();
}
Expand All @@ -126,9 +134,12 @@ void TransactionManager::abort(Transaction* txn) {
if (log_manager_ != nullptr) {
recovery::LogRecord record(txn->get_id(), txn->get_prev_lsn(),
recovery::LogRecordType::ABORT);
const recovery::lsn_t lsn = log_manager_->append_log_record(record);
txn->set_prev_lsn(lsn);
log_manager_->flush(true);
if (!log_manager_->append_log_record(record)) {
std::cerr << "Failed to log abort\n";
} else {
txn->set_prev_lsn(record.lsn_);
log_manager_->flush(true);
}
}

const auto lock_set = txn->get_shared_lock_set();
Expand All @@ -150,7 +161,7 @@ void TransactionManager::abort(Transaction* txn) {
active_transactions_.erase(it);
}

constexpr std::size_t MAX_COMPLETED = 100;
constexpr std::size_t MAX_COMPLETED = 10;
if (completed_transactions_.size() > MAX_COMPLETED) {
completed_transactions_.pop_front();
}
Expand Down Expand Up @@ -190,7 +201,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
uint16_t pos = idx_info.column_positions[0];
common::ValueType ktype = table_meta->columns[pos].type;
storage::BTreeIndex index(idx_info.name, bpm_, ktype);
if (!index.remove(tuple.get(pos), log.rid)) {
if (!index.remove(tuple.get(pos), log.rid) ||
FAULT_IF(cloudsql::common::FAULT_INDEX_REMOVE)) {
std::cerr << "Rollback ERROR: Index remove failed for table '"
<< log.table_name << "', index '" << idx_info.name
<< "'\n";
Expand All @@ -199,15 +211,16 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
}
}
}
if (!table.physical_remove(log.rid)) {
if (!table.physical_remove(log.rid) ||
FAULT_IF(cloudsql::common::FAULT_PHYSICAL_REMOVE)) {
std::cerr << "Rollback ERROR: physical_remove failed for INSERT undo\n";
success = false;
}
break;
}
case UndoLog::Type::DELETE: {
/* For DELETE undo, reset xmax and re-insert into indexes */
if (!table.undo_remove(log.rid)) {
if (!table.undo_remove(log.rid) || FAULT_IF(cloudsql::common::FAULT_UNDO_REMOVE)) {
std::cerr << "Rollback ERROR: undo_remove failed for DELETE undo\n";
success = false;
} else {
Expand All @@ -218,7 +231,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
uint16_t pos = idx_info.column_positions[0];
common::ValueType ktype = table_meta->columns[pos].type;
storage::BTreeIndex index(idx_info.name, bpm_, ktype);
if (!index.insert(tuple.get(pos), log.rid)) {
if (!index.insert(tuple.get(pos), log.rid) ||
FAULT_IF(cloudsql::common::FAULT_INDEX_INSERT)) {
std::cerr << "Rollback ERROR: Index insert failed for table '"
<< log.table_name << "', index '" << idx_info.name
<< "'\n";
Expand All @@ -240,7 +254,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
uint16_t pos = idx_info.column_positions[0];
common::ValueType ktype = table_meta->columns[pos].type;
storage::BTreeIndex index(idx_info.name, bpm_, ktype);
if (!index.remove(new_tuple.get(pos), log.rid)) {
if (!index.remove(new_tuple.get(pos), log.rid) ||
FAULT_IF(cloudsql::common::FAULT_INDEX_REMOVE)) {
std::cerr << "Rollback ERROR: Index remove failed for table '"
<< log.table_name << "', index '" << idx_info.name
<< "'\n";
Expand All @@ -249,14 +264,16 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
}
}
}
if (!table.physical_remove(log.rid)) {
if (!table.physical_remove(log.rid) ||
FAULT_IF(cloudsql::common::FAULT_PHYSICAL_REMOVE)) {
std::cerr << "Rollback ERROR: physical_remove failed for new version in UPDATE "
"undo\n";
success = false;
}

if (log.old_rid.has_value()) {
if (!table.undo_remove(log.old_rid.value())) {
if (!table.undo_remove(log.old_rid.value()) ||
FAULT_IF(cloudsql::common::FAULT_UNDO_REMOVE)) {
std::cerr << "Rollback ERROR: undo_remove failed for old version in UPDATE "
"undo\n";
success = false;
Expand All @@ -268,7 +285,8 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
uint16_t pos = idx_info.column_positions[0];
common::ValueType ktype = table_meta->columns[pos].type;
storage::BTreeIndex index(idx_info.name, bpm_, ktype);
if (!index.insert(old_tuple.get(pos), log.old_rid.value())) {
if (!index.insert(old_tuple.get(pos), log.old_rid.value()) ||
FAULT_IF(cloudsql::common::FAULT_INDEX_INSERT)) {
std::cerr
<< "Rollback ERROR: Index insert failed for table '"
<< log.table_name << "', index '" << idx_info.name
Expand All @@ -282,6 +300,11 @@ bool TransactionManager::undo_transaction(Transaction* txn) {
}
break;
}
default: {
std::cerr << "Rollback ERROR: Unknown undo log type\n";
success = false;
break;
}
}
}
return success;
Expand Down
12 changes: 6 additions & 6 deletions tests/recovery_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ TEST(RecoveryTests, LogManagerBasic) {

// Append a few logs
LogRecord qlog1(1, -1, LogRecordType::BEGIN);
const lsn_t lsn1 = log_manager.append_log_record(qlog1);
EXPECT_EQ(lsn1, 0);
ASSERT_TRUE(log_manager.append_log_record(qlog1));
EXPECT_EQ(qlog1.lsn_, 0);

LogRecord qlog2(1, lsn1, LogRecordType::COMMIT);
const lsn_t lsn2 = log_manager.append_log_record(qlog2);
EXPECT_EQ(lsn2, 1);
LogRecord qlog2(1, qlog1.lsn_, LogRecordType::COMMIT);
ASSERT_TRUE(log_manager.append_log_record(qlog2));
EXPECT_EQ(qlog2.lsn_, 1);

// Wait for flush
log_manager.flush(true);
EXPECT_GE(log_manager.get_persistent_lsn(), lsn2);
EXPECT_GE(log_manager.get_persistent_lsn(), qlog2.lsn_);
}

// Verify file content size roughly
Expand Down
Loading
Loading