Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a520346
build: expand config with distributed run modes and ports
poyrazK Mar 1, 2026
2816ad5
build: integrate GMock and enable coverage instrumentation
poyrazK Mar 1, 2026
0174e2d
feat: implement ClusterManager for node discovery and tracking
poyrazK Mar 1, 2026
712672f
feat: add PREPARED state to Transaction state machine
poyrazK Mar 1, 2026
08983b0
feat: add PREPARE record type to WAL for 2PC recovery
poyrazK Mar 1, 2026
c87a049
feat: define internal custom binary RPC protocol
poyrazK Mar 1, 2026
d5361aa
feat: implement internal RPC server transport
poyrazK Mar 1, 2026
73b061e
feat: implement internal RPC client transport
poyrazK Mar 1, 2026
418c3c6
feat: define Raft consensus message types
poyrazK Mar 1, 2026
e8426b6
feat: implement core Raft consensus state machine
poyrazK Mar 1, 2026
61f0479
feat: implement consistent hashing for shard management
poyrazK Mar 1, 2026
f5f82bb
feat: implement 2PC Resource Manager logic in TransactionManager
poyrazK Mar 1, 2026
53fbfce
feat: implement DistributedExecutor with shard routing and 2PC coordi…
poyrazK Mar 1, 2026
a1f2eb7
feat: integrate Raft-backed catalog replication
poyrazK Mar 1, 2026
9425eb0
feat: bootstrap distributed roles and 2PC handlers in main
poyrazK Mar 1, 2026
c414c4e
test: add Raft election and heartbeat simulation tests
poyrazK Mar 1, 2026
c9cfd0a
test: add sharding and 2PC failure simulation tests
poyrazK Mar 1, 2026
42e33ad
test: update server tests for distributed configuration
poyrazK Mar 1, 2026
ea1e95a
style: apply clang-format fixes
poyrazK Mar 1, 2026
00bc145
style: fix struct initialization spacing for CI compliance
poyrazK Mar 1, 2026
14a182d
style: final spacing and multi-line formatting for CI
poyrazK Mar 1, 2026
954b6ec
style: fix remaining clang-tidy warnings and formatting issues
poyrazK Mar 1, 2026
420206c
style: remove extra newline and finalize headers for CI
poyrazK Mar 1, 2026
635023f
fix: address unused variable in main.cpp
poyrazK Mar 1, 2026
b05f2be
fix: rename args to cmd_args in main to avoid shadowing
poyrazK Mar 1, 2026
7f01d81
style: apply clang-format wrapping to cerr in main.cpp
poyrazK Mar 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(USE_SANITIZER "address" CACHE STRING "Sanitizer to use: address, thread, und
# Configure Sanitizers
if (NOT USE_SANITIZER STREQUAL "none")
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU")
set(CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -name "CMakeLists.txt" -type f

Repository: poyrazK/cloudSQL

Length of output: 76


🏁 Script executed:

cat -n CMakeLists.txt

Repository: poyrazK/cloudSQL

Length of output: 7632


Remove the invalid set(CMAKE_CXX_COMPILER_ID MATCHES ...) assignment on line 23.

Line 23 uses invalid CMake syntax—the set() command does not accept the MATCHES operator. This corrupts CMAKE_CXX_COMPILER_ID and breaks downstream compiler checks on lines 39 and 87. The line is also redundant since it's already within the if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU") block from line 22.

Proposed fix
        if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU")
-           set(CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU")
            string(REPLACE "," ";" SAN_LIST ${USE_SANITIZER})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@CMakeLists.txt` at line 23, Remove the invalid assignment that uses the
MATCHES operator inside set(): delete the line "set(CMAKE_CXX_COMPILER_ID
MATCHES "Clang|GNU")" so CMAKE_CXX_COMPILER_ID is not overwritten; the correct
approach is to keep the surrounding "if (CMAKE_CXX_COMPILER_ID MATCHES
"Clang|GNU")" condition and remove the redundant set(...) call that breaks
downstream checks referencing CMAKE_CXX_COMPILER_ID (used later around the
existing checks on lines near 39 and 87).

string(REPLACE "," ";" SAN_LIST ${USE_SANITIZER})
foreach (SAN ${SAN_LIST})
list(APPEND SAN_FLAGS "-fsanitize=${SAN}")
Expand Down Expand Up @@ -132,7 +133,7 @@ if (BUILD_TESTS)

macro(add_cloudsql_test name source)
add_executable(${name} ${source})
target_link_libraries(${name} PRIVATE sqlEngineCore GTest::gtest_main)
target_link_libraries(${name} PRIVATE sqlEngineCore GTest::gtest_main GTest::gmock_main)
add_test(NAME ${name} COMMAND ${name})
endmacro()

Expand All @@ -144,6 +145,10 @@ if (BUILD_TESTS)
add_cloudsql_test(recovery_tests tests/recovery_tests.cpp)
add_cloudsql_test(recovery_manager_tests tests/recovery_manager_tests.cpp)
add_cloudsql_test(buffer_pool_tests tests/buffer_pool_tests.cpp)
add_cloudsql_test(raft_tests tests/raft_tests.cpp)
add_cloudsql_test(distributed_tests tests/distributed_tests.cpp)
add_cloudsql_test(raft_sim_tests tests/raft_simulation_tests.cpp)
add_cloudsql_test(distributed_txn_tests tests/distributed_txn_tests.cpp)

add_custom_target(run-tests
COMMAND ${CMAKE_CTEST_COMMAND}
Expand Down
30 changes: 30 additions & 0 deletions include/catalog/catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

namespace cloudsql {

namespace raft {
class RaftNode;
}

// Type aliases
using oid_t = uint32_t;

Expand Down Expand Up @@ -65,6 +69,15 @@ struct IndexInfo {
IndexInfo() = default;
};

/**
* @brief Shard information
*/
struct ShardInfo {
uint32_t shard_id;
std::string node_address;
uint16_t port;
};

/**
* @brief Table information structure
*/
Expand All @@ -73,6 +86,7 @@ struct TableInfo {
std::string name;
std::vector<ColumnInfo> columns;
std::vector<IndexInfo> indexes;
std::vector<ShardInfo> shards; // New: Shard mapping
uint64_t num_rows = 0;
std::string filename;
uint32_t flags = 0;
Expand Down Expand Up @@ -143,6 +157,11 @@ class Catalog {
*/
[[nodiscard]] static std::unique_ptr<Catalog> create();

/**
* @brief Set Raft node for distributed operations
*/
void set_raft_node(raft::RaftNode* raft_node) { raft_node_ = raft_node; }

/**
* @brief Load catalog from file
*/
Expand All @@ -159,11 +178,21 @@ class Catalog {
*/
oid_t create_table(const std::string& table_name, std::vector<ColumnInfo> columns);

/**
* @brief Local-only table creation (called by Raft)
*/
oid_t create_table_local(const std::string& table_name, std::vector<ColumnInfo> columns);

/**
* @brief Drop a table
*/
bool drop_table(oid_t table_id);

/**
* @brief Local-only table drop (called by Raft)
*/
bool drop_table_local(oid_t table_id);

/**
* @brief Get table by ID
*/
Expand Down Expand Up @@ -242,6 +271,7 @@ class Catalog {
DatabaseInfo database_;
oid_t next_oid_ = 1;
uint64_t version_ = 1;
raft::RaftNode* raft_node_ = nullptr;

[[nodiscard]] static uint64_t get_current_time();
};
Expand Down
104 changes: 104 additions & 0 deletions include/common/cluster_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* @file cluster_manager.hpp
* @brief Manager for cluster topology and node health
*/

#ifndef SQL_ENGINE_COMMON_CLUSTER_MANAGER_HPP
#define SQL_ENGINE_COMMON_CLUSTER_MANAGER_HPP

#include <chrono>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/config.hpp"

namespace cloudsql::cluster {

/**
* @brief Represents a node in the cluster
*/
struct NodeInfo {
std::string id;
std::string address;
uint16_t cluster_port = 0;
config::RunMode role = config::RunMode::Standalone;
std::chrono::system_clock::time_point last_heartbeat;
bool is_active = true;
};

/**
* @brief Manages the cluster topology and node discovery
*/
class ClusterManager {
public:
explicit ClusterManager(const config::Config* config) : config_(config) {
// Add self to node map if in distributed mode
if (config_ != nullptr && config_->mode != config::RunMode::Standalone) {
self_node_.id = "local_node"; // Will be replaced by unique ID later
self_node_.address = "127.0.0.1";
self_node_.cluster_port = config_->cluster_port;
self_node_.role = config_->mode;
self_node_.last_heartbeat = std::chrono::system_clock::now();
}
}

/**
* @brief Register a new node in the cluster
*/
void register_node(const std::string& id, const std::string& address, uint16_t port,
config::RunMode role) {
const std::scoped_lock<std::mutex> lock(mutex_);
nodes_[id] = {id, address, port, role, std::chrono::system_clock::now(), true};
}

/**
* @brief Update heartbeat for a node
*/
void heartbeat(const std::string& id) {
const std::scoped_lock<std::mutex> lock(mutex_);
if (nodes_.count(id) != 0U) {
nodes_[id].last_heartbeat = std::chrono::system_clock::now();
nodes_[id].is_active = true;
}
}

/**
* @brief Get list of active data nodes
*/
[[nodiscard]] std::vector<NodeInfo> get_data_nodes() const {
const std::scoped_lock<std::mutex> lock(mutex_);
std::vector<NodeInfo> data_nodes;
for (const auto& [id, info] : nodes_) {
if (info.role == config::RunMode::Data && info.is_active) {
data_nodes.push_back(info);
}
}
return data_nodes;
}

/**
* @brief Get list of active coordinator nodes
*/
[[nodiscard]] std::vector<NodeInfo> get_coordinators() const {
const std::scoped_lock<std::mutex> lock(mutex_);
std::vector<NodeInfo> coordinators;
for (const auto& [id, info] : nodes_) {
if (info.role == config::RunMode::Coordinator && info.is_active) {
coordinators.push_back(info);
}
}
return coordinators;
}

private:
const config::Config* config_;
NodeInfo self_node_;
std::unordered_map<std::string, NodeInfo> nodes_;
mutable std::mutex mutex_;
};

} // namespace cloudsql::cluster

#endif // SQL_ENGINE_COMMON_CLUSTER_MANAGER_HPP
11 changes: 9 additions & 2 deletions include/common/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ namespace cloudsql::config {
/**
* @brief Run modes for the database engine
*/
enum class RunMode : uint8_t { Embedded = 0, Distributed = 1 };
enum class RunMode : uint8_t {
Standalone = 0, /**< Single process mode (legacy Embedded) */
Coordinator = 1, /**< Distributed coordinator node */
Data = 2 /**< Distributed data storage node */
};

/**
* @brief Server configuration structure (C++ wrapper)
*/
class Config {
public:
static constexpr uint16_t DEFAULT_PORT = 5432;
static constexpr uint16_t DEFAULT_CLUSTER_PORT = 6432;
static constexpr uint16_t MAX_PORT = 65535;
static constexpr const char* DEFAULT_DATA_DIR = "./data";
static constexpr int DEFAULT_MAX_CONNECTIONS = 100;
Expand All @@ -32,9 +37,11 @@ class Config {

// Configuration fields
uint16_t port = DEFAULT_PORT;
uint16_t cluster_port = DEFAULT_CLUSTER_PORT;
std::string data_dir = DEFAULT_DATA_DIR;
std::string config_file;
RunMode mode = RunMode::Embedded;
RunMode mode = RunMode::Standalone;
std::string seed_nodes; // Comma-separated list of coordinator addresses
int max_connections = DEFAULT_MAX_CONNECTIONS;
int buffer_pool_size = DEFAULT_BUFFER_POOL_SIZE;
int page_size = DEFAULT_PAGE_SIZE;
Expand Down
37 changes: 37 additions & 0 deletions include/distributed/distributed_executor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* @file distributed_executor.hpp
* @brief High-level executor for distributed queries
*/
#ifndef SQL_ENGINE_DISTRIBUTED_EXECUTOR_HPP
#define SQL_ENGINE_DISTRIBUTED_EXECUTOR_HPP

#include <memory>
#include <string>

#include "catalog/catalog.hpp"
#include "common/cluster_manager.hpp"
#include "executor/query_executor.hpp"
#include "parser/statement.hpp"

namespace cloudsql::executor {

/**
* @brief Handles distributed query routing and execution
*/
class DistributedExecutor {
public:
DistributedExecutor(Catalog& catalog, cluster::ClusterManager& cm);

/**
* @brief Execute a statement across the cluster
*/
QueryResult execute(const parser::Statement& stmt, const std::string& raw_sql);

private:
Catalog& catalog_;
cluster::ClusterManager& cluster_manager_;
};

} // namespace cloudsql::executor

#endif // SQL_ENGINE_DISTRIBUTED_EXECUTOR_HPP
86 changes: 86 additions & 0 deletions include/distributed/raft_node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* @file raft_node.hpp
* @brief Raft consensus node implementation
*/

#ifndef SQL_ENGINE_DISTRIBUTED_RAFT_NODE_HPP
#define SQL_ENGINE_DISTRIBUTED_RAFT_NODE_HPP

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <random>
#include <thread>

#include "common/cluster_manager.hpp"
#include "distributed/raft_types.hpp"
#include "network/rpc_client.hpp"
#include "network/rpc_server.hpp"

namespace cloudsql::raft {

/**
* @brief Implementation of a Raft consensus node
*/
class RaftNode {
public:
RaftNode(std::string node_id, cluster::ClusterManager& cluster_manager,
network::RpcServer& rpc_server);
~RaftNode();

// Prevent copying and moving
RaftNode(const RaftNode&) = delete;
RaftNode& operator=(const RaftNode&) = delete;
RaftNode(RaftNode&&) = delete;
RaftNode& operator=(RaftNode&&) = delete;

void start();
void stop();

// Raft RPC Handlers
void handle_request_vote(const network::RpcHeader& header, const std::vector<uint8_t>& payload,
int client_fd);
void handle_append_entries(const network::RpcHeader& header,
const std::vector<uint8_t>& payload, int client_fd);

// Client interface
bool replicate(const std::string& command);
[[nodiscard]] bool is_leader() const { return state_.load() == NodeState::Leader; }

private:
void run_loop();
void do_follower();
void do_candidate();
void do_leader();

void step_down(term_t new_term);
void persist_state();
void load_state();

// Helpers
[[nodiscard]] std::chrono::milliseconds get_random_timeout() const;

std::string node_id_;
cluster::ClusterManager& cluster_manager_;
network::RpcServer& rpc_server_;

// State
std::atomic<NodeState> state_{NodeState::Follower};
RaftPersistentState persistent_state_;
RaftVolatileState volatile_state_;
LeaderState leader_state_;

mutable std::mutex mutex_;
std::condition_variable cv_;
std::atomic<bool> running_{false};
std::thread raft_thread_;

std::chrono::system_clock::time_point last_heartbeat_;
std::mt19937 rng_;
};

} // namespace cloudsql::raft

#endif // SQL_ENGINE_DISTRIBUTED_RAFT_NODE_HPP
Loading