Merged

20 commits
0b37dd7
feat(cluster): add shuffle data buffering to ClusterManager
poyrazK Mar 2, 2026
b90e678
feat(network): integrate cluster manager buffering into PushData RPC …
poyrazK Mar 2, 2026
7fa53d7
feat(executor): add BufferScanOperator declaration
poyrazK Mar 2, 2026
9cbc145
feat(executor): implement BufferScanOperator for shuffled data
poyrazK Mar 2, 2026
eea1dfe
feat(executor): add ClusterManager support to QueryExecutor interface
poyrazK Mar 2, 2026
6020f65
feat(executor): enable shuffle-aware plan building in QueryExecutor
poyrazK Mar 2, 2026
41c36f1
feat(distributed): add broadcast_table interface to DistributedExecutor
poyrazK Mar 2, 2026
1c0379a
feat(distributed): implement Broadcast Join orchestration logic
poyrazK Mar 2, 2026
4567516
test(distributed): add validation for shuffle and broadcast join orch…
poyrazK Mar 2, 2026
4bab09e
docs(plans): update migration plan and architecture for Phase 5 compl…
poyrazK Mar 2, 2026
cfc3aac
docs(phases): add detailed technical records for Phases 1-3
poyrazK Mar 2, 2026
936b7dd
docs(phases): add detailed technical records for Phases 4-5
poyrazK Mar 2, 2026
9c5245b
docs: update project README to reflect distributed capabilities
poyrazK Mar 2, 2026
4f12126
feat(network): implement full value/tuple serialization and robust RP…
poyrazK Mar 2, 2026
2ffb2e0
fix(parser): improve INSERT parsing and add support for COUNT(*)
poyrazK Mar 2, 2026
aca6936
chore: minor infrastructure cleanup and tuple bounds checking
poyrazK Mar 2, 2026
76ed0b8
style: apply clang-format to all files
poyrazK Mar 2, 2026
ca7081e
chore: remove temporary PR metadata files
poyrazK Mar 2, 2026
7b968bb
style: align struct initialization formatting with CI clang-format
poyrazK Mar 2, 2026
8c5a688
style: remove extra newline in main.cpp
poyrazK Mar 2, 2026
66 changes: 36 additions & 30 deletions README.md
@@ -1,29 +1,33 @@
# cloudSQL

A lightweight, distributed SQL database engine. Designed for cloud environments with a focus on simplicity, type safety, and PostgreSQL compatibility.
A lightweight, distributed SQL database engine. Designed for cloud environments with a focus on simplicity, type safety, and PostgreSQL compatibility. cloudSQL bridges the gap between single-node databases and complex distributed systems by providing horizontal scaling with a familiar interface.

## Key Features

- **Modern C++ Architecture**: High-performance, object-oriented codebase using C++17.
- **Type-Safe Value System**: Robust handling of SQL data types (Integer, Float, Text, Boolean, etc.) using `std::variant`.
- **Paged Storage Engine**: Efficient page-level random access I/O via a custom `StorageManager`.
- **Slot-Based Heap Tables**: Optimized row-oriented storage with support for variable-length data.
- **B+ Tree Indexing**: Fast secondary access paths for point lookups and ordered scans.
- **SQL Parser**: Powerful recursive descent parser supporting DDL (`CREATE TABLE`) and DML (`INSERT`, `SELECT` with `WHERE`, `GROUP BY`, `ORDER BY`, `LIMIT`).
- **Volcano Execution Engine**: Advanced iterator-based execution supporting sequential scans, index scans, filtering, projection, hash joins, sorting, and aggregation.
- **Distributed Consensus (Raft)**: Global metadata and catalog consistency powered by a custom Raft implementation.
- **Horizontal Sharding**: Hash-based data partitioning across multiple Data Nodes.
- **Distributed Query Optimization**:
- **Shard Pruning**: Intelligent routing to avoid cluster-wide broadcasts.
- **Aggregation Merging**: Global coordination for `COUNT`, `SUM`, and other aggregates.
- **Broadcast Joins**: Optimized cross-shard joins for small-to-large table scenarios.
- **Multi-Node Transactions**: ACID guarantees across the cluster via Two-Phase Commit (2PC).
- **Type-Safe Value System**: Robust handling of SQL data types using `std::variant`.
- **Volcano Execution Engine**: Iterator-based execution supporting sequential scans, index scans, filtering, projection, hash joins, sorting, and aggregation.
- **PostgreSQL Wire Protocol**: Handshake and simple query protocol implementation for tool compatibility.

## Project Structure

- `include/`: Header files defining the core engine API.
- `src/`: Core implementation modules.
- `include/`: Header files defining the core engine and distributed API.
- `src/`: Implementation modules.
- `catalog/`: Metadata and schema management.
- `common/`: Core types and configuration.
- `executor/`: Query operators and execution coordination.
- `network/`: PostgreSQL server implementation.
- `distributed/`: Raft consensus, shard management, and distributed execution.
- `executor/`: Volcano operators and local query coordination.
- `network/`: PostgreSQL server and internal cluster RPC.
- `parser/`: Lexical analysis and SQL parsing.
- `storage/`: Paged storage, heap files, and indexes.
- `tests/`: Comprehensive test suite for reliability and performance.
- `storage/`: Paged storage, heap files, and B+ tree indexes.
- `docs/`: Technical documentation and [Phase-by-Phase Roadmap](./docs/phases/README.md).
- `tests/`: Comprehensive test suite including simulation-based Raft tests and distributed scenarios.

## Building and Running

@@ -38,39 +42,41 @@ A lightweight, distributed SQL database engine. Designed for cloud environments
mkdir build
cd build
cmake ..
make
make -j$(nproc)
```

### Running Tests

```bash
# Run all tests
./build/sqlEngine_tests
# Run distributed-specific tests
./build/distributed_tests
./build/distributed_txn_tests
```

### Starting the Server
### Starting the Cluster

Start a Coordinator:
```bash
./build/sqlEngine --port 5432 --data ./data
./build/sqlEngine --mode coordinator --port 5432 --cluster-port 6432 --data ./coord_data
```

Start a Data Node:
```bash
./build/sqlEngine --mode data --cluster-port 6433 --data ./data_node_1
```

## Core Components

### 1. Value System
The engine features a unified `Value` class that safely encapsulates SQL types. This ensures data integrity during calculations and data retrieval.
### 1. Raft Consensus
Ensures that all Coordinator nodes share an identical view of the database schema and shard mappings. DDL operations are replicated and committed via the Raft log before being applied to the local catalog.

### 2. Execution Operators
Queries are executed using the Volcano model, allowing for scalable and modular operator trees:
- `SeqScanOperator`: Scans all tuples in a table.
- `IndexScanOperator`: Leverages B+ Trees for high-speed lookups.
- `FilterOperator`: Efficiently filters data based on complex expressions.
- `ProjectOperator`: Computes results and transforms data columns.
- `SortOperator`: Handles `ORDER BY` with multiple keys and directions.
- `AggregateOperator`: Implements `GROUP BY` and aggregate functions (`COUNT`, `SUM`, etc.).
- `HashJoinOperator`: Performs high-performance in-memory inner joins.
- `LimitOperator`: Manages result set windowing.
### 2. Distributed Executor
Orchestrates query fragments across the cluster. It performs plan splitting, dispatches sub-queries to relevant Data Nodes, and merges partial results (e.g., summing partial counts) before returning the final set to the client.

### 3. Storage Layer
Data is persisted in fixed-size pages (default 4KB) using a slot-based layout. The `StorageManager` coordinates access to these pages, ensuring atomic operations and enabling future support for buffer pool management.
Data is persisted in fixed-size pages (default 4KB) using a slot-based layout. The `StorageManager` coordinates access, while the `BufferPoolManager` provides an LRU-K caching layer to minimize disk I/O.

## License

33 changes: 33 additions & 0 deletions docs/phases/PHASE_1_CORE.md
@@ -0,0 +1,33 @@
# Phase 1: Core Foundation

## Overview
Phase 1 established the fundamental types and storage primitives required for a modern, type-safe SQL engine.

## Key Components

### 1. Unified Value System (`common/value.hpp`)
Transitioned from C-style unions to `std::variant`.
- **Type Safety**: Use of `std::get` and `std::holds_alternative` prevents invalid memory access.
- **Null Handling**: Explicit `std::monostate` representation for SQL `NULL`.
- **Operators**: Overloaded comparison and arithmetic operators for native SQL expression evaluation.
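The variant-backed design above can be sketched as follows. This is a hypothetical miniature, not the real `common/value.hpp` API — class shape, constructors, and the supported type list are assumptions for illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <variant>

// Illustrative sketch of a std::variant-based SQL value.
class Value {
public:
    // std::monostate stands in for SQL NULL.
    using Storage = std::variant<std::monostate, int64_t, double, std::string, bool>;

    Value() = default;  // default-constructed value is NULL
    explicit Value(int64_t v) : data_(v) {}
    explicit Value(std::string v) : data_(std::move(v)) {}

    bool is_null() const { return std::holds_alternative<std::monostate>(data_); }

    // std::get throws std::bad_variant_access on a type mismatch
    // instead of silently reinterpreting memory like a C union would.
    int64_t as_int() const { return std::get<int64_t>(data_); }

    // SQL comparison semantics: any comparison involving NULL is not true.
    bool operator==(const Value& other) const {
        if (is_null() || other.is_null()) return false;
        return data_ == other.data_;
    }

private:
    Storage data_;
};
```

The key safety property is that a wrong-typed access throws rather than reading invalid memory, which is the motivation for moving away from C-style unions.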

### 2. Paged Storage Manager (`storage/storage_manager.cpp`)
Implemented a platform-agnostic abstraction for random access I/O.
- **Fixed-size Pages**: Default 4KB pages matching OS memory pages.
- **Atomic Operations**: Ensure consistent page-level reads and writes.

### 3. Buffer Pool Manager (`storage/buffer_pool_manager.cpp`)
Introduced a caching layer to minimize disk I/O.
- **Replacement Policy**: LRU-K algorithm implementation for intelligent page eviction.
- **Thread Safety**: Mutex-guarded page table and free list management.
- **Pinning**: Support for pinning pages in memory during critical operations.
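The LRU-K eviction rule described above can be sketched in a few lines. This is a simplified, single-purpose replacer (class name, `record_access`/`victim` interface, and tie-breaking details are assumptions, not the engine's real API): a page with fewer than K recorded accesses has infinite backward K-distance and is evicted first; otherwise the page whose K-th most recent access is oldest is chosen.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <limits>
#include <unordered_map>

// Minimal sketch of LRU-K victim selection.
class LRUKReplacer {
public:
    explicit LRUKReplacer(size_t k) : k_(k) {}

    void record_access(int page_id) {
        auto& hist = history_[page_id];
        hist.push_back(++clock_);
        if (hist.size() > k_) hist.pop_front();  // keep only the last K accesses
    }

    void remove(int page_id) { history_.erase(page_id); }

    // Evict: prefer pages with fewer than K accesses (infinite backward
    // distance); among candidates, the oldest tracked access wins.
    int victim() const {
        int best = -1;
        bool best_inf = false;
        uint64_t best_ts = std::numeric_limits<uint64_t>::max();
        for (const auto& [page, hist] : history_) {
            const bool inf = hist.size() < k_;
            const uint64_t ts = hist.front();  // oldest access we still track
            if ((inf && !best_inf) || (inf == best_inf && ts < best_ts)) {
                best = page;
                best_inf = inf;
                best_ts = ts;
            }
        }
        return best;  // -1 when no page is tracked
    }

private:
    size_t k_;
    uint64_t clock_ = 0;
    std::unordered_map<int, std::deque<uint64_t>> history_;
};
```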

### 4. Slot-based Heap Tables (`storage/heap_table.cpp`)
Implemented the physical row storage format.
- **Slotted Pages**: Header-based layout tracking row offsets and lengths.
- **Variable Length Support**: Efficient handling of `VARCHAR` and `TEXT` data.
- **Meta-data Management**: In-page tracking of `xmin`, `xmax`, and `lsn` for MVCC and recovery.
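The slotted layout can be illustrated with a toy page: slot entries record each row's offset and length, while row bytes are packed from the end of the page downward. This sketch keeps the slot array out-of-band and omits the per-row `xmin`/`xmax`/`lsn` metadata mentioned above; all names are illustrative, not the engine's real layout.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

constexpr size_t PAGE_SIZE = 4096;  // matches the default 4KB page

struct Slot {
    uint16_t offset;
    uint16_t length;
};

struct SlottedPage {
    uint8_t bytes[PAGE_SIZE] = {};
    std::vector<Slot> slots;        // kept out-of-band here for brevity
    uint16_t free_end = PAGE_SIZE;  // data region grows downward

    // Append a variable-length row; returns its slot id, or -1 if full.
    int insert(const void* row, uint16_t len) {
        if (free_end < len) return -1;
        free_end -= len;
        std::memcpy(bytes + free_end, row, len);
        slots.push_back({free_end, len});
        return static_cast<int>(slots.size()) - 1;
    }

    std::string read(int slot_id) const {
        const Slot& s = slots.at(slot_id);
        return std::string(reinterpret_cast<const char*>(bytes + s.offset), s.length);
    }
};
```

Because rows are addressed through slots rather than fixed strides, `VARCHAR` and `TEXT` values of any length fit naturally, and rows can later be moved within the page without changing their slot ids.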

## Lessons Learned
- Pre-allocating the buffer pool reduces runtime fragmentation.
- Binary compatibility with the previous C implementation was maintained for initial data migration.
35 changes: 35 additions & 0 deletions docs/phases/PHASE_2_EXECUTION.md
@@ -0,0 +1,35 @@
# Phase 2: Execution & Networking

## Overview
Phase 2 focused on transforming raw data into results through a standardized execution model and enabling communication between nodes.

## Key Components

### 1. Volcano Execution Engine (`executor/operator.cpp`)
Implemented the standard pull-based iterator model.
- **Base Operator Class**: Defines the `init`, `open`, `next`, and `close` interface.
- **Physical Operators**:
- `SeqScanOperator`: Linear scan of heap tables.
- `FilterOperator`: Expression evaluation using the Value system.
- `ProjectOperator`: Column transformation and aliasing.
- `HashJoinOperator`: Efficient in-memory inner joins.
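The pull-based composition can be sketched with two toy operators. This is a deliberately reduced model — the real operators expose the full `init`/`open`/`next`/`close` interface and produce tuples, whereas here a bare `next()` over integers stands in:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Tuple = int;  // stand-in for a real tuple type

struct Operator {
    virtual ~Operator() = default;
    virtual std::optional<Tuple> next() = 0;  // pull one row; nullopt at EOF
};

// Leaf operator: yields rows from an in-memory "table".
struct ScanOp : Operator {
    explicit ScanOp(std::vector<Tuple> rows) : rows_(std::move(rows)) {}
    std::optional<Tuple> next() override {
        if (pos_ >= rows_.size()) return std::nullopt;
        return rows_[pos_++];
    }
    std::vector<Tuple> rows_;
    size_t pos_ = 0;
};

// Filter operator: pulls from its child until a row passes the predicate.
struct FilterOp : Operator {
    FilterOp(std::unique_ptr<Operator> child, bool (*pred)(Tuple))
        : child_(std::move(child)), pred_(pred) {}
    std::optional<Tuple> next() override {
        while (auto t = child_->next()) {
            if (pred_(*t)) return t;
        }
        return std::nullopt;
    }
    std::unique_ptr<Operator> child_;
    bool (*pred_)(Tuple);
};
```

Each operator only knows its child's `next()`, which is what makes arbitrary operator trees composable — a `FilterOp` works the same whether its child is a scan, a join, or another filter.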

### 2. Internal RPC Layer (`network/rpc_server.cpp`, `rpc_client.cpp`)
Built a high-performance communication backbone for the cluster.
- **Binary Protocol**: Custom header-payload format for minimal overhead.
- **Command Routing**: Registry-based handler system for different RPC types (`ExecuteFragment`, `TxnPrepare`, etc.).
- **Async Execution**: Support for parallel query dispatch to multiple nodes.

### 3. PostgreSQL Wire Protocol (`network/server.cpp`)
Ensured compatibility with standard SQL tools.
- **Handshake**: Support for startup messages and authentication.
- **Simple Query Protocol**: Enables tools like `psql` to send SQL strings and receive formatted results.

### 4. Transaction Management (`transaction/lock_manager.cpp`)
Implemented local concurrency control.
- **Two-Phase Locking (2PL)**: Support for Shared (S) and Exclusive (X) locks.
- **Two-Phase Commit (2PC)**: Infrastructure for distributed transaction coordination (Prepare/Commit/Abort).
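The Prepare/Commit/Abort flow can be sketched as a coordinator loop. This is a hedged illustration of the protocol shape only — `Participant` and its callback-based vote API are assumptions, and a production coordinator would also log decisions and handle timeouts:

```cpp
#include <cassert>
#include <functional>
#include <vector>

struct Participant {
    std::function<bool()> prepare;  // vote: true = ready to commit
    std::function<void()> commit;
    std::function<void()> abort;
};

// Phase 1: ask every participant to prepare. Phase 2: commit everywhere
// only if all voted yes; otherwise send abort to all participants.
bool two_phase_commit(std::vector<Participant>& participants) {
    bool all_yes = true;
    for (auto& p : participants) {
        if (!p.prepare()) {
            all_yes = false;
            break;
        }
    }
    for (auto& p : participants) {
        if (all_yes) p.commit(); else p.abort();
    }
    return all_yes;
}
```

The essential invariant: no participant commits until every participant has voted yes, so a single negative vote (or crash during prepare) aborts the transaction cluster-wide.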

## Lessons Learned
- The pull-based model simplifies operator composition but requires careful memory management of intermediate results.
- Sockets with `MSG_WAITALL` require strict protocol adherence to avoid deadlocks.
29 changes: 29 additions & 0 deletions docs/phases/PHASE_3_SQL_CATALOG.md
@@ -0,0 +1,29 @@
# Phase 3: SQL & Catalog

## Overview
Phase 3 enabled the engine to understand SQL syntax and manage dynamic database schemas through a persistent catalog.

## Key Components

### 1. SQL Parser (`parser/parser.cpp`, `lexer.cpp`)
Implemented a custom recursive descent parser.
- **Lexer**: Tokenizes SQL strings with support for keywords, identifiers, and literals.
- **Parser**: Constructs Abstract Syntax Trees (AST) for:
- **DDL**: `CREATE TABLE`, `DROP TABLE`.
- **DML**: `SELECT`, `INSERT`, `UPDATE`, `DELETE`.
- **Expression Support**: Parsing of complex boolean and arithmetic expressions in `WHERE` and `SET` clauses.

### 2. Global Catalog (`catalog/catalog.cpp`)
Introduced a centralized authority for metadata.
- **Schema Management**: Tracks table definitions, column types, and constraints.
- **Thread Safety**: Uses readers-writer locks to allow concurrent metadata lookups while ensuring atomic updates.
- **Object IDs (OID)**: System-wide unique identifiers for tables and indexes.

### 3. System Tables
Implemented persistence for metadata.
- **Storage**: Catalog state is stored in internal heap tables (`pg_class`, `pg_attribute`).
- **Bootstrap**: Logic to initialize a fresh data directory with core system tables.

## Lessons Learned
- Decoupling the AST from the execution plan allows for easier query optimization in later stages.
- A robust catalog is essential for multi-node consistency.
26 changes: 26 additions & 0 deletions docs/phases/PHASE_4_CONSENSUS.md
@@ -0,0 +1,26 @@
# Phase 4: Distributed State (Raft)

## Overview
Phase 4 transformed cloudSQL from a single-node engine into a distributed system by implementing the Raft consensus protocol for metadata consistency.

## Key Components

### 1. Raft Core (`distributed/raft_node.cpp`)
Implemented the Raft consensus algorithm from scratch.
- **Leader Election**: Automated transition between Follower, Candidate, and Leader states based on heartbeats.
- **Log Replication**: Ensures all coordinator nodes have an identical sequence of catalog operations.
- **Persistence**: Raft log is persisted to disk to survive node restarts.
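The Follower → Candidate → Leader transitions can be sketched as a small state machine. This is an assumption-laden miniature of the election logic only — real Raft also carries terms in every RPC, enforces log-matching restrictions on votes, and persists state before responding:

```cpp
#include <cassert>
#include <cstdint>

enum class Role { Follower, Candidate, Leader };

struct RaftState {
    Role role = Role::Follower;
    uint64_t term = 0;
    int votes = 0;

    // No heartbeat within the election timeout: start a new election.
    void on_election_timeout() {
        role = Role::Candidate;
        ++term;     // new term
        votes = 1;  // vote for self
    }

    // A RequestVote reply granted a vote; majority promotes to Leader.
    void on_vote_granted(int cluster_size) {
        if (role != Role::Candidate) return;
        if (++votes > cluster_size / 2) role = Role::Leader;
    }

    // A heartbeat from a leader with an equal-or-newer term demotes us.
    void on_heartbeat(uint64_t leader_term) {
        if (leader_term >= term) {
            term = leader_term;
            role = Role::Follower;
        }
    }
};
```

The randomized election timeout (not shown) is what the "fine-tuning" lesson below refers to: if timeouts are too short relative to network latency, followers repeatedly trigger spurious elections.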

### 2. Catalog-Raft Integration
Linked the Raft log to catalog state transitions.
- **Replicated DDL**: `CREATE TABLE` and `DROP TABLE` are proposed to Raft; they are only applied to the local catalog after being committed to the majority of the cluster.
- **Consistency**: Guaranteed that all coordinators see the same schema at the same logical time.

### 3. Cluster Membership (`common/cluster_manager.hpp`)
Managed the dynamic topology of the cluster.
- **Node Discovery**: Automated registration of Data Nodes and Coordinators.
- **Role Awareness**: Distinguishes between nodes that participate in consensus (Coordinators) and those that store shards (Data Nodes).

## Lessons Learned
- Raft heartbeats must be fine-tuned to avoid unnecessary re-elections in high-latency cloud environments.
- Coupling the Catalog directly to Raft state machine application ensures strict serializability for schema changes.
34 changes: 34 additions & 0 deletions docs/phases/PHASE_5_OPTIMIZATION.md
@@ -0,0 +1,34 @@
# Phase 5: Distributed Optimization

## Overview
Phase 5 introduced high-level optimizations to reduce network latency and enable complex multi-shard query patterns.

## Key Components

### 1. Shard Pruning (`distributed/distributed_executor.cpp`)
Optimized query routing based on partitioning keys.
- **Predicate Analysis**: Detects filters on sharding keys (e.g., `WHERE id = 100`).
- **Targeted Dispatch**: Routes fragments only to the specific node owning the shard, avoiding cluster-wide broadcasts.
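The pruning decision reduces to: if the WHERE clause pins the sharding key to a single value, hash that value to one shard; otherwise fan out to every shard. The sketch below assumes an equality-only predicate representation, which is a simplification of real predicate analysis:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

struct EqualityPredicate {
    std::string column;
    std::string value;
};

uint32_t shard_for(const std::string& key_value, uint32_t num_shards) {
    return static_cast<uint32_t>(std::hash<std::string>{}(key_value) % num_shards);
}

// Returns the single target shard, or nullopt meaning "broadcast to all".
std::optional<uint32_t> prune(const std::optional<EqualityPredicate>& pred,
                              const std::string& shard_key, uint32_t num_shards) {
    if (pred && pred->column == shard_key) {
        return shard_for(pred->value, num_shards);
    }
    return std::nullopt;  // no usable predicate: query every shard
}
```

Note that a predicate on a non-key column (e.g. `WHERE name = 'bob'`) cannot prune, because matching rows may live on any shard.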

### 2. Aggregation Merging
Implemented coordination for distributed analytics.
- **Partial Aggregation**: Data nodes compute local counts and sums.
- **Global Merge**: The coordinator identifies aggregate functions in the SELECT list and merges partial results from all shards into a final result set.
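The merge step is simple arithmetic over the partial results, sketched below with an illustrative `PartialAgg` struct (not the engine's real result format): a global `COUNT` is the sum of local counts, and a global `SUM` is the sum of local sums. `AVG` would be derived at the coordinator as total sum over total count rather than by averaging the partial averages.

```cpp
#include <assert.h>
#include <cstdint>
#include <vector>

struct PartialAgg {
    int64_t count = 0;
    int64_t sum = 0;
};

PartialAgg merge_partials(const std::vector<PartialAgg>& partials) {
    PartialAgg total;
    for (const auto& p : partials) {
        total.count += p.count;  // global COUNT(*) = sum of local counts
        total.sum += p.sum;      // global SUM(x)  = sum of local sums
    }
    return total;
}
```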

### 3. Broadcast Join Orchestration
Developed a prototype for cross-shard JOINs.
- **Table Fetching**: Coordinator retrieves full data from a smaller table across all shards.
- **Broadcasting**: Pushes the gathered data to the `ShuffleBuffer` of every node in the cluster.
- **Local Execution**: Rewrites the query so each node joins its local shard with the broadcasted buffer data.
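The three steps above can be sketched with in-process "nodes" standing in for RPC calls; the `Node` struct and its members are illustrative only, not the real cluster types:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

using Row = std::pair<int, std::string>;  // (join key, payload)

struct Node {
    std::vector<Row> local_shard;     // fragment of the large table
    std::vector<Row> small_fragment;  // fragment of the small table
    std::vector<Row> shuffle_buffer;  // receives the broadcast copy
};

// Steps 1+2: gather the small table from every node, then push the
// complete copy into every node's shuffle buffer.
void broadcast_table(std::vector<Node>& nodes) {
    std::vector<Row> gathered;
    for (const auto& n : nodes) {
        gathered.insert(gathered.end(), n.small_fragment.begin(),
                        n.small_fragment.end());
    }
    for (auto& n : nodes) n.shuffle_buffer = gathered;
}

// Step 3: each node joins its local shard against the broadcast buffer.
std::vector<std::pair<Row, Row>> local_join(const Node& n) {
    std::vector<std::pair<Row, Row>> out;
    for (const auto& l : n.local_shard) {
        for (const auto& r : n.shuffle_buffer) {
            if (l.first == r.first) out.emplace_back(l, r);
        }
    }
    return out;
}
```

Since every node holds the full small table after the broadcast, no large-table row ever crosses the network, which is why this strategy suits small-to-large joins but is bounded by the memory needed to materialize the small table on each node.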

### 4. Shuffle Infrastructure
Enabled inter-node data movement.
- **BufferScanOperator**: A physical operator that reads from in-memory shuffle buffers instead of heap files.
- **ClusterManager Buffering**: Thread-safe staging area for data received via `PushData` RPCs.

## Lessons Learned
- Broadcast joins are highly effective for small-to-large table joins but require careful consideration of coordinator memory limits.
- Merging aggregates at the coordinator is a bottleneck for very large clusters; future work could explore tree-based merging.

## Status: 100% Test Pass
All scenarios, including distributed transactions (2PC) and join orchestration, have been verified with automated integration tests.
45 changes: 45 additions & 0 deletions docs/phases/README.md
@@ -0,0 +1,45 @@
# cloudSQL C++ Migration & Distributed Roadmap

This directory contains the technical documentation for the lifecycle of the cloudSQL migration from C to C++, and its subsequent expansion into a distributed engine.

## Lifecycle Phases

### [Phase 1: Core Foundation](./PHASE_1_CORE.md)
**Focus**: Type safety and Paged Storage.
- Modernized `Value` system using `std::variant`.
- Binary-compatible `StorageManager` and thread-safe `BufferPoolManager`.
- Slot-based `HeapTable` implementation.

### [Phase 2: Execution & Networking](./PHASE_2_EXECUTION.md)
**Focus**: Volcano Model & Communication.
- Iterator-based physical operators (`SeqScan`, `Filter`, `Project`, `HashJoin`).
- POSIX-based internal RPC layer.
- PostgreSQL Wire Protocol (Handshake + Simple Query).
- Local `LockManager` for concurrency control.

### [Phase 3: SQL & Catalog](./PHASE_3_SQL_CATALOG.md)
**Focus**: SQL Ingestion & Metadata.
- Recursive Descent Parser for DDL and DML.
- Global `Catalog` for schema management.
- Integration of System Tables for persistence.

### [Phase 4: Distributed State](./PHASE_4_CONSENSUS.md)
**Focus**: Raft Consistency.
- Core Raft implementation (Leader Election, Heartbeats, Replication).
- Catalog-Raft integration for consistent metadata.
- `ClusterManager` for node discovery and membership.

### [Phase 5: Distributed Optimization](./PHASE_5_OPTIMIZATION.md)
**Focus**: Performance & Advanced Joins.
- Shard Pruning logic for targeted routing.
- Global Aggregation Merging (COUNT/SUM).
- Broadcast Join orchestration.
- Inter-node data redistribution (Shuffle infrastructure).

---

## Technical Standards
- **Standard**: C++17
- **Build System**: CMake
- **Tests**: GoogleTest
- **Protocol**: Binary RPC internally, PostgreSQL wire protocol externally.
33 changes: 33 additions & 0 deletions include/common/cluster_manager.hpp
@@ -13,6 +13,7 @@
#include <vector>

#include "common/config.hpp"
#include "executor/types.hpp"

namespace cloudsql::cluster {

@@ -92,10 +93,42 @@ class ClusterManager {
return coordinators;
}

/**
* @brief Buffer received shuffle data
*/
void buffer_shuffle_data(const std::string& table, std::vector<executor::Tuple> rows) {
const std::scoped_lock<std::mutex> lock(mutex_);
auto& target = shuffle_buffers_[table];
target.insert(target.end(), std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
}

/**
* @brief Check if shuffle data exists for a table
*/
[[nodiscard]] bool has_shuffle_data(const std::string& table) const {
const std::scoped_lock<std::mutex> lock(mutex_);
return shuffle_buffers_.count(table) != 0U;
}

/**
* @brief Retrieve and clear buffered shuffle data
*/
std::vector<executor::Tuple> fetch_shuffle_data(const std::string& table) {
const std::scoped_lock<std::mutex> lock(mutex_);
std::vector<executor::Tuple> data;
if (shuffle_buffers_.count(table) != 0U) {
data = std::move(shuffle_buffers_[table]);
shuffle_buffers_.erase(table);
}
return data;
}

private:
const config::Config* config_;
NodeInfo self_node_;
std::unordered_map<std::string, NodeInfo> nodes_;
std::unordered_map<std::string, std::vector<executor::Tuple>> shuffle_buffers_;
mutable std::mutex mutex_;
};

5 changes: 5 additions & 0 deletions include/distributed/distributed_executor.hpp
@@ -28,6 +28,11 @@ class DistributedExecutor {
QueryResult execute(const parser::Statement& stmt, const std::string& raw_sql);

private:
/**
* @brief Fetch data for a table from all nodes and broadcast it to all nodes
*/
bool broadcast_table(const std::string& table_name);

Catalog& catalog_;
cluster::ClusterManager& cluster_manager_;
};
8 changes: 5 additions & 3 deletions include/distributed/shard_manager.hpp
@@ -24,11 +24,13 @@ class ShardManager {
* @brief Compute target shard index based on primary key value
*/
static uint32_t compute_shard(const common::Value& pk_value, uint32_t num_shards) {
if (num_shards == 0) return 0;
if (num_shards == 0) {
return 0;
}

// Simple hash for demo purposes
std::string s = pk_value.to_string();
size_t hash = std::hash<std::string>{}(s);
const std::string s = pk_value.to_string();
const size_t hash = std::hash<std::string>{}(s);
return static_cast<uint32_t>(hash % num_shards);
}
