diff --git a/.gitignore b/.gitignore
index eceea09..14fa8aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# Local scratch / backup area (never committed)
+.backup/
+
# Build directories
build/
build-*/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3b6d69..6109bb3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
+## [1.5.3] - 2026-04-12
+
+### Added
+
+- **Reactor I/O model (epoll/kqueue)** — New event-driven TCP path replaces the blocking one-thread-per-connection loop; a single event-loop thread plus a bounded worker pool serves thousands of persistent connections
+- **Per-connection slow-reader backpressure** — `api.tcp.max_write_queue_bytes` (default 16 MiB) force-closes clients whose enqueued response bytes exceed the cap
+- **Reactor error codes** — `kNetworkReactorUnsupported`/`PollFailed`/`RegisterFailed`/`ModifyFailed`/`RemoveFailed`/`QueueFull`/`AlreadyOpen` (6016–6023)
+
+### Fixed
+
+- **TCP half-close drain regression** — `shutdown(SHUT_WR)` + `recv()` clients now receive their response; `kHangup` events no longer short-circuit to `OnError`, and `read_eof_` is tracked separately from `closing_` so the drain task can enqueue the response
+- **Rate limiting silently disabled under reactor** — `api.rate_limiting.enable = true` is now honored on every accepted connection; the reactor handler calls `getpeername()` + `AllowRequest()` before `Register()` and returns `SERVER_BUSY` on rejection
+- **Unix domain socket acceptor could not start** — Removed the dead secondary `unix_acceptor_` that collided with the primary acceptor's own UDS bind; UDS now flows end-to-end through the primary acceptor's reactor handler
+- **Grafana memory usage PromQL** — Use `ignoring(type)` on the division so `mygramdb_memory_used_bytes{type="total"}` matches the denominator label set
+
+### Changed
+
+- **Blocking I/O path removed entirely** — `ConnectionIOHandler`, `TcpServer::HandleConnection`, the `api.tcp.io_model` feature flag, `connection_contexts_` map, `ConnectionAcceptor::SetConnectionHandler`, and the `BlockingMode` ctest entries are all deleted
+- **Thread-pool auto-size floor reverted** — Dropped the emergency `hw*4`/64-worker mitigation for blocking-mode starvation; restored `max(hw*2, 4)`
+- **Reactor hot-path polish** — epoll/kqueue poll buffers grow on demand up to 4096 entries; `Register`/`Stop` race closed by re-checking `running_` under `mux_lifecycle_` shared; `OnWritable` empty-queue teardown flattened
+
+### Testing
+
+- New unit tests: `event_multiplexer_test`, `io_reactor_test`, `reactor_connection_test`
+- New integration tests: `reactor_integration_test` (write backpressure, many-idle-connections, half-close, rate limit, UDS, max query length), `reactor_starvation_regression_test`, `thread_pool_saturation_test` (migrated, assertion inverted for reactor default)
+- e2e `test_half_close_write` now passes (previously failing on reactor path)
+
+**Detailed Release Notes**: [docs/releases/v1.5.3.md](docs/releases/v1.5.3.md)
+
## [1.5.2] - 2026-04-09
### Added
@@ -444,7 +473,9 @@ Initial release with core search engine functionality and MySQL replication supp
---
-[Unreleased]: https://github.com/libraz/mygram-db/compare/v1.5.1...HEAD
+[Unreleased]: https://github.com/libraz/mygram-db/compare/v1.5.3...HEAD
+[1.5.3]: https://github.com/libraz/mygram-db/compare/v1.5.2...v1.5.3
+[1.5.2]: https://github.com/libraz/mygram-db/compare/v1.5.1...v1.5.2
[1.5.1]: https://github.com/libraz/mygram-db/compare/v1.5.0...v1.5.1
[1.5.0]: https://github.com/libraz/mygram-db/compare/v1.4.0...v1.5.0
[1.4.0]: https://github.com/libraz/mygram-db/compare/v1.3.9...v1.4.0
diff --git a/docs/en/architecture.md b/docs/en/architecture.md
index e262f85..a60c564 100644
--- a/docs/en/architecture.md
+++ b/docs/en/architecture.md
@@ -300,19 +300,27 @@ graph TB
#### Request Dispatch Pipeline
**ConnectionAcceptor** (`connection_acceptor.h`)
-- **Responsibility**: Socket accept loop
+- **Responsibility**: Socket accept loop (TCP or UDS)
- **Features**:
- - `SO_RCVTIMEO` to prevent indefinite hangs
- - Dispatches connections to thread pool
- - Thread-safe connection tracking
+ - Hands off each accepted fd to `IoReactor` inline (accept thread → reactor, no thread-pool hop)
+ - max_connections gate with SERVER_BUSY backpressure
+ - Thread-safe `active_fds_` tracking
-**ConnectionIOHandler** (`connection_io_handler.h`)
-- **Responsibility**: Per-connection I/O handling
+**IoReactor** (`io_reactor.h`)
+- **Responsibility**: Single-threaded event loop for readiness notification
- **Features**:
- - Reads/buffers socket data
- - Parses protocol messages (delimited by `\r\n`)
- - Enforces maximum query length (default 1MB)
- - Writes responses to socket
+ - `EventMultiplexer` abstraction (Linux: epoll, macOS/BSD: kqueue)
+ - Per-fd `ReactorConnection` lifecycle management
+ - Write arm/disarm (EPOLLOUT / EVFILT_WRITE), close callback
+ - Graceful shutdown
+
+**ReactorConnection** (`reactor_connection.h`)
+- **Responsibility**: Per-connection I/O state and drain-task pattern
+- **Features**:
+ - Non-blocking `recv()` drains into `read_buf_`, frames on `\r\n`
+ - Schedules at most one in-flight drain task per connection on the worker pool ("clear-then-recheck" reschedule)
+ - Non-blocking write queue (inline fast path + EPOLLOUT fallback on EAGAIN)
+ - Per-connection `max_write_queue_bytes` slow-reader backpressure cap
**RequestDispatcher** (`request_dispatcher.h`)
- **Responsibility**: Application logic routing
@@ -565,15 +573,17 @@ graph TB
MainThread["Main Thread
Application::RunMainLoop()
Signal polling (SignalManager)
Config reload handling
Initialization/shutdown coordination"]
BinlogThread["BinlogReader Thread (if MySQL enabled)
Reads from MySQL binlog, queues events"]
EventLoop["Event Processing Loop (main thread)
Dequeues binlog events, applies to Index/DocumentStore"]
- TCPThread["TCP Server Accept Thread
Listens on TCP port, accepts connections"]
+ TCPThread["TCP/UDS Accept Thread
Listens on socket, hands accepted fds directly to IoReactor"]
+ ReactorThread["IoReactor Event Loop Thread (single)
Drains epoll_wait/kevent, dispatches readiness to ReactorConnection
Owns per-fd write arm/disarm and close"]
HTTPThread["HTTP Server Thread
Listens on HTTP port"]
- WorkerPool["Worker Thread Pool (configurable, default = CPU count)
Thread 1: Processes client requests from queue
Thread 2: ...
Thread N: ..."]
+ WorkerPool["Worker Thread Pool (configurable, default = CPU count)
Thread 1: Runs per-connection drain tasks (request processing + inline send)
Thread 2: ...
Thread N: ..."]
SnapshotThread["SnapshotScheduler Background Thread (if enabled)
Periodically creates snapshots"]
Main --> MainThread
Main --> BinlogThread
Main --> EventLoop
Main --> TCPThread
+ Main --> ReactorThread
Main --> HTTPThread
Main --> WorkerPool
Main --> SnapshotThread
@@ -719,8 +729,9 @@ Following the dependency graph, components are initialized in this order:
- AdminHandler, ReplicationHandler, DebugHandler
- CacheHandler, SyncHandler (MySQL)
6. **RequestDispatcher** (depends on handlers)
-7. **ConnectionAcceptor** (depends on thread pool)
-8. **SnapshotScheduler** (optional, depends on catalog)
+7. **ConnectionAcceptor** (depends only on ServerConfig; the reactor handler is installed later in `TcpServer::Start()`)
+8. **IoReactor** (created in `TcpServer::Start()`, depends on ThreadPool and RequestDispatcher)
+9. **SnapshotScheduler** (optional, depends on catalog)
**Note**: RateLimiter and SyncOperationManager are created in `TcpServer::Start()` before ServerLifecycleManager is instantiated.
@@ -932,8 +943,9 @@ class InvalidationManager {
### Between Acceptor and Handlers
-- **ConnectionAcceptor** → accepts connection → submits to **ThreadPool**
-- **ThreadPool worker** → calls **ConnectionIOHandler** → calls **RequestDispatcher** → calls handler
+- **ConnectionAcceptor** → accepts connection → hands the fd directly to **IoReactor::Register** (no thread-pool hop)
+- **IoReactor (event loop thread)** → detects readable → **ReactorConnection::OnReadable** → extracts frames → schedules a drain task on **ThreadPool**
+- **ThreadPool worker** → **ReactorConnection::DrainTask** → dispatches each frame via **RequestDispatcher** → handler → response sent through `EnqueueResponse()` (inline fast path; partial sends fall back to EPOLLOUT in the event loop)
---
diff --git a/docs/ja/architecture.md b/docs/ja/architecture.md
index fc8625d..bccb4f0 100644
--- a/docs/ja/architecture.md
+++ b/docs/ja/architecture.md
@@ -300,19 +300,27 @@ graph TB
#### リクエストディスパッチパイプライン
**ConnectionAcceptor** (`connection_acceptor.h`)
-- **責務**: ソケット受け入れループ
+- **責務**: ソケット受け入れループ (TCP または UDS)
- **機能**:
- - 無期限ハングを防ぐ`SO_RCVTIMEO`
- - スレッドプールへの接続ディスパッチ
- - スレッドセーフな接続トラッキング
+ - `accept()` した fd を `IoReactor` にインラインで引き渡す(accept スレッド → reactor、ワーカープールを経由しない)
+ - max_connections のゲート、SERVER_BUSY バックプレッシャー
+ - スレッドセーフな active_fds_ トラッキング
-**ConnectionIOHandler** (`connection_io_handler.h`)
-- **責務**: 接続ごとのI/Oハンドリング
+**IoReactor** (`io_reactor.h`)
+- **責務**: 単一スレッドのイベントループでの readiness 通知
- **機能**:
- - ソケットデータの読み取り/バッファリング
- - プロトコルメッセージの解析(`\r\n`で区切り)
- - 最大クエリ長の強制(デフォルト1MB)
- - ソケットへのレスポンス書き込み
+ - `EventMultiplexer` 抽象(Linux: epoll、macOS/BSD: kqueue)
+ - per-fd `ReactorConnection` のライフサイクル管理
+ - 書き込みの arm/disarm (EPOLLOUT / EVFILT_WRITE)、close コールバック
+ - graceful shutdown
+
+**ReactorConnection** (`reactor_connection.h`)
+- **責務**: 接続ごとの I/O 状態と drain タスクパターン
+- **機能**:
+ - 非ブロッキング `recv()` で `read_buf_` にドレイン、`\r\n` 区切りでフレーム化
+ - drain task をワーカープールに 1 本だけ in-flight でスケジュール("clear-then-recheck" 再スケジュール)
+ - 非ブロッキング書き込みキュー(インライン高速パス + EAGAIN 時は EPOLLOUT フォールバック)
+ - per-connection `max_write_queue_bytes` でスローリーダーバックプレッシャー
**RequestDispatcher** (`request_dispatcher.h`)
- **責務**: アプリケーションロジックルーティング
@@ -565,15 +573,17 @@ graph TB
MainThread["Main Thread
Application::RunMainLoop()
シグナルポーリング (SignalManager)
設定リロード処理
初期化/シャットダウン調整"]
BinlogThread["BinlogReader Thread (if MySQL enabled)
MySQLバイナリログを読み取り、イベントをキューイング"]
EventLoop["Event Processing Loop (main thread)
バイナリログイベントをデキュー、Index/DocumentStoreに適用"]
- TCPThread["TCP Server Accept Thread
TCPポートでリスン、接続を受け入れ"]
+ TCPThread["TCP/UDS Accept Thread
ソケットでリスン、accept した fd を IoReactor に直接登録"]
+ ReactorThread["IoReactor Event Loop Thread (1 本)
epoll_wait/kevent でドレイン、ReactorConnection に readiness を配送
per-fd の write arm/disarm と close を担当"]
HTTPThread["HTTP Server Thread
HTTPポートでリスン"]
- WorkerPool["Worker Thread Pool (configurable, default = CPU count)
Thread 1: キューからクライアントリクエストを処理
Thread 2: ...
Thread N: ..."]
+ WorkerPool["Worker Thread Pool (configurable, default = CPU count)
Thread 1: per-connection drain task を実行(リクエスト処理 + 高速送信)
Thread 2: ...
Thread N: ..."]
SnapshotThread["SnapshotScheduler Background Thread (if enabled)
定期的にスナップショットを作成"]
Main --> MainThread
Main --> BinlogThread
Main --> EventLoop
Main --> TCPThread
+ Main --> ReactorThread
Main --> HTTPThread
Main --> WorkerPool
Main --> SnapshotThread
@@ -720,8 +730,9 @@ graph TB
- AdminHandler、ReplicationHandler、DebugHandler
- CacheHandler、SyncHandler(MySQL)
6. **RequestDispatcher**(ハンドラーに依存)
-7. **ConnectionAcceptor**(ThreadPoolに依存)
-8. **SnapshotScheduler**(オプショナル、catalogに依存)
+7. **ConnectionAcceptor**(ServerConfig のみに依存。reactor handler は `TcpServer::Start()` で後付け)
+8. **IoReactor**(`TcpServer::Start()` で生成、ThreadPool と RequestDispatcher に依存)
+9. **SnapshotScheduler**(オプショナル、catalogに依存)
**注記**: RateLimiterとSyncOperationManagerは、ServerLifecycleManagerがインスタンス化される前に`TcpServer::Start()`で作成されます。
@@ -933,8 +944,9 @@ class InvalidationManager {
### AcceptorとHandlersの間
-- **ConnectionAcceptor** → 接続を受け入れ → **ThreadPool**に投入
-- **ThreadPoolワーカー** → **ConnectionIOHandler**を呼び出し → **RequestDispatcher**を呼び出し → ハンドラを呼び出し
+- **ConnectionAcceptor** → 接続を受け入れ → **IoReactor::Register** に直接渡す(thread pool ホップなし)
+- **IoReactor (event loop thread)** → 読み取り可能を検知 → **ReactorConnection::OnReadable** → フレームを抽出 → drain task を **ThreadPool** にスケジュール
+- **ThreadPool ワーカー** → **ReactorConnection::DrainTask** → **RequestDispatcher** にフレームをディスパッチ → ハンドラ → レスポンスを `EnqueueResponse()` → 高速パスでインライン送信、不完全送信時は EPOLLOUT で event loop に再送出
---
diff --git a/docs/releases/README.md b/docs/releases/README.md
index 3bfa24d..6dd7ce2 100644
--- a/docs/releases/README.md
+++ b/docs/releases/README.md
@@ -4,7 +4,8 @@ This directory contains detailed release notes for each version of MygramDB.
## Available Versions
-- [v1.5.2](v1.5.2.md) - Latest release (2026-04-09) - MySQL 9.x compatibility, VECTOR type support, Auth fix
+- [v1.5.3](v1.5.3.md) - Latest release (2026-04-12) - Reactor I/O Model (epoll/kqueue), Half-close Fix, Rate Limit & UDS Hardening
+- [v1.5.2](v1.5.2.md) - 2026-04-09 - MySQL 9.x compatibility, VECTOR type support, Auth fix
- [v1.5.1](v1.5.1.md) - 2026-04-01 - Multi-distro packaging (EL10, Ubuntu DEB), Package verification, Build fixes
- [v1.5.0](v1.5.0.md) - 2026-03-23 - verify_text Post-Filter, Docker Benchmark, Namespace Cleanup
- [v1.4.0](v1.4.0.md) - 2026-03-16 - Unix Socket, Prometheus Metrics, Benchmark Suite & Bug Fixes
diff --git a/docs/releases/v1.5.3.md b/docs/releases/v1.5.3.md
new file mode 100644
index 0000000..ac29390
--- /dev/null
+++ b/docs/releases/v1.5.3.md
@@ -0,0 +1,248 @@
+# MygramDB v1.5.3 Release Notes
+
+**Release Date:** 2026-04-12
+**Type:** Feature / Bug Fix / Refactor
+**Previous Version:** v1.5.2
+
+---
+
+## Overview
+
+Version 1.5.3 replaces the legacy blocking one-thread-per-connection TCP path with an event-driven **reactor I/O model** built on `epoll` (Linux) and `kqueue` (macOS/BSD). The reactor is now the only supported TCP path; the blocking code has been removed entirely. A small server can now serve thousands of idle persistent connections with a single event-loop thread plus a bounded worker pool, eliminating the one-thread-per-connection starvation failure mode.
+
+The release also ships the hardening fixes that landed while the reactor was being rolled out: a half-close drain regression, a silently disabled rate limiter, and a broken Unix-domain-socket path are all fixed. A Grafana dashboard PromQL bug is fixed as well.
+
+**Highlights:**
+
+- **Reactor I/O model is the default (and only) TCP path** — `epoll`/`kqueue`-backed event loop with a bounded, per-connection non-blocking write queue
+- **Blocking I/O path removed entirely** — `ConnectionIOHandler` and the `api.tcp.io_model` feature flag are gone; configuration rejects the field
+- **Half-close responses delivered correctly** — `shutdown(SHUT_WR)` + `recv()` clients now receive their response instead of being torn down on the hangup event
+- **Rate limiting enforced under reactor** — `api.rate_limiting.enable = true` is now honored on every accepted connection; regression from the reactor flip
+- **Unix domain socket restored** — UDS listener works end-to-end under the reactor default (previously failed with a spurious "already listening" error)
+- **Slow-reader backpressure** — new `api.tcp.max_write_queue_bytes` (default 16 MiB) force-closes clients whose write queue exceeds the cap instead of stalling the event loop
+- **Grafana memory usage panel fix** — PromQL label-set mismatch corrected so the memory usage percentage renders again
+
+---
+
+## New Features
+
+### 1. Reactor I/O Model (epoll / kqueue)
+
+**Type:** Feature
+**Commit:** `c66e8b1`
+
+The new reactor replaces the blocking `recv()`/`send()` loop that ran one thread per connection. Readiness events come from `epoll` on Linux and `kqueue` on macOS/BSD behind a platform-agnostic `EventMultiplexer` interface. A single event-loop thread owns the multiplexer; accepted fds are handed off inline from `ConnectionAcceptor` to `IoReactor::Register`.
+
+Per-connection state lives in `ReactorConnection`, which implements a drain-task pattern: when the socket becomes readable the reactor schedules work on the existing `ThreadPool`, and the "clear-then-recheck" reschedule idiom ensures bytes that arrive while the drain task is running are picked up without dropping readable-edge notifications. Writes use a non-blocking queue (`std::deque` plus a `front_offset` for partial head sends) with an inline fast path and an `EPOLLOUT` fallback.
+
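+A minimal sketch of the reschedule idiom (names such as `pending_` and `drain_scheduled_` are illustrative, not the actual `ReactorConnection` members, and the real code drains `recv()` on the event-loop thread rather than receiving ready-made frames):
+
+```cpp
+#include <atomic>
+#include <deque>
+#include <functional>
+#include <mutex>
+#include <string>
+
+// Sketch only: at most one in-flight drain task per connection, with a
+// clear-then-recheck step so frames that arrive mid-drain are not lost.
+class DrainSketch {
+ public:
+  using Submit = std::function<void(std::function<void()>)>;
+  explicit DrainSketch(Submit submit) : submit_(std::move(submit)) {}
+
+  // Event-loop thread: a complete \r\n-delimited frame was extracted.
+  void OnFrame(std::string frame) {
+    {
+      std::lock_guard<std::mutex> lk(mu_);
+      pending_.push_back(std::move(frame));
+    }
+    if (!drain_scheduled_.exchange(true)) {
+      submit_([this] { DrainTask(); });  // schedule on the worker pool
+    }
+  }
+
+ private:
+  void DrainTask() {
+    for (;;) {
+      std::deque<std::string> batch;
+      {
+        std::lock_guard<std::mutex> lk(mu_);
+        batch.swap(pending_);
+      }
+      for (const auto& req : batch) Process(req);
+
+      // Clear first, then recheck: a frame that raced in either finds the
+      // flag already cleared (and schedules a fresh task) or is picked up
+      // by the recheck below.
+      drain_scheduled_.store(false);
+      {
+        std::lock_guard<std::mutex> lk(mu_);
+        if (pending_.empty()) return;
+      }
+      if (drain_scheduled_.exchange(true)) return;  // another task owns it
+    }
+  }
+
+  void Process(const std::string& /*req*/) {}  // dispatch + enqueue response
+
+  Submit submit_;
+  std::mutex mu_;
+  std::deque<std::string> pending_;
+  std::atomic<bool> drain_scheduled_{false};
+};
+```
+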
+**New components:**
+
+- `src/server/reactor/event_multiplexer.{h,cpp}` — Platform-agnostic readiness interface
+- `src/server/reactor/epoll_multiplexer.{h,cpp}` — Level-triggered epoll backend (Linux)
+- `src/server/reactor/kqueue_multiplexer.{h,cpp}` — kqueue backend (macOS/BSD)
+- `src/server/io_reactor.{h,cpp}` — Single-threaded event loop, connection registration, arm/disarm-write, graceful shutdown
+- `src/server/reactor_connection.{h,cpp}` — Per-connection drain-task state, bounded write queue, slow-reader cap
+
+**Integration:**
+
+- `src/server/tcp_server.cpp` — Always creates and starts `IoReactor` on `Start()`; reactor init failure now propagates instead of silently falling back
+- `src/server/connection_acceptor.cpp` — Routes accepted fds through the installed `ReactorHandler` on the accept thread (no thread-pool submit)
+- `src/server/server_lifecycle_manager.cpp` — Drains the reactor ahead of the thread pool during shutdown
+
+**Thread-pool auto-size revert:** With the reactor default, the emergency `hw*4`/64-worker floor added to mitigate blocking-mode starvation is no longer needed. `src/server/thread_pool.cpp` restores the original `max(hw*2, 4)` formula.
+
+**New error codes** (`src/utils/error.h`, 6016–6023): `kNetworkReactorUnsupported`, `kNetworkReactorPollFailed`, `kNetworkReactorRegisterFailed`, `kNetworkReactorModifyFailed`, `kNetworkReactorRemoveFailed`, `kNetworkReactorQueueFull`, `kNetworkReactorAlreadyOpen`.
+
+### 2. Per-Connection Slow-Reader Backpressure
+
+**Type:** Feature
+**Commit:** `c66e8b1`
+
+Added `api.tcp.max_write_queue_bytes` (default: 16 MiB, immutable at runtime). When a client's enqueued-but-not-yet-sent response bytes exceed the cap, the reactor logs `reactor_write_queue_overflow` and force-closes the connection. This prevents a single slow or stalled reader from consuming unbounded server memory while the event loop tries to drain it.
+
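+Illustrative enqueue-side check (placeholder types; the real `ReactorConnection` tracks the queue with a deque plus a head offset for partial sends):
+
+```cpp
+#include <cstddef>
+#include <deque>
+#include <string>
+
+// Placeholder connection state, not the actual ReactorConnection layout.
+struct ConnState {
+  std::size_t queued_bytes = 0;
+  std::deque<std::string> write_queue;
+  bool force_closed = false;
+};
+
+// Returns false and marks the connection for closure when the cap would be
+// exceeded; the real server logs reactor_write_queue_overflow at this point.
+inline bool EnqueueResponse(ConnState& c, std::string response, std::size_t cap) {
+  if (c.queued_bytes + response.size() > cap) {
+    c.force_closed = true;
+    return false;
+  }
+  c.queued_bytes += response.size();
+  c.write_queue.push_back(std::move(response));
+  return true;  // event loop drains write_queue when the fd becomes writable
+}
+```
+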
+**Files:**
+
+- `src/config/config.{h,cpp}`, `config-schema.json`, `config_help.cpp`
+- `src/server/server_types.h` — New `max_write_queue_bytes` field on `ServerConfig`
+
+---
+
+## Bug Fixes
+
+### 1. TCP Half-Close Drain Regression
+
+**Type:** Bug Fix
+**Severity:** High — Clients using `shutdown(SHUT_WR)` + `recv()` never received their response
+**Commit:** `ed04e24`
+
+**Problem:** A client pattern of `send(...); shutdown(SHUT_WR); recv()` was losing its response under the reactor path. The blocking `ConnectionIOHandler` handled this correctly by reading until EOF and then flushing. The regression was uncovered by `e2e/tests/load/test_connection_stress.py::test_half_close_write`.
+
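+The failing client pattern, roughly (illustrative C++, not the actual e2e test; the socket is assumed to be an already-connected TCP fd):
+
+```cpp
+#include <string>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+// fd is an already-connected TCP socket to the server.
+void HalfCloseQuery(int fd) {
+  const std::string req = "INFO\r\n";
+  send(fd, req.data(), req.size(), 0);
+  shutdown(fd, SHUT_WR);  // half-close: we are done writing
+  char buf[4096];
+  while (recv(fd, buf, sizeof(buf), 0) > 0) {
+    // v1.5.3 delivers the response here; the pre-fix reactor tore the
+    // connection down on the hangup flag before reading the request.
+  }
+  close(fd);
+}
+```
+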
+Two bugs compounded:
+
+1. `IoReactor::DispatchEvent` treated `reactor::event::kHangup` (`EV_EOF` on kqueue, `EPOLLRDHUP` on epoll) as fatal alongside `kError`, short-circuiting to `OnError()` without ever calling `OnReadable()`. The half-close raises the hangup flag on the *same* readable event as the payload bytes, so the reactor tore the connection down before reading the request.
+
+2. `ReactorConnection::OnReadable`, on `recv() == 0`, set the `closing_` flag. `EnqueueResponse` refuses writes when `closing_` is set, so even if the drain task had run, the response would have been dropped.
+
+**Solution:**
+
+- `kHangup` now falls through to `OnReadable()`, which already drains to EOF. `kError` remains fatal.
+- Introduced a distinct `read_eof_` atomic for "peer has stopped writing" semantics. `OnReadable` sets `read_eof_` (not `closing_`) on orderly EOF and still extracts buffered frames into the drain queue. The drain task runs to completion, `EnqueueResponse` accepts the response, the write queue drains, and only then does the drain task set `closing_` and `Unregister`.
+- Subsequent `OnReadable` calls while `read_eof_` is set skip further `recv()` syscalls.
+
+**Files:**
+
+- `src/server/io_reactor.cpp`
+- `src/server/reactor_connection.{h,cpp}`
+
+**Regression test:** `tests/integration/server/reactor_integration_test.cpp::HalfCloseStillReceivesResponse` — mirrors the e2e case: send `INFO`, `shutdown(SHUT_WR)`, read response bytes with raw `recv()`. Fails on the pre-fix code.
+
+### 2. Rate Limiting Silently Disabled Under Reactor
+
+**Type:** Bug Fix
+**Severity:** High — `api.rate_limiting.enable = true` was not enforced on any accepted connection
+**Commit:** `6717a9d`
+
+**Problem:** The blocking `ConnectionIOHandler` path enforced `api.rate_limiting` via `TcpServer::HandleConnection` (`getpeername` + `AllowRequest` + close-on-reject). The reactor path had no equivalent — zero references to `rate_limit` in `reactor_connection.cpp` / `io_reactor.cpp`. Any user with rate limiting enabled got unmetered traffic the moment the default `io_model` flipped to reactor.
+
+**Solution:** The `reactor_handler` lambda in `TcpServer::Start` now captures a `RateLimiter*` (null if rate limiting is disabled or if the acceptor is a UDS acceptor), calls `getpeername()` on each accepted fd, extracts the peer IPv4 address, and calls `AllowRequest()` before `Register()`. On rejection it returns `false` so `ConnectionAcceptor::AcceptLoop` emits `SERVER_BUSY` and closes the fd, matching the existing accept-side backpressure path.
+
+**UDS bypass:** The blocking path used the `"unix-local"` sentinel to skip rate limiting for UDS clients. The reactor path computes the bypass once at `Start()` time (`rate_limiter_ptr = null` whenever the acceptor is UDS) rather than on every accept.
+
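+A sketch of the accept-time gate (the `StubRateLimiter` here is a stand-in; the real handler is a lambda in `TcpServer::Start` and also handles `getpeername()` failures):
+
+```cpp
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <string>
+#include <sys/socket.h>
+
+// Stub standing in for the real token-bucket RateLimiter.
+struct StubRateLimiter {
+  bool AllowRequest(const std::string& /*peer_ip*/) { return true; }
+};
+
+// Returns false to make the acceptor send SERVER_BUSY and close the fd;
+// a null limiter (rate limiting disabled, or UDS acceptor) bypasses the check.
+bool GateAcceptedFd(int client_fd, StubRateLimiter* limiter) {
+  if (limiter != nullptr) {
+    sockaddr_in peer{};
+    socklen_t len = sizeof(peer);
+    if (getpeername(client_fd, reinterpret_cast<sockaddr*>(&peer), &len) == 0) {
+      char ip[INET_ADDRSTRLEN] = {};
+      inet_ntop(AF_INET, &peer.sin_addr, ip, sizeof(ip));
+      if (!limiter->AllowRequest(ip)) return false;
+    }
+  }
+  // ...on success the real handler calls IoReactor::Register(client_fd)
+  return true;
+}
+```
+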
+**File:** `src/server/tcp_server.cpp`
+
+**Regression test:** `RateLimitEnforcedInReactorMode` — capacity=2, refill=0; third connection must be closed without a response.
+
+### 3. Unix Domain Socket Acceptor Could Not Start
+
+**Type:** Bug Fix
+**Severity:** High — Every `TcpServer::Start` with `unix_socket_path` set failed deterministically
+**Commit:** `6717a9d`
+
+**Problem:** `TcpServer::Start` unconditionally created a *secondary* `unix_acceptor_` whenever `config_.unix_socket_path` was non-empty. But the primary `acceptor_` constructed in `ServerLifecycleManager::InitAcceptor` received the same `ServerConfig`, and `ConnectionAcceptor::Start` checks `unix_socket_path.empty()` before the TCP branch — so the primary was *already* in UDS mode and had already bound the path. The secondary hit its own stale-socket probe on the same path and failed with "Another server is already listening on: …". No integration test exercised `unix_socket_path` end-to-end, so this never surfaced until one was added.
+
+**Solution:** Deleted the dead `unix_acceptor_` code path entirely (`tcp_server.h` field, `Start()` setup block, `Stop()` teardown, `IsRunning()` check). The primary acceptor already routes UDS client fds through the `reactor_handler` installed on it, so UDS flows through the reactor for free.
+
+**File:** `src/server/tcp_server.{h,cpp}`
+
+**Regression test:** `UnixSocketServedUnderReactorDefault` — `AF_UNIX` client connects, sends `INFO`, expects `"OK INFO"`.
+
+### 4. Grafana Dashboard Memory Usage PromQL
+
+**Type:** Bug Fix
+**Severity:** Medium — Memory usage percentage panel rendered as "No data"
+**Commit:** `10b32cb`
+
+**Problem:** The memory usage percentage panel divided `mygramdb_memory_used_bytes` by `mygramdb_memory_system_total_bytes` without reconciling the differing label sets: the numerator carries a `type="total"` label that the denominator lacks, so the Prometheus query engine returned no samples.
+
+**Solution:** Use `ignoring(type)` on the division so the two vectors match on the remaining labels.
+
+**File:** `examples/grafana-dashboard.json`
+
+---
+
+## Refactoring
+
+### 1. Remove Blocking I/O Path Entirely
+
+**Commit:** `8520fcb`
+
+With the reactor default now carrying the rate-limiting and UDS routing fixes, nothing of value remained in the legacy blocking path. The feature flag, the handler, and the per-connection context map are all deleted.
+
+**Removed:**
+
+- `src/server/connection_io_handler.{h,cpp}` — Deleted; no remaining caller
+- `TcpServer::HandleConnection`, `connection_contexts_` map and its mutex (per-connection state now lives in `ReactorConnection`)
+- The `reactor_active` fallback branch in `TcpServer::Start` and the `SetConnectionHandler` else-arm
+- `ServerConfig::io_model`, the `api.tcp.io_model` config field, its schema enum, help entry, and runtime variable getter
+- `ConnectionHandler` typedef, `SetConnectionHandler`, and the `connection_handler_` field on `ConnectionAcceptor`
+- `tests/server/connection_io_handler_test.cpp` (8 tests), `test_io_model_override.h`, and all `ApplyIoModelOverride()` call sites
+- Three `.BlockingMode` ctest entries (`multi_table`, `end_to_end`, `verify_text`)
+- Two blocking-mode thread-pool saturation tests (`LargerThreadPoolRemovesStarvationInBlockingMode`, `QueueOverflowTriggersBusyResponseInBlockingMode`)
+- The T3 blocking-mode starvation negative-control test
+
+`kNetworkReactorUnsupported` (6016) is preserved — it now propagates from `TcpServer::Start` instead of being caught and fallback-handled.
+
+### 2. Drop Unused `ConnectionAcceptor::thread_pool_`
+
+**Commit:** `5db19e3`
+
+With the thread-pool-submit branch gone from `ConnectionAcceptor::AcceptLoop`, the `thread_pool_` member is unreferenced. Accepted fds are handed off inline to the `ReactorHandler` on the accept thread.
+
+- `ConnectionAcceptor` constructor is now `explicit ConnectionAcceptor(ServerConfig)`
+- `ServerLifecycleManager::InitAcceptor` no longer takes a `thread_pool` argument
+- 6 test fixtures in `connection_acceptor_unix_test.cpp` drop the local `ThreadPool pool` declarations
+
+### 3. Reactor Hot-Path and Lifecycle Tightening
+
+**Commit:** `1d0ad99`
+
+Four small, independent improvements surfaced during reactor code review:
+
+- **epoll/kqueue `Poll()` buffer grows on demand.** Both backends double their scratch buffer up to a 4096-entry cap when a `Poll()` fills capacity, so high-concurrency bursts are not fragmented across multiple `Poll()` rounds. Growth is monotonic; the buffer never shrinks.
+- **`ReactorConnection::OnWritable` flattened.** The empty-queue branch previously wrapped the disarm + half-close teardown in a nested block with a fall-through `return true` that doubled as both the partial-drain and the fully-drained-but-not-closing return. Splitting the partial-drain early-return out removes that ambiguity without changing semantics.
+- **`IoReactor::Register` closes a narrow race with `Stop()`.** If `Stop()` ran between `Register`'s initial `running_` check and its `mux_->Add` call, the entry `Register` had emplaced was already cleared by `Stop()`, and the caller received a spurious success for an fd that was never tracked. `Register` now re-checks `running_` while still holding `mux_lifecycle_` shared and rolls back the `Add` on the losing side.
+- **`kMaxReadBufferBytes` contract documented.** The header now clarifies that this constant is an OOM safety rail, not per-query size enforcement; that responsibility belongs to the downstream query parser (`api.max_query_length`). It is deliberately decoupled from config so that lowering `max_query_length` at runtime cannot drop well-formed but large requests still in flight on an existing connection.
+
+---
+
+## Testing
+
+### New Unit Tests
+
+- `tests/server/reactor/event_multiplexer_test.cpp` — `MockEventMultiplexer` covers register/modify/remove/poll semantics against a deterministic fake
+- `tests/server/io_reactor_test.cpp` — Start/stop lifecycle, register-after-stop rejection, `ArmWrite` before `Register` error, close-callback exactly-once
+- `tests/server/reactor_connection_test.cpp` — Drain-task "clear-then-recheck" reschedule, `EnqueueResponse` cap enforcement, partial-send `front_offset` tracking, `write_armed` transitions, close during in-flight drain
+
+### New Integration Tests
+
+- `tests/integration/server/reactor_integration_test.cpp` — Happy-path (SEARCH/INFO), many concurrent clients, and:
+ - `WriteBackpressureHandledGracefully` — 64 KiB cap + slow reader (`SO_RCVBUF=4096`) must log `reactor_write_queue_overflow` and force-close without stalling four parallel fast clients
+ - `ManyIdleConnectionsDoNotBlockActiveClient` — `kWorkers=8` + thousands of idle persistent clients must still serve a new active client in under 500 ms
+ - `HalfCloseStillReceivesResponse`, `RateLimitEnforcedInReactorMode`, `MaxQueryLengthEnforcedInReactorMode`, `UnixSocketServedUnderReactorDefault` (regression guards)
+- `tests/integration/server/reactor_starvation_regression_test.cpp` — `LateClientServedUnderLoad`, `NoBusyErrorUnderSustainedLoad`
+- `tests/integration/server/thread_pool_saturation_test.cpp` — Migrated into its own binary with the assertion inverted (`LateClientServedDespitePinnedIdleClientsInDefaultMode`) now that the reactor is the default
+
+Test helpers use `poll()` instead of `select()` so tests work past the `FD_SETSIZE=1024` cliff on macOS when opening many fds.
+
+### Test Suite Totals
+
+| Metric | Before | After | Delta |
+|---|---|---|---|
+| Fast suite (`ctest -LE "LOAD\|SLOW"`) | 2164 | 2165 | +1 regression guard |
+| e2e suite | 1 failed (`test_half_close_write`) | 195 passed, 4 skipped, 0 failed | Fixed |
+
+---
+
+## Documentation Updates
+
+- `docs/en/architecture.md` / `docs/ja/architecture.md` — Replaced the `ConnectionIOHandler` subsection with new `IoReactor` and `ReactorConnection` subsections. Added an "IoReactor Event Loop Thread (single)" node to the thread model diagram. Updated `ServerLifecycleManager` initialization order: acceptor no longer depends on `ThreadPool`; `IoReactor` is now step 8, created in `TcpServer::Start()` and depending on `ThreadPool` + `RequestDispatcher`. Rewrote "Between Acceptor and Handlers" request flow as `ConnectionAcceptor → IoReactor::Register → ReactorConnection → drain task on ThreadPool → RequestDispatcher → handler → EnqueueResponse() with EPOLLOUT fallback`.
+- `e2e/tests/load/test_load.py` — Module docstring updated to describe how `ReactorConnection`'s drain task frames `\r\n`-delimited requests (it previously claimed pipelined commands were processed by `ConnectionIOHandler` "in a while loop")
+- Historical release notes (`docs/releases/v1.1.0.md`, `v1.3.5.md`) reference the old class names — left intact since they document past releases.
+
+---
+
+## Migration Guide
+
+### From v1.5.2
+
+**No breaking changes to wire protocol or on-disk formats.** Direct upgrade is safe.
+
+**Configuration cleanup required** if you explicitly set `api.tcp.io_model`:
+
+- `api.tcp.io_model` has been **removed** from the config schema
+- If your config file contains this field, remove it — leaving it will cause schema validation to reject the config
+- The reactor is now the only TCP path
+
+**Optional new setting:**
+
+- `api.tcp.max_write_queue_bytes` — Per-connection write queue cap (default: `16777216` = 16 MiB). Clients whose enqueued response bytes exceed this cap are force-closed with a `reactor_write_queue_overflow` log entry. Tune downward to reduce memory exposure to slow readers; tune upward only if you see legitimate clients being closed. Immutable at runtime.
+
+**Platform requirements:**
+
+- Linux: requires `epoll` (kernel 2.6+ — already required)
+- macOS/BSD: requires `kqueue` (always present)
+- Other POSIX platforms will fail `TcpServer::Start` with `kNetworkReactorUnsupported` (6016). There is no blocking fallback.
+
+**Rate limiting users:** If you have `api.rate_limiting.enable = true` in your config and were previously running a build with the reactor enabled (where rate limiting was silently disabled), you will see enforcement resume in v1.5.3. Review your `capacity` / `refill_rate` settings against current traffic to avoid unexpected `SERVER_BUSY` responses.
+
+**Unix-domain-socket users:** UDS now works end-to-end under the reactor. If you had worked around the v1.5.2 breakage by disabling `unix_socket_path`, you can re-enable it.
diff --git a/e2e/tests/load/test_load.py b/e2e/tests/load/test_load.py
index a39c298..f999482 100644
--- a/e2e/tests/load/test_load.py
+++ b/e2e/tests/load/test_load.py
@@ -1,9 +1,9 @@
"""Load and performance tests.
Uses persistent TCP connections per worker to avoid ephemeral port exhaustion.
-The server supports pipelined commands over a single connection (ConnectionIOHandler
-processes multiple \\r\\n-delimited requests in a while loop), so each worker
-keeps one socket open for the entire duration.
+The server supports pipelined commands over a single connection: ReactorConnection
+frames each ``\\r\\n``-delimited request inside a single drain task, so each
+worker keeps one socket open for the entire duration.
"""
import json
diff --git a/examples/grafana-dashboard.json b/examples/grafana-dashboard.json
index 3bef8bf..2a99b7b 100644
--- a/examples/grafana-dashboard.json
+++ b/examples/grafana-dashboard.json
@@ -149,7 +149,7 @@
"targets": [
{
"datasource": "${DS_PROMETHEUS}",
- "expr": "mygramdb_memory_used_bytes{instance=~\"$instance\", type=\"total\"} / mygramdb_memory_system_total_bytes{instance=~\"$instance\"} * 100",
+ "expr": "mygramdb_memory_used_bytes{instance=~\"$instance\", type=\"total\"} / ignoring(type) mygramdb_memory_system_total_bytes{instance=~\"$instance\"} * 100",
"legendFormat": "{{instance}}"
}
],
diff --git a/src/app/server_orchestrator.cpp b/src/app/server_orchestrator.cpp
index b423eac..e0243ac 100644
--- a/src/app/server_orchestrator.cpp
+++ b/src/app/server_orchestrator.cpp
@@ -418,6 +418,14 @@ mygram::utils::Expected ServerOrchestrator::Initiali
server_config.host = deps_.config.api.tcp.bind;
server_config.port = deps_.config.api.tcp.port;
server_config.max_connections = deps_.config.api.tcp.max_connections;
+ server_config.worker_threads = deps_.config.api.tcp.worker_threads;
+ server_config.recv_timeout_sec = deps_.config.api.tcp.recv_timeout_sec;
+ server_config.thread_pool_queue_size = deps_.config.api.tcp.thread_pool_queue_size;
+ server_config.keepalive.enabled = deps_.config.api.tcp.keepalive.enabled;
+ server_config.keepalive.idle_sec = deps_.config.api.tcp.keepalive.idle_sec;
+ server_config.keepalive.interval_sec = deps_.config.api.tcp.keepalive.interval_sec;
+ server_config.keepalive.probe_count = deps_.config.api.tcp.keepalive.probe_count;
+ server_config.max_write_queue_bytes = deps_.config.api.tcp.max_write_queue_bytes;
server_config.default_limit = deps_.config.api.default_limit;
server_config.max_query_length = deps_.config.api.max_query_length;
server_config.allow_cidrs = deps_.config.network.allow_cidrs;
diff --git a/src/config/config-schema.json b/src/config/config-schema.json
index f5361f7..b17fc49 100644
--- a/src/config/config-schema.json
+++ b/src/config/config-schema.json
@@ -509,6 +509,74 @@
"default": 11016,
"minimum": 1,
"maximum": 65535
+ },
+ "max_connections": {
+ "type": "integer",
+ "description": "Maximum number of concurrent TCP connections accepted by the server.",
+ "default": 10000,
+ "minimum": 1,
+ "maximum": 1000000
+ },
+ "worker_threads": {
+ "type": "integer",
+ "description": "Number of worker threads in the TCP thread pool. In the blocking I/O model each persistent client holds one worker for its entire lifetime (blocking recv loop with 60s idle keepalive), so this is also the cap on concurrent persistent clients. 0 = auto (max(hardware_concurrency() * 4, 64)). Set explicitly when you know your client concurrency.",
+ "default": 0,
+ "minimum": 0,
+ "maximum": 16384
+ },
+ "recv_timeout_sec": {
+ "type": "integer",
+ "description": "SO_RCVTIMEO applied to accepted client connections. With the current blocking-recv I/O model, this also bounds how long a worker stays parked on an idle or half-dead client. 0 disables the timeout.",
+ "default": 60,
+ "minimum": 0,
+ "maximum": 86400
+ },
+ "thread_pool_queue_size": {
+ "type": "integer",
+ "description": "Maximum number of connection tasks that can be queued waiting for a worker. When this is exceeded, the acceptor sends SERVER_BUSY and closes the connection. 0 = unbounded (not recommended in production).",
+ "default": 1000,
+ "minimum": 0,
+ "maximum": 1000000
+ },
+ "keepalive": {
+ "type": "object",
+ "description": "Per-connection TCP keepalive settings applied to accepted client sockets. Tightens Linux's multi-hour defaults so half-open connections (dead peer host) are detected within a few minutes. See docs/ja/design/reactor-io-refactor.md §1.1.",
+ "additionalProperties": false,
+ "properties": {
+ "enabled": {
+ "type": "boolean",
+ "description": "Enable SO_KEEPALIVE on accepted client sockets.",
+ "default": true
+ },
+ "idle_sec": {
+ "type": "integer",
+ "description": "TCP_KEEPIDLE: seconds of idle before the first keepalive probe is sent.",
+ "default": 60,
+ "minimum": 1,
+ "maximum": 86400
+ },
+ "interval_sec": {
+ "type": "integer",
+ "description": "TCP_KEEPINTVL: seconds between keepalive probes.",
+ "default": 20,
+ "minimum": 1,
+ "maximum": 3600
+ },
+ "probe_count": {
+ "type": "integer",
+ "description": "TCP_KEEPCNT: number of unanswered probes before declaring the peer dead.",
+ "default": 3,
+ "minimum": 1,
+ "maximum": 32
+ }
+ }
+ },
+ "max_write_queue_bytes": {
+ "type": "integer",
+ "description": "Per-connection soft cap on unsent response bytes. When a single connection's write queue exceeds this cap, the reactor forcibly closes the connection to protect the server from slow-reader OOM. Default: 16 MiB (16777216 bytes).",
+ "default": 16777216,
+ "minimum": 4096,
+ "maximum": 1073741824
}
}
},
diff --git a/src/config/config.cpp b/src/config/config.cpp
index 71548dc..e167982 100644
--- a/src/config/config.cpp
+++ b/src/config/config.cpp
@@ -704,6 +704,36 @@ Config ParseConfigFromJson(const json& root) {
if (tcp.contains("port")) {
config.api.tcp.port = tcp["port"].get<int>();
}
+ if (tcp.contains("max_connections")) {
+ config.api.tcp.max_connections = tcp["max_connections"].get<int>();
+ }
+ if (tcp.contains("worker_threads")) {
+ config.api.tcp.worker_threads = tcp["worker_threads"].get<int>();
+ }
+ if (tcp.contains("recv_timeout_sec")) {
+ config.api.tcp.recv_timeout_sec = tcp["recv_timeout_sec"].get<int>();
+ }
+ if (tcp.contains("thread_pool_queue_size")) {
+ config.api.tcp.thread_pool_queue_size = tcp["thread_pool_queue_size"].get<int>();
+ }
+ if (tcp.contains("keepalive")) {
+ const auto& ka = tcp["keepalive"];
+ if (ka.contains("enabled")) {
+ config.api.tcp.keepalive.enabled = ka["enabled"].get<bool>();
+ }
+ if (ka.contains("idle_sec")) {
+ config.api.tcp.keepalive.idle_sec = ka["idle_sec"].get<int>();
+ }
+ if (ka.contains("interval_sec")) {
+ config.api.tcp.keepalive.interval_sec = ka["interval_sec"].get<int>();
+ }
+ if (ka.contains("probe_count")) {
+ config.api.tcp.keepalive.probe_count = ka["probe_count"].get<int>();
+ }
+ }
+ if (tcp.contains("max_write_queue_bytes")) {
+ config.api.tcp.max_write_queue_bytes = tcp["max_write_queue_bytes"].get<int64_t>();
+ }
}
if (api.contains("http")) {
const auto& http = api["http"];
diff --git a/src/config/config.h b/src/config/config.h
index eb3dbe5..34ecce5 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -232,6 +232,49 @@ struct ApiConfig {
std::string bind = "127.0.0.1";
int port = defaults::kTcpPort;
int max_connections = kDefaultMaxConnections; ///< Maximum concurrent connections
+ /// Number of worker threads in the TCP thread pool.
+ /// Architecture note: under the reactor I/O model, workers run
+ /// per-connection drain tasks (request processing plus inline response
+ /// send); they no longer hold one client each for its entire lifetime.
+ /// 0 = auto (see ThreadPool: max(hardware_concurrency() * 2, 4))
+ int worker_threads = 0;
+
+ /// SO_RCVTIMEO (seconds) applied to each client connection.
+ /// Bounds the worst-case time a read can wait on an idle or dead peer
+ /// before Linux's own TCP keepalive probes expire. Default: 60.
+ int recv_timeout_sec = 60; // NOLINT(readability-magic-numbers)
+
+ /// Thread pool task queue size. Once a connection is accepted but cannot
+ /// be dispatched (all workers busy, queue full), the server responds with
+ /// SERVER_BUSY and closes. Default: 1000.
+ int thread_pool_queue_size = 1000; // NOLINT(readability-magic-numbers)
+
+ /// Per-connection soft cap on unsent response bytes (design doc §7 R3).
+ /// When a single connection's write queue exceeds this cap, the reactor
+ /// forcibly closes the connection to protect the server from slow-reader
+ /// OOM. Default: 16 MiB. A production operator who sees
+ /// `reactor_write_queue_overflow` warnings in steady state should either
+ /// raise this cap (if the responses are legitimately large) or
+ /// investigate the client(s) that are failing to drain their socket.
+ int64_t max_write_queue_bytes = 16LL * 1024 * 1024; // 16 MiB
+
+ /// Per-connection TCP keepalive (applied to accepted client sockets).
+ ///
+ /// Under the blocking-recv I/O model, a half-open TCP connection (peer's
+ /// host died but the socket was never closed) keeps a worker thread parked
+ /// in recv() until either `recv_timeout_sec` fires or TCP keepalive probes
+ /// expire. The Linux defaults (2 hour idle + 75s interval * 9 probes) are
+ /// too permissive for a latency-sensitive server, so we tighten them here.
+ /// This is kept as a defense-in-depth measure even after the reactor
+ /// refactor switches away from blocking recv.
+ struct {
+ bool enabled = true;
+ int idle_sec = 60; ///< TCP_KEEPIDLE: start probing after N seconds idle
+ int interval_sec = 20; ///< TCP_KEEPINTVL: seconds between probes
+ int probe_count = 3; ///< TCP_KEEPCNT: probes before declaring dead
+ } keepalive;
+
} tcp;
struct {
diff --git a/src/config/config_help.cpp b/src/config/config_help.cpp
index 926f47a..b61c470 100644
--- a/src/config/config_help.cpp
+++ b/src/config/config_help.cpp
@@ -183,6 +183,18 @@ nlohmann::json ConfigToJson(const Config& config) {
{
{"bind", config.api.tcp.bind},
{"port", config.api.tcp.port},
+ {"max_connections", config.api.tcp.max_connections},
+ {"worker_threads", config.api.tcp.worker_threads},
+ {"recv_timeout_sec", config.api.tcp.recv_timeout_sec},
+ {"thread_pool_queue_size", config.api.tcp.thread_pool_queue_size},
+ {"keepalive",
+ {
+ {"enabled", config.api.tcp.keepalive.enabled},
+ {"idle_sec", config.api.tcp.keepalive.idle_sec},
+ {"interval_sec", config.api.tcp.keepalive.interval_sec},
+ {"probe_count", config.api.tcp.keepalive.probe_count},
+ }},
+ {"max_write_queue_bytes", config.api.tcp.max_write_queue_bytes},
}},
{"http",
{
diff --git a/src/config/runtime_variable_manager.cpp b/src/config/runtime_variable_manager.cpp
index 60d820e..cb535d4 100644
--- a/src/config/runtime_variable_manager.cpp
+++ b/src/config/runtime_variable_manager.cpp
@@ -55,14 +55,22 @@ static const std::map kVariableMutability = {
// API settings
{"api.default_limit", true},
{"api.max_query_length", true},
- {"api.tcp.bind", false}, // Immutable (requires socket rebind)
- {"api.tcp.port", false}, // Immutable
- {"api.tcp.max_connections", false}, // Immutable
- {"api.http.enable", false}, // Immutable
- {"api.http.bind", false}, // Immutable
- {"api.http.port", false}, // Immutable
- {"api.http.enable_cors", false}, // Immutable
- {"api.http.cors_allow_origin", false}, // Immutable
+ {"api.tcp.bind", false}, // Immutable (requires socket rebind)
+ {"api.tcp.port", false}, // Immutable
+ {"api.tcp.max_connections", false}, // Immutable
+ {"api.tcp.worker_threads", false}, // Immutable (thread pool is bound at startup)
+ {"api.tcp.recv_timeout_sec", false}, // Immutable (applied per connection at accept)
+ {"api.tcp.thread_pool_queue_size", false}, // Immutable (thread pool queue sized at startup)
+ {"api.tcp.keepalive.enabled", false}, // Immutable (applied per connection at accept)
+ {"api.tcp.keepalive.idle_sec", false}, // Immutable
+ {"api.tcp.keepalive.interval_sec", false}, // Immutable
+ {"api.tcp.keepalive.probe_count", false}, // Immutable
+ {"api.tcp.max_write_queue_bytes", false}, // Immutable (per-connection cap set at accept)
+ {"api.http.enable", false}, // Immutable
+ {"api.http.bind", false}, // Immutable
+ {"api.http.port", false}, // Immutable
+ {"api.http.enable_cors", false}, // Immutable
+ {"api.http.cors_allow_origin", false}, // Immutable
// Rate limiting
{"api.rate_limiting.enable", true},
@@ -615,6 +623,30 @@ std::string RuntimeVariableManager::GetVariableInternal(const std::string& varia
if (variable_name == "api.tcp.max_connections") {
return std::to_string(base_config_.api.tcp.max_connections);
}
+ if (variable_name == "api.tcp.worker_threads") {
+ return std::to_string(base_config_.api.tcp.worker_threads);
+ }
+ if (variable_name == "api.tcp.recv_timeout_sec") {
+ return std::to_string(base_config_.api.tcp.recv_timeout_sec);
+ }
+ if (variable_name == "api.tcp.thread_pool_queue_size") {
+ return std::to_string(base_config_.api.tcp.thread_pool_queue_size);
+ }
+ if (variable_name == "api.tcp.keepalive.enabled") {
+ return base_config_.api.tcp.keepalive.enabled ? "true" : "false";
+ }
+ if (variable_name == "api.tcp.keepalive.idle_sec") {
+ return std::to_string(base_config_.api.tcp.keepalive.idle_sec);
+ }
+ if (variable_name == "api.tcp.keepalive.interval_sec") {
+ return std::to_string(base_config_.api.tcp.keepalive.interval_sec);
+ }
+ if (variable_name == "api.tcp.keepalive.probe_count") {
+ return std::to_string(base_config_.api.tcp.keepalive.probe_count);
+ }
+ if (variable_name == "api.tcp.max_write_queue_bytes") {
+ return std::to_string(base_config_.api.tcp.max_write_queue_bytes);
+ }
if (variable_name == "api.http.enable") {
return base_config_.api.http.enable ? "true" : "false";
}
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 8ff8d4a..7ce9586 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -11,7 +11,11 @@ add_library(mygramdb_server STATIC
connection_acceptor.cpp
request_dispatcher.cpp
snapshot_scheduler.cpp
- connection_io_handler.cpp
+ reactor_connection.cpp
+ io_reactor.cpp
+ reactor/event_multiplexer.cpp
+ reactor/epoll_multiplexer.cpp
+ reactor/kqueue_multiplexer.cpp
sync_operation_manager.cpp
rate_limiter.cpp
handlers/command_handler.cpp
diff --git a/src/server/connection_acceptor.cpp b/src/server/connection_acceptor.cpp
index 184e8a6..5b96ed8 100644
--- a/src/server/connection_acceptor.cpp
+++ b/src/server/connection_acceptor.cpp
@@ -7,6 +7,7 @@
#include
#include
+#include <netinet/tcp.h>
#include
#include
#include
@@ -23,10 +24,8 @@
#include
#include "server/server_types.h"
-#include "server/thread_pool.h"
#include "utils/error.h"
#include "utils/expected.h"
-#include "utils/fd_guard.h"
#include "utils/network_utils.h"
#include "utils/structured_log.h"
@@ -48,16 +47,7 @@ inline struct sockaddr* ToSockaddrUn(struct sockaddr_un* addr) {
}
} // namespace
-ConnectionAcceptor::ConnectionAcceptor(ServerConfig config, ThreadPool* thread_pool)
- : config_(std::move(config)), thread_pool_(thread_pool) {
- if (thread_pool_ == nullptr) {
- mygram::utils::StructuredLog()
- .Event("server_error")
- .Field("component", "connection_acceptor")
- .Field("error", "thread_pool cannot be null")
- .Error();
- }
-}
+ConnectionAcceptor::ConnectionAcceptor(ServerConfig config) : config_(std::move(config)) {}
ConnectionAcceptor::~ConnectionAcceptor() {
Stop();
@@ -350,8 +340,8 @@ void ConnectionAcceptor::Stop() {
mygram::utils::StructuredLog().Event("connection_acceptor_stopped").Debug();
}
-void ConnectionAcceptor::SetConnectionHandler(ConnectionHandler handler) {
- connection_handler_ = std::move(handler);
+void ConnectionAcceptor::SetReactorHandler(ReactorHandler handler) {
+ reactor_handler_ = std::move(handler);
}
void ConnectionAcceptor::AcceptLoop() {
@@ -450,6 +440,65 @@ void ConnectionAcceptor::AcceptLoop() {
.Warn();
}
+ // Apply per-connection TCP keepalive on TCP sockets only (not UDS). The
+ // stock Linux defaults (2h idle + 9 probes * 75s) are too lax for
+ // detecting half-open connections, so tighten them per YAML config.
+ if (!IsUnixSocket() && config_.keepalive.enabled) {
+ int keepalive_on = 1;
+ if (setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive_on, sizeof(keepalive_on)) < 0) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "setsockopt_failed")
+ .Field("option", "SO_KEEPALIVE")
+ .Field("error", strerror(errno))
+ .Warn();
+ }
+#if defined(__linux__)
+ // Linux exposes TCP_KEEPIDLE/TCP_KEEPINTVL/TCP_KEEPCNT. These are our
+ // production target and where this mitigation actually matters.
+ int idle_sec = config_.keepalive.idle_sec;
+ int intvl_sec = config_.keepalive.interval_sec;
+ int probe_cnt = config_.keepalive.probe_count;
+ if (setsockopt(client_fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_sec, sizeof(idle_sec)) < 0) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "setsockopt_failed")
+ .Field("option", "TCP_KEEPIDLE")
+ .Field("error", strerror(errno))
+ .Warn();
+ }
+ if (setsockopt(client_fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl_sec, sizeof(intvl_sec)) < 0) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "setsockopt_failed")
+ .Field("option", "TCP_KEEPINTVL")
+ .Field("error", strerror(errno))
+ .Warn();
+ }
+ if (setsockopt(client_fd, IPPROTO_TCP, TCP_KEEPCNT, &probe_cnt, sizeof(probe_cnt)) < 0) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "setsockopt_failed")
+ .Field("option", "TCP_KEEPCNT")
+ .Field("error", strerror(errno))
+ .Warn();
+ }
+#elif defined(__APPLE__) && defined(TCP_KEEPALIVE)
+ // macOS/BSD only exposes TCP_KEEPALIVE (equivalent to Linux TCP_KEEPIDLE).
+ // Interval/count fall back to system defaults. The production target is
+ // Linux; this branch only keeps dev/CI on macOS functional.
+ int idle_sec = config_.keepalive.idle_sec;
+ if (setsockopt(client_fd, IPPROTO_TCP, TCP_KEEPALIVE, &idle_sec, sizeof(idle_sec)) < 0) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "setsockopt_failed")
+ .Field("option", "TCP_KEEPALIVE")
+ .Field("error", strerror(errno))
+ .Warn();
+ }
+#endif
+ }
+
#ifdef __APPLE__
// On macOS, set SO_NOSIGPIPE to prevent SIGPIPE when writing to closed connections
// Linux uses MSG_NOSIGNAL flag instead, but writev() doesn't support flags
@@ -470,43 +519,36 @@ void ConnectionAcceptor::AcceptLoop() {
active_fds_.insert(client_fd);
}
- // Submit to thread pool
- if (thread_pool_ != nullptr && connection_handler_) {
- bool submitted = thread_pool_->Submit([this, client_fd]() {
- // RAII guard to ensure connection is removed from active set
- // even if connection_handler_ throws an exception
- mygram::utils::ScopeGuard cleanup([this, client_fd]() { RemoveConnection(client_fd); });
-
- connection_handler_(client_fd);
- // Note: cleanup will call RemoveConnection on scope exit
- });
-
- if (!submitted) {
- // Queue is full - send error response and reject connection to prevent FD leak
- mygram::utils::StructuredLog()
- .Event("server_warning")
- .Field("type", "thread_pool_queue_full")
- .Field("client_fd", static_cast(client_fd))
- .Warn();
-
- // Send error response to client before closing connection
- static constexpr std::string_view kBusyResponse =
- "ERR SERVER_BUSY Server is too busy, please try again later\r\n";
- // Ignore write errors - we're closing the connection anyway
- // NOLINTNEXTLINE(bugprone-unused-return-value,cert-err33-c)
- write(client_fd, kBusyResponse.data(), kBusyResponse.size());
-
- close(client_fd);
- RemoveConnection(client_fd);
- }
- } else {
+ // Reactor I/O model: hand off inline on the accept thread. The reactor
+ // takes ownership of the fd on success; on failure we emit SERVER_BUSY
+ // and close the fd here. The active_fds_ entry stays until IoReactor's
+ // close callback invokes RemoveConnection.
+ if (!reactor_handler_) {
+ // Misconfiguration: reactor handler must be installed before Start().
+ // Close the fd and keep looping so the server does not silently leak.
mygram::utils::StructuredLog()
.Event("server_error")
- .Field("type", "no_connection_handler")
- .Field("error", "No connection handler or thread pool configured")
+ .Field("type", "no_reactor_handler")
+ .Field("error", "reactor handler not installed before accept loop started")
.Error();
close(client_fd);
RemoveConnection(client_fd);
+ continue;
+ }
+
+ const bool accepted = reactor_handler_(client_fd);
+ if (!accepted) {
+ mygram::utils::StructuredLog()
+ .Event("server_warning")
+ .Field("type", "reactor_register_rejected")
+ .Field("client_fd", static_cast(client_fd))
+ .Warn();
+ static constexpr std::string_view kBusyResponse =
+ "ERR SERVER_BUSY Server is too busy, please try again later\r\n";
+ // NOLINTNEXTLINE(bugprone-unused-return-value,cert-err33-c)
+ write(client_fd, kBusyResponse.data(), kBusyResponse.size());
+ close(client_fd);
+ RemoveConnection(client_fd);
}
}
diff --git a/src/server/connection_acceptor.h b/src/server/connection_acceptor.h
index 22babc5..799a40b 100644
--- a/src/server/connection_acceptor.h
+++ b/src/server/connection_acceptor.h
@@ -19,9 +19,6 @@
namespace mygramdb::server {
-// Forward declarations
-class ThreadPool;
-
/**
* @brief Network connection acceptor
*
@@ -44,19 +41,24 @@ class ThreadPool;
class ConnectionAcceptor {
public:
/**
- * @brief Connection handler callback type
+ * @brief Reactor handler callback type.
+ *
+ * Invoked **inline** on the accept thread for each accepted connection when
+ * `SetReactorHandler` has been installed. The handler must take ownership of
+ * `client_fd` and return true, or return false to reject the connection
+ * (the acceptor will then emit `ERR SERVER_BUSY` and close the fd).
*
- * This callback is invoked for each accepted connection.
- * The handler should process the connection and close the file descriptor.
+ * No thread pool hop: the reactor's `IoReactor::Register` is cheap (map
+ * insert + one epoll_ctl/kevent) and latency-sensitive, so bouncing through
+ * a worker would add a context switch for no gain.
*/
- using ConnectionHandler = std::function<void(int)>;
+ using ReactorHandler = std::function<bool(int)>;
/**
* @brief Construct a ConnectionAcceptor
* @param config Server configuration
- * @param thread_pool Thread pool for connection handling
*/
- ConnectionAcceptor(ServerConfig config, ThreadPool* thread_pool);
+ explicit ConnectionAcceptor(ServerConfig config);
// Disable copy and move
ConnectionAcceptor(const ConnectionAcceptor&) = delete;
@@ -80,10 +82,23 @@ class ConnectionAcceptor {
void Stop();
/**
- * @brief Set connection handler callback
- * @param handler Callback to handle accepted connections
+ * @brief Set reactor handler callback.
+ *
+ * The handler is invoked inline on the accept thread and must take
+ * ownership of the fd on true return.
+ *
+ * @param handler Callback that takes ownership of the fd on true return.
+ */
+ void SetReactorHandler(ReactorHandler handler);
+
+ /**
+ * @brief Remove a connection from the active set.
+ *
+ * Exposed publicly so that `IoReactor`'s close callback can decrement the
+ * `max_connections` gate when it tears down a reactor connection. Safe to
+ * call from any thread.
*/
- void SetConnectionHandler(ConnectionHandler handler);
+ void RemoveConnection(int socket_fd);
/**
* @brief Get actual port being listened on
@@ -116,15 +131,8 @@ class ConnectionAcceptor {
*/
bool SetSocketOptions(int socket_fd) const;
- /**
- * @brief Remove connection from active list
- * @param socket_fd Socket file descriptor
- */
- void RemoveConnection(int socket_fd);
-
ServerConfig config_;
- ThreadPool* thread_pool_;
- ConnectionHandler connection_handler_;
+ ReactorHandler reactor_handler_;
int server_fd_ = -1;
uint16_t actual_port_ = 0;
diff --git a/src/server/connection_io_handler.cpp b/src/server/connection_io_handler.cpp
deleted file mode 100644
index 06500f0..0000000
--- a/src/server/connection_io_handler.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * @file connection_io_handler.cpp
- * @brief Network I/O handler implementation
- */
-
-#include "server/connection_io_handler.h"
-
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-
-#include "server/server_types.h"
-#include "utils/structured_log.h"
-
-namespace mygramdb::server {
-
-ConnectionIOHandler::ConnectionIOHandler(const IOConfig& config, RequestProcessor processor,
- const std::atomic& shutdown_flag)
- : config_(config), processor_(std::move(processor)), shutdown_flag_(shutdown_flag) {}
-
-void ConnectionIOHandler::HandleConnection(int client_fd, ConnectionContext& ctx) {
- // Set receive timeout on the socket if configured
- if (config_.recv_timeout_sec > 0) {
- struct timeval timeout {}; // Zero-initialized to avoid uninitialized warning
- timeout.tv_sec = config_.recv_timeout_sec;
- timeout.tv_usec = 0;
- if (setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
- mygram::utils::StructuredLog()
- .Event("server_warning")
- .Field("operation", "setsockopt")
- .Field("option", "SO_RCVTIMEO")
- .Field("fd", static_cast(client_fd))
- .Field("error", strerror(errno))
- .Warn();
- // Continue anyway - timeout is not critical for functionality
- }
- }
-
- std::vector<char> buffer(config_.recv_buffer_size);
- std::string accumulated;
- const size_t max_accumulated = config_.max_query_length * 10;
-
- while (!shutdown_flag_) {
- ssize_t bytes = recv(client_fd, buffer.data(), buffer.size() - 1, 0);
-
- if (bytes <= 0) {
- if (bytes < 0) {
- // With SO_RCVTIMEO set, timeout will trigger EAGAIN/EWOULDBLOCK
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- mygram::utils::StructuredLog()
- .Event("connection_recv_timeout")
- .Field("fd", static_cast(client_fd))
- .Debug();
- break; // Timeout - close connection
- }
- mygram::utils::StructuredLog()
- .Event("connection_recv_error")
- .Field("fd", static_cast(client_fd))
- .Field("error", strerror(errno))
- .Debug();
- }
- break;
- }
-
- buffer[bytes] = '\0';
-
- // Check buffer size limit
- if (accumulated.size() + bytes > max_accumulated) {
- mygram::utils::StructuredLog()
- .Event("server_warning")
- .Field("type", "request_too_large")
- .Field("fd", static_cast(client_fd))
- .Field("size", static_cast(accumulated.size() + bytes))
- .Field("limit", static_cast(max_accumulated))
- .Warn();
- SendResponse(client_fd, "ERROR Request too large (no newline detected)");
- break;
- }
-
- accumulated += buffer.data();
-
- // Process complete requests
- if (!ProcessBuffer(accumulated, client_fd, ctx)) {
- break;
- }
- }
-}
-
-bool ConnectionIOHandler::ProcessBuffer(std::string& accumulated, int client_fd, ConnectionContext& ctx) {
- // Optimized: Use indices instead of substr() to avoid string copies
- size_t start = 0;
- size_t pos = 0;
-
- while ((pos = accumulated.find("\r\n", start)) != std::string::npos) {
- // Create string_view for zero-copy parsing (convert to string only when needed)
- size_t len = pos - start;
- if (len == 0) {
- start = pos + 2;
- continue;
- }
-
- // Extract request - single allocation here is unavoidable as processor needs string
- std::string request = accumulated.substr(start, len);
- start = pos + 2;
-
- // Process request
- std::string response = processor_(request, ctx);
-
- // Send response
- if (!SendResponse(client_fd, response)) {
- // Cleanup: remove processed portion before returning
- if (start > 0) {
- accumulated.erase(0, start);
- }
- return false;
- }
- }
-
- // Remove all processed data in single operation (instead of per-request copies)
- if (start > 0) {
- accumulated.erase(0, start);
- }
-
- return true;
-}
-
-// Kept as member function for consistency and potential future extensions
-// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
-bool ConnectionIOHandler::SendResponse(int client_fd, const std::string& response) {
- // Zero-copy send using writev (scatter-gather I/O)
- // Avoids creating a copy of response just to append "\r\n"
- static constexpr std::array<char, 3> kCRLF = {'\r', '\n', '\0'};
-
- // const_cast required: iovec::iov_base is void*, but writev only reads data
- std::array<iovec, 2> iov = {
- {{const_cast<char*>(response.data()), response.size()}, // NOLINT(cppcoreguidelines-pro-type-const-cast)
- {const_cast<char*>(kCRLF.data()), 2}}}; // NOLINT(cppcoreguidelines-pro-type-const-cast)
-
- size_t total_to_send = response.size() + 2;
- size_t total_sent = 0;
- size_t current_iov = 0;
-
- while (total_sent < total_to_send && current_iov < 2) {
- ssize_t sent = writev(client_fd, &iov.at(current_iov), static_cast<int>(2 - current_iov));
-
- if (sent < 0) {
- if (errno == EINTR) {
- continue; // Interrupted, retry
- }
- // EPIPE is expected when client closes connection
- if (errno != EPIPE) {
- mygram::utils::StructuredLog()
- .Event("connection_writev_error")
- .Field("fd", static_cast(client_fd))
- .Field("error", strerror(errno))
- .Debug();
- }
- return false;
- }
-
- if (sent == 0) {
- mygram::utils::StructuredLog()
- .Event("connection_writev_zero")
- .Field("fd", static_cast(client_fd))
- .Debug();
- return false;
- }
-
- total_sent += sent;
-
- // Adjust iov for partial writes
- size_t remaining = sent;
- while (remaining > 0 && current_iov < 2) {
- if (remaining >= iov.at(current_iov).iov_len) {
- remaining -= iov.at(current_iov).iov_len;
- iov.at(current_iov).iov_len = 0;
- current_iov++;
- } else {
- // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
- iov.at(current_iov).iov_base = static_cast<char*>(iov.at(current_iov).iov_base) + remaining;
- iov.at(current_iov).iov_len -= remaining;
- remaining = 0;
- }
- }
- }
-
- return true;
-}
-
-} // namespace mygramdb::server
diff --git a/src/server/connection_io_handler.h b/src/server/connection_io_handler.h
deleted file mode 100644
index 9d9cc97..0000000
--- a/src/server/connection_io_handler.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * @file connection_io_handler.h
- * @brief Handles network I/O for client connections
- */
-
-#pragma once
-
-#include
-#include
-#include
-#include
-
-// Forward declare to avoid circular dependency
-namespace mygramdb::server {
-struct ConnectionContext;
-}
-
-namespace mygramdb::server {
-
-// Constants for I/O configuration defaults
-inline constexpr size_t kDefaultIORecvBufferSize = 4096; // Separate from server_types.h to avoid conflicts
-inline constexpr size_t kDefaultMaxQueryLength = 1024 * 1024; // 1MB
-inline constexpr int kDefaultRecvTimeoutSec = 60;
-
-/**
- * @brief Configuration for connection I/O handling
- */
-struct IOConfig {
- size_t recv_buffer_size = kDefaultIORecvBufferSize;
- size_t max_query_length = kDefaultMaxQueryLength;
- int recv_timeout_sec = kDefaultRecvTimeoutSec;
-};
-
-/**
- * @brief Callback for processing complete requests
- * @param request The request string (without \r\n)
- * @param ctx Connection context (may be modified)
- * @return Response string (without \r\n)
- */
-using RequestProcessor = std::function<std::string(const std::string& request, ConnectionContext& ctx)>;
-
-/**
- * @brief Handles network I/O for a single client connection
- *
- * Responsibilities:
- * - Read data from socket with buffering
- * - Parse protocol messages (delimiter: \r\n)
- * - Enforce size limits
- * - Write responses to socket
- * - Handle I/O errors gracefully
- */
-class ConnectionIOHandler {
- public:
- /**
- * @brief Construct I/O handler
- * @param config I/O configuration
- * @param processor Callback to process complete requests
- * @param shutdown_flag Reference to shutdown signal
- */
- ConnectionIOHandler(const IOConfig& config, RequestProcessor processor, const std::atomic<bool>& shutdown_flag);
-
- ~ConnectionIOHandler() = default;
-
- // Non-copyable and non-movable
- ConnectionIOHandler(const ConnectionIOHandler&) = delete;
- ConnectionIOHandler& operator=(const ConnectionIOHandler&) = delete;
- ConnectionIOHandler(ConnectionIOHandler&&) = delete;
- ConnectionIOHandler& operator=(ConnectionIOHandler&&) = delete;
-
- /**
- * @brief Handle connection I/O loop
- * @param client_fd Client socket file descriptor
- * @param ctx Connection context
- *
- * Sets SO_RCVTIMEO on the socket if recv_timeout_sec > 0 to prevent
- * indefinite hangs from malicious or misbehaving clients.
- *
- * Runs until:
- * - Client disconnects
- * - I/O error occurs
- * - Receive timeout expires (if configured)
- * - Shutdown signal received
- */
- void HandleConnection(int client_fd, ConnectionContext& ctx);
-
- private:
- const IOConfig config_;
- RequestProcessor processor_;
- const std::atomic<bool>& shutdown_flag_;
-
- /**
- * @brief Process accumulated buffer and extract complete requests
- * @param accumulated Current buffer
- * @param client_fd Socket for sending responses
- * @param ctx Connection context
- * @return True if connection should continue
- */
- bool ProcessBuffer(std::string& accumulated, int client_fd, ConnectionContext& ctx);
-
- /**
- * @brief Send response to client
- * @param client_fd Socket file descriptor
- * @param response Response string (will add \r\n)
- * @return True if send succeeded
- *
- * Note: Kept as member function (not static) for consistency with other handlers
- * and potential future extensions (e.g., response buffering, metrics)
- */
- // NOLINTNEXTLINE(readability-convert-member-functions-to-static)
- bool SendResponse(int client_fd, const std::string& response);
-};
-
-} // namespace mygramdb::server
diff --git a/src/server/io_reactor.cpp b/src/server/io_reactor.cpp
new file mode 100644
index 0000000..4fc39ac
--- /dev/null
+++ b/src/server/io_reactor.cpp
@@ -0,0 +1,298 @@
+/**
+ * @file io_reactor.cpp
+ * @brief Phase 2 IoReactor implementation — single-threaded event loop.
+ */
+
+#include "server/io_reactor.h"
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "server/reactor/event_multiplexer.h"
+#include "utils/error.h"
+#include "utils/expected.h"
+#include "utils/structured_log.h"
+
+namespace mygramdb::server {
+
+using mygram::utils::Error;
+using mygram::utils::ErrorCode;
+using mygram::utils::Expected;
+using mygram::utils::MakeError;
+using mygram::utils::MakeUnexpected;
+
+namespace {
+constexpr size_t kReadyEventReserve = 64;
+}
+
+IoReactor::IoReactor(ThreadPool* pool, RequestDispatcher* dispatcher, ReactorConfig cfg)
+ : pool_(pool), dispatcher_(dispatcher), config_(std::move(cfg)) {}
+
+IoReactor::~IoReactor() {
+ Stop();
+}
+
+Expected<void> IoReactor::Start() {
+ if (running_.load(std::memory_order_acquire)) {
+ return {};
+ }
+
+ // In tests, mux_factory_ is set via SetMultiplexerFactoryForTest() to
+ // inject a MockEventMultiplexer. In production this branch is never taken.
+ auto mux = mux_factory_ ? mux_factory_() : reactor::CreateEventMultiplexer();
+ if (!mux) {
+ return MakeUnexpected(
+ MakeError(ErrorCode::kNetworkReactorUnsupported, "No event multiplexer available on this platform"));
+ }
+ if (auto r = mux->Open(); !r) {
+ return MakeUnexpected(r.error());
+ }
+
+ mux_ = std::move(mux);
+ running_.store(true, std::memory_order_release);
+ event_loop_thread_ = std::thread([this]() { EventLoop(); });
+
+ mygram::utils::StructuredLog()
+ .Event("reactor_started")
+ .Field("backend", mux_->Name())
+ .Field("poll_timeout_ms", static_cast(config_.poll_timeout_ms))
+ .Info();
+ return {};
+}
+
+void IoReactor::Stop() {
+ if (!running_.exchange(false, std::memory_order_acq_rel)) {
+ // Never started, or already stopped.
+ if (event_loop_thread_.joinable()) {
+ event_loop_thread_.join();
+ }
+ return;
+ }
+
+ if (event_loop_thread_.joinable()) {
+ event_loop_thread_.join();
+ }
+
+ // Drop all registered connections. Drain tasks that still hold a
+ // shared_ptr copy will keep their connection alive until they finish.
+ {
+ std::unique_lock lock(connections_mutex_);
+ connections_.clear();
+ }
+ {
+ // Exclusive lock: wait for any in-flight Register/Unregister/ArmWrite to
+ // finish before destroying the multiplexer. The event-loop thread has
+ // already been joined, so the only contenders are other threads.
+ std::unique_lock lock(mux_lifecycle_);
+ mux_.reset();
+ }
+
+ mygram::utils::StructuredLog().Event("reactor_stopped").Info();
+}
+
+Expected<void> IoReactor::Register(std::shared_ptr<ReactorConnection> conn) {
+ if (!conn) {
+ return MakeUnexpected(MakeError(ErrorCode::kInvalidArgument, "Register called with null connection"));
+ }
+ if (!running_.load(std::memory_order_acquire)) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkServerNotStarted, "IoReactor::Register before Start"));
+ }
+
+ const int fd = conn->Fd();
+ if (fd < 0) {
+ return MakeUnexpected(MakeError(ErrorCode::kInvalidArgument, "Register called with negative fd"));
+ }
+
+ // Put the socket in non-blocking mode before handing it to the event loop.
+ // Without this, a recv() inside OnReadable would block the entire reactor.
+ const int flags = ::fcntl(fd, F_GETFL, 0);
+ if (flags < 0) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkSocketCreationFailed,
+ std::string("fcntl(F_GETFL) failed: ") + std::strerror(errno)));
+ }
+ if ((flags & O_NONBLOCK) == 0) {
+ if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkSocketCreationFailed,
+ std::string("fcntl(F_SETFL, O_NONBLOCK) failed: ") + std::strerror(errno)));
+ }
+ }
+
+ {
+ std::unique_lock lock(connections_mutex_);
+ if (connections_.count(fd) != 0U) {
+ return MakeUnexpected(MakeError(ErrorCode::kInternalError, "IoReactor::Register duplicate fd"));
+ }
+ connections_.emplace(fd, conn);
+ }
+
+ {
+ std::shared_lock mux_lock(mux_lifecycle_);
+ if (!mux_) {
+ // Racing with Stop(): undo the insert.
+ std::unique_lock lock(connections_mutex_);
+ connections_.erase(fd);
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkServerNotStarted, "IoReactor::Register during shutdown"));
+ }
+ auto r = mux_->Add(fd, reactor::event::kReadable);
+ if (!r) {
+ std::unique_lock lock(connections_mutex_);
+ connections_.erase(fd);
+ return MakeUnexpected(r.error());
+ }
+
+ // Narrow race: Stop() sets running_=false and clears connections_
+ // *before* blocking on mux_lifecycle_ exclusive. If that sequence
+ // interleaved between our initial running_ check and this point, our
+ // emplace is already gone from the map and the about-to-be-destroyed
+ // multiplexer will never deliver events for this fd. Detect that window
+ // by re-checking running_ while we still hold mux_lifecycle_ shared,
+ // and roll back the Add so the caller sees a clean failure.
+ if (!running_.load(std::memory_order_acquire)) {
+ (void)mux_->Remove(fd);
+ std::unique_lock lock(connections_mutex_);
+ connections_.erase(fd); // no-op if Stop() already cleared the map
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkServerNotStarted, "IoReactor::Register raced with Stop"));
+ }
+ }
+
+ return {};
+}
+
+void IoReactor::Unregister(int fd) {
+ // Remove from the multiplexer first so the event loop stops reporting
+ // events for this fd, then drop the shared_ptr from the map. Drain tasks
+ // that captured a copy keep the ReactorConnection alive until they
+ // finish, and only then does the destructor close(2) the socket.
+ bool was_registered = false;
+ {
+ std::shared_lock mux_lock(mux_lifecycle_);
+ if (mux_) {
+ // Remove() is idempotent from the caller's perspective.
+ (void)mux_->Remove(fd);
+ }
+ }
+ {
+ std::unique_lock lock(connections_mutex_);
+ was_registered = connections_.erase(fd) > 0;
+ }
+ // Close callback runs outside all locks so callers cannot deadlock the
+ // reactor by taking their own mutexes inside the callback.
+ if (was_registered && close_callback_) {
+ close_callback_(fd);
+ }
+}
+
+void IoReactor::SetCloseCallback(std::function<void(int)> cb) {
+ close_callback_ = std::move(cb);
+}
+
+void IoReactor::SetMultiplexerFactoryForTest(MultiplexerFactory f) {
+ mux_factory_ = std::move(f);
+}
+
+Expected<void> IoReactor::ArmWrite(int fd) {
+ std::shared_lock mux_lock(mux_lifecycle_);
+ if (!mux_) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkServerNotStarted, "ArmWrite while reactor stopped"));
+ }
+ return mux_->Modify(fd, reactor::event::kReadable | reactor::event::kWritable);
+}
+
+Expected<void> IoReactor::DisarmWrite(int fd) {
+ std::shared_lock mux_lock(mux_lifecycle_);
+ if (!mux_) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkServerNotStarted, "DisarmWrite while reactor stopped"));
+ }
+ return mux_->Modify(fd, reactor::event::kReadable);
+}
+
+size_t IoReactor::ConnectionCount() const {
+ std::shared_lock lock(connections_mutex_);
+ return connections_.size();
+}
+
+const char* IoReactor::BackendName() const {
+ std::shared_lock lock(mux_lifecycle_);
+ return mux_ ? mux_->Name() : "unavailable";
+}
+
+std::shared_ptr<ReactorConnection> IoReactor::Lookup(int fd) const {
+ std::shared_lock lock(connections_mutex_);
+ auto it = connections_.find(fd);
+ if (it == connections_.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
+void IoReactor::EventLoop() {
+ std::vector<reactor::ReadyEvent> ready;
+ ready.reserve(kReadyEventReserve);
+
+ while (running_.load(std::memory_order_acquire)) {
+ Expected<void> poll_result;
+ {
+ // Hold the lifecycle lock shared for the duration of Poll so that Stop()
+ // cannot destroy the multiplexer mid-call. Add/Modify/Remove also take the
+ // shared lock, so registration is never blocked by steady-state polling.
+ std::shared_lock mux_lock(mux_lifecycle_);
+ if (!mux_) {
+ break;
+ }
+ poll_result = mux_->Poll(config_.poll_timeout_ms, ready);
+ }
+ if (!poll_result) {
+ mygram::utils::StructuredLog()
+ .Event("reactor_poll_failed")
+ .Field("error", poll_result.error().to_string())
+ .Warn();
+ continue;
+ }
+ for (const auto& ev : ready) {
+ DispatchEvent(ev);
+ }
+ }
+}
+
+void IoReactor::DispatchEvent(const reactor::ReadyEvent& ev) {
+ auto conn = Lookup(ev.fd);
+ if (!conn) {
+ // Stale event (connection was unregistered between Poll and dispatch).
+ return;
+ }
+
+ bool keep = true;
+ // Hard error events short-circuit straight to OnError: the socket is no
+ // longer usable for either read or write.
+ if ((ev.events & reactor::event::kError) != 0) {
+ keep = conn->OnError();
+ } else {
+ // Hangup alone (EV_EOF on kqueue / EPOLLRDHUP on epoll) means the peer
+ // half-closed the write side of its socket. The server->client direction
+ // is still open, and the kernel may still hold buffered request bytes
+ // waiting to be drained. Fall through into
+ // OnReadable: its recv()==0 path sets read_eof_, finishes processing
+ // any pending frames, flushes the response via the drain task, and
+ // only then unregisters. Treating kHangup as a fatal error here causes
+ // the server to drop half-closed clients' responses on the floor.
+ if ((ev.events & (reactor::event::kReadable | reactor::event::kHangup)) != 0 && keep) {
+ keep = conn->OnReadable();
+ }
+ if ((ev.events & reactor::event::kWritable) != 0 && keep) {
+ keep = conn->OnWritable();
+ }
+ }
+
+ if (!keep) {
+ Unregister(ev.fd);
+ }
+}
+
+} // namespace mygramdb::server
diff --git a/src/server/io_reactor.h b/src/server/io_reactor.h
new file mode 100644
index 0000000..9e88a07
--- /dev/null
+++ b/src/server/io_reactor.h
@@ -0,0 +1,231 @@
+/**
+ * @file io_reactor.h
+ * @brief Single-threaded reactor event loop built on `EventMultiplexer`.
+ *
+ * Phase 2 implementation of docs/ja/design/reactor-io-refactor.md §4.4.
+ *
+ * Responsibilities:
+ * 1. Own one `reactor::EventMultiplexer` instance (epoll on Linux, kqueue
+ * on macOS/BSD, mock in tests).
+ * 2. Run a single event-loop thread that repeatedly calls `Poll()` and
+ * dispatches readable/writable/error events to the matching
+ * `ReactorConnection`.
+ * 3. Own the connection map keyed by fd. The map holds
+ * `std::shared_ptr<ReactorConnection>` because worker drain tasks may
+ * capture a shared_ptr copy and keep the connection alive while writing
+ * the response (design doc §7 R5).
+ * 4. Provide thread-safe `Register`/`Unregister`/`ArmWrite`/`DisarmWrite`
+ * callable from the accept thread or from worker threads.
+ *
+ * Sharding across multiple event-loop threads is a Phase 3.5 optimisation;
+ * Phase 2 ships a single loop and measures first.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "server/reactor/event_multiplexer.h"
+#include "server/reactor_connection.h"
+#include "utils/error.h"
+#include "utils/expected.h"
+
+namespace mygramdb::server {
+
+class ThreadPool;
+class RequestDispatcher;
+
+/**
+ * @brief Runtime-tunable reactor settings.
+ *
+ * Additional fields for sharding count, event-loop CPU affinity, and write
+ * queue tuning will land alongside Phase 3 work. Values plumbed from YAML
+ * (`api.tcp.*`) feed into this struct.
+ */
+struct ReactorConfig {
+ /// Number of event-loop threads. Phase 2 hard-codes 1 regardless of this
+ /// value; sharding is Phase 3.5 (design doc §7 R2).
+ int event_loop_threads = 1;
+
+ /// Per-connection soft cap on pending write bytes before the reactor
+ /// forcibly closes a slow reader.
+ size_t max_write_queue_bytes = ReactorConnection::kDefaultMaxWriteQueueBytes;
+
+ /// Poll timeout in milliseconds. Short enough to react to `Stop()`
+ /// promptly, long enough to keep the event loop idle-efficient.
+ int poll_timeout_ms = 100;
+};
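+
+// Illustrative wiring sketch (hypothetical caller, not part of this header):
+// values parsed from the api.tcp.* YAML keys feed this struct field-by-field.
+//
+//   ReactorConfig cfg;
+//   cfg.max_write_queue_bytes = ReactorConnection::kDefaultMaxWriteQueueBytes;
+//   cfg.poll_timeout_ms = 100;  // default; smaller values react to Stop() sooner
+//   IoReactor reactor(&thread_pool, &dispatcher, cfg);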
+
+/**
+ * @brief Single-threaded I/O reactor.
+ */
+class IoReactor {
+ public:
+ /**
+ * @param pool Non-owning thread pool used to run drain tasks. Must
+ * outlive this reactor.
+ * @param dispatcher Non-owning request dispatcher used by drain tasks.
+ * May be null if no connections are ever registered
+ * (e.g. reactor-parity unit tests with a mock mux).
+ * @param cfg Reactor tuning parameters.
+ */
+ IoReactor(ThreadPool* pool, RequestDispatcher* dispatcher, ReactorConfig cfg);
+ ~IoReactor();
+
+ IoReactor(const IoReactor&) = delete;
+ IoReactor& operator=(const IoReactor&) = delete;
+ IoReactor(IoReactor&&) = delete;
+ IoReactor& operator=(IoReactor&&) = delete;
+
+ /**
+ * @brief Create the multiplexer and start the event-loop thread.
+ * Idempotent: a second call while already running is a no-op success.
+ *
+ * Failure modes:
+ * - `kNetworkReactorUnsupported` if no multiplexer is available on this
+ * platform (CreateEventMultiplexer returned nullptr).
+ * - `kNetworkReactorInitFailed` propagated from `EventMultiplexer::Open`.
+ */
+ mygram::utils::Expected<void> Start();
+
+ /**
+ * @brief Stop the event-loop thread, close the multiplexer, and drop all
+ * registered connections. Idempotent.
+ *
+ * Drain tasks in flight keep their own shared_ptr copies, so the actual
+ * socket close happens when the last shared_ptr drops (typically after
+ * the drain task finishes writing its final response).
+ */
+ void Stop();
+
+ /**
+ * @brief Register a freshly accepted connection with the reactor.
+ *
+ * On success the reactor inserts the shared_ptr into its map, sets the fd
+ * non-blocking, and arms `kReadable` on the multiplexer. On failure the
+ * caller retains the shared_ptr and is responsible for closing it.
+ *
+ * Failure modes:
+ * - `kNetworkServerNotStarted` if `Start()` has not been called.
+ * - `kInternalError` if the fd is already registered.
+ * - `kNetworkSocketCreationFailed` if fcntl(O_NONBLOCK) fails.
+ * - `kNetworkReactorRegisterFailed` propagated from `Add`.
+ */
+ mygram::utils::Expected<void> Register(std::shared_ptr<ReactorConnection> conn);
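+ // Illustrative caller-side sketch (hypothetical acceptor code): on failure
+ // the caller still owns the connection, and dropping the last shared_ptr
+ // closes the fd.
+ //
+ //   if (auto r = reactor.Register(conn); !r) {
+ //     log_warn(r.error().to_string());  // hypothetical logging helper
+ //     return false;                     // acceptor replies SERVER_BUSY
+ //   }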
+
+ /**
+ * @brief Remove a connection from the reactor and the multiplexer.
+ *
+ * Safe to call from any thread. Idempotent: unknown fd is a silent no-op
+ * because Unregister races with drain-task teardown.
+ *
+ * If a close callback is installed via `SetCloseCallback`, it is invoked
+ * AFTER the connection is removed from the multiplexer and the map but
+ * BEFORE this function returns. The callback runs with no locks held.
+ */
+ void Unregister(int fd);
+
+ /**
+ * @brief Install a callback invoked from `Unregister` after a connection
+ * has been successfully removed.
+ *
+ * Used by `ConnectionAcceptor` to decrement its `active_fds_` set when the
+ * reactor tears down a connection, preserving the `max_connections` gate.
+ * Called at most once per fd and never while holding internal locks.
+ * Must be set before `Start()` and not mutated afterwards.
+ */
+ void SetCloseCallback(std::function<void(int)> cb);
+
+ /**
+ * @brief Factory type for creating an `EventMultiplexer` instance.
+ *
+ * Used by `SetMultiplexerFactoryForTest` to override the default
+ * `reactor::CreateEventMultiplexer()` backend in unit tests.
+ */
+ using MultiplexerFactory = std::function<std::unique_ptr<reactor::EventMultiplexer>()>;
+
+ /**
+ * @brief Override the multiplexer factory used by `Start()`. TEST-ONLY.
+ *
+ * Must be called before `Start()`. In production code, `Start()` always
+ * calls `reactor::CreateEventMultiplexer()`. In tests, inject a factory
+ * that returns a `MockEventMultiplexer` instead.
+ *
+ * @warning Do not call this from production code. It exists solely to
+ * provide a dependency-injection seam for unit tests.
+ */
+ void SetMultiplexerFactoryForTest(MultiplexerFactory f);
+
+ /**
+ * @brief Arm `kWritable` on an already-registered fd. Phase 3 will use
+ * this when a drain task can no longer write synchronously.
+ *
+ * Phase 2 never calls this from production paths; it exists so the API
+ * stays stable across phases.
+ */
+ mygram::utils::Expected<void> ArmWrite(int fd);
+
+ /**
+ * @brief Disarm `kWritable` on an already-registered fd.
+ */
+ mygram::utils::Expected<void> DisarmWrite(int fd);
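+ // Illustrative Phase 3 usage (not wired up in Phase 2): a drain task that
+ // can no longer write synchronously arms writability, and OnWritable()
+ // disarms it once the queue empties.
+ //
+ //   if (!reactor.ArmWrite(fd)) { /* connection is being torn down */ }
+ //   // ... later, from OnWritable() after the last queued byte is flushed:
+ //   (void)reactor.DisarmWrite(fd);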
+
+ /// Number of connections currently registered (for metrics).
+ [[nodiscard]] size_t ConnectionCount() const;
+
+ /// Whether the event loop is currently running.
+ [[nodiscard]] bool IsRunning() const { return running_.load(std::memory_order_acquire); }
+
+ /// Backend identifier string (forwarded from `EventMultiplexer::Name`).
+ /// Returns "unavailable" if the reactor has not been started.
+ [[nodiscard]] const char* BackendName() const;
+
+ private:
+ void EventLoop();
+ void DispatchEvent(const reactor::ReadyEvent& ev);
+
+ /// Look up a connection under a shared lock and return a shared_ptr copy
+ /// (or nullptr). The copy is released outside the lock so user code runs
+ /// without holding the connections_mutex_.
+ std::shared_ptr<ReactorConnection> Lookup(int fd) const;
+
+ ThreadPool* pool_; // non-owning
+ RequestDispatcher* dispatcher_; // non-owning
+ ReactorConfig config_;
+
+ std::unique_ptr<reactor::EventMultiplexer> mux_;
+ // shared_mutex around mux_:
+ // - Event loop holds `shared_lock` across Poll.
+ // - Register/Unregister/ArmWrite/DisarmWrite hold `shared_lock` while
+ // calling Add/Modify/Remove. Multiple shared locks coexist, so the
+ // loop's steady-state polling does NOT block connection registration.
+ // - Stop holds `unique_lock` to destroy mux_ after the event-loop thread
+ // has been joined; the exclusive wait is bounded by any in-flight
+ // Register call, not by the 100ms Poll interval.
+ // The backends (epoll/kqueue) are kernel-level thread-safe for concurrent
+ // poll + ctl/kevent from different threads; KqueueMultiplexer uses its own
+ // internal mutex to protect its interest_ map.
+ mutable std::shared_mutex mux_lifecycle_;
+ std::thread event_loop_thread_;
+ std::atomic<bool> running_{false};
+
+ mutable std::shared_mutex connections_mutex_;
+ std::unordered_map<int, std::shared_ptr<ReactorConnection>> connections_;
+
+ // Optional teardown callback (see SetCloseCallback). Set-once before Start.
+ std::function<void(int)> close_callback_;
+
+ // TEST-ONLY: overrides reactor::CreateEventMultiplexer() when set.
+ // Null in production; set by SetMultiplexerFactoryForTest() before Start().
+ MultiplexerFactory mux_factory_;
+};
+
+} // namespace mygramdb::server
diff --git a/src/server/reactor/epoll_multiplexer.cpp b/src/server/reactor/epoll_multiplexer.cpp
new file mode 100644
index 0000000..d4783f8
--- /dev/null
+++ b/src/server/reactor/epoll_multiplexer.cpp
@@ -0,0 +1,199 @@
+/**
+ * @file epoll_multiplexer.cpp
+ * @brief Linux epoll backend implementation.
+ */
+
+#include "server/reactor/epoll_multiplexer.h"
+
+#if defined(__linux__)
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include "utils/error.h"
+#include "utils/expected.h"
+
+namespace mygramdb::server::reactor {
+
+namespace {
+
+using mygram::utils::Error;
+using mygram::utils::ErrorCode;
+using mygram::utils::Expected;
+using mygram::utils::MakeError;
+using mygram::utils::MakeUnexpected;
+
+/// Starting batch size for `epoll_wait`. The buffer grows on demand up to
+/// `kMaxEventsCapacity` whenever a Poll fills it completely — a full batch
+/// is a strong signal that the next tick will need more headroom.
+constexpr std::size_t kInitialEventsCapacity = 64;
+
+/// Upper bound on the `epoll_wait` output buffer. 4096 keeps the scratch
+/// allocation bounded at ~48 KiB (`sizeof(epoll_event) * 4096`) while still
+/// covering the server's expected peak concurrency of ~2000 connections
+/// with comfortable headroom. Beyond this cap, excess ready events roll
+/// over to the next Poll — harmless because epoll is level-triggered.
+constexpr std::size_t kMaxEventsCapacity = 4096;
+
+/// Build the `errno`-decorated error message suffix used everywhere in this
+/// translation unit. Captures `errno` by value to avoid TOCTOU between the
+/// failing syscall and the `strerror` call.
+std::string FormatErrno(const char* syscall_label, int captured_errno) {
+ std::string msg = syscall_label;
+ msg += " failed: ";
+ msg += std::strerror(captured_errno);
+ msg += " (errno=";
+ msg += std::to_string(captured_errno);
+ msg += ")";
+ return msg;
+}
+
+/// Translate the reactor-level interest bitmask to an `epoll_event.events`
+/// mask. `EPOLLRDHUP | EPOLLERR | EPOLLHUP` are always armed so the reactor
+/// can observe peer-initiated shutdowns even when neither read nor write
+/// interest is set (e.g. a fully idle connection that the peer closes).
+/// Explicitly level-triggered — no `EPOLLET`.
+uint32_t InterestToEpollEvents(uint8_t interest) {
+ uint32_t events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ if ((interest & event::kReadable) != 0U) {
+ events |= EPOLLIN;
+ }
+ if ((interest & event::kWritable) != 0U) {
+ events |= EPOLLOUT;
+ }
+ return events;
+}
+
+/// Translate an `epoll_event.events` mask back to the reactor's bitmask.
+uint8_t EpollEventsToReady(uint32_t events) {
+ uint8_t ready = event::kNone;
+ if ((events & EPOLLIN) != 0U) {
+ ready |= event::kReadable;
+ }
+ if ((events & EPOLLOUT) != 0U) {
+ ready |= event::kWritable;
+ }
+ if ((events & EPOLLERR) != 0U) {
+ ready |= event::kError;
+ }
+ if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0U) {
+ ready |= event::kHangup;
+ }
+ return ready;
+}
+
+} // namespace
+
+EpollMultiplexer::EpollMultiplexer() {
+ // Size the buffer once up front so the Poll() hot path does not allocate.
+ // We use resize() (not reserve()) because epoll_wait() writes into
+ // [data(), data() + size()); the actual returned count is what we iterate.
+ events_.resize(kInitialEventsCapacity);
+}
+
+EpollMultiplexer::~EpollMultiplexer() {
+ if (epoll_fd_ >= 0) {
+ // Best-effort close; we are in a destructor and must not throw.
+ ::close(epoll_fd_);
+ epoll_fd_ = -1;
+ }
+}
+
+Expected<void> EpollMultiplexer::Open() {
+ if (epoll_fd_ >= 0) {
+ return MakeUnexpected(
+ MakeError(ErrorCode::kNetworkReactorAlreadyOpen, "EpollMultiplexer::Open called on already-open multiplexer"));
+ }
+
+ const int fd = ::epoll_create1(EPOLL_CLOEXEC);
+ if (fd < 0) {
+ const int en = errno;
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorInitFailed, FormatErrno("epoll_create1", en)));
+ }
+ epoll_fd_ = fd;
+ return {};
+}
+
+Expected<void> EpollMultiplexer::Add(int fd, uint8_t interest) {
+ struct epoll_event ev {};
+ ev.events = InterestToEpollEvents(interest);
+ ev.data.fd = fd;
+
+ if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ const int en = errno;
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorRegisterFailed, FormatErrno("epoll_ctl(ADD)", en)));
+ }
+ return {};
+}
+
+Expected<void> EpollMultiplexer::Modify(int fd, uint8_t interest) {
+ struct epoll_event ev {};
+ ev.events = InterestToEpollEvents(interest);
+ ev.data.fd = fd;
+
+ if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) != 0) {
+ const int en = errno;
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorModifyFailed, FormatErrno("epoll_ctl(MOD)", en)));
+ }
+ return {};
+}
+
+Expected<void> EpollMultiplexer::Remove(int fd) {
+ if (::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) != 0) {
+ const int en = errno;
+ // Idempotent teardown race: the fd may have been closed (EBADF) or never
+ // actually registered (ENOENT) by the time IoReactor::Stop() reaches us.
+ // Swallow those two cases; everything else is a real failure.
+ if (en == ENOENT || en == EBADF) {
+ return {};
+ }
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorRemoveFailed, FormatErrno("epoll_ctl(DEL)", en)));
+ }
+ return {};
+}
+
+Expected<void> EpollMultiplexer::Poll(int timeout_ms, std::vector<ReadyEvent>& out) {
+ out.clear();
+
+ const int n = ::epoll_wait(epoll_fd_, events_.data(), static_cast<int>(events_.size()), timeout_ms);
+ if (n < 0) {
+ const int en = errno;
+ // EINTR is not an error condition: a signal interrupted the wait and the
+ // reactor loop will simply call us again. Report success with empty `out`.
+ if (en == EINTR) {
+ return {};
+ }
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorPollFailed, FormatErrno("epoll_wait", en)));
+ }
+
+ out.reserve(static_cast<size_t>(n));
+ for (int i = 0; i < n; ++i) {
+ const auto& ev = events_[static_cast<size_t>(i)];
+ out.push_back(ReadyEvent{ev.data.fd, EpollEventsToReady(ev.events)});
+ }
+
+ // Dynamic grow: if this Poll() filled the scratch buffer, chances are we
+ // are running behind and the next tick will need more slots. Double the
+ // capacity up to `kMaxEventsCapacity` so we stop fragmenting high-concurrency
+ // bursts across multiple Poll() rounds. Growth is one-shot and monotonic;
+ // the buffer never shrinks back down.
+ if (static_cast<std::size_t>(n) == events_.size() && events_.size() < kMaxEventsCapacity) {
+ const std::size_t new_size = std::min(events_.size() * 2, kMaxEventsCapacity);
+ events_.resize(new_size);
+ }
+ return {};
+}
+
+const char* EpollMultiplexer::Name() const {
+ return "epoll";
+}
+
+} // namespace mygramdb::server::reactor
+
+#endif // defined(__linux__)
diff --git a/src/server/reactor/epoll_multiplexer.h b/src/server/reactor/epoll_multiplexer.h
new file mode 100644
index 0000000..92340f2
--- /dev/null
+++ b/src/server/reactor/epoll_multiplexer.h
@@ -0,0 +1,63 @@
+/**
+ * @file epoll_multiplexer.h
+ * @brief Linux epoll backend for the reactor I/O event multiplexer.
+ *
+ * This backend is compiled in only on Linux builds. On all other platforms
+ * the entire translation unit collapses to nothing, and the factory in
+ * `event_multiplexer.cpp` selects a different backend (kqueue on BSD/macOS)
+ * or returns nullptr.
+ *
+ * The epoll backend is deliberately level-triggered (no `EPOLLET`): workers
+ * drain socket buffers on their own schedule, and the event loop re-reports
+ * readiness on each `Poll()` until the interest bit is cleared. This matches
+ * the contract documented in `event_multiplexer.h`.
+ */
+
+#pragma once
+
+#if defined(__linux__)
+
+#include
+
+#include
+
+#include "server/reactor/event_multiplexer.h"
+
+namespace mygramdb::server::reactor {
+
+/**
+ * @brief `EventMultiplexer` implementation backed by Linux `epoll(7)`.
+ *
+ * Thread-safety: supports the reactor's pattern of one thread blocked in
+ * `Poll()` while other threads call `Add`/`Modify`/`Remove` (the epoll
+ * kernel object serialises these); no other concurrent use is supported.
+ */
+class EpollMultiplexer : public EventMultiplexer {
+ public:
+ EpollMultiplexer();
+ ~EpollMultiplexer() override;
+
+ EpollMultiplexer(const EpollMultiplexer&) = delete;
+ EpollMultiplexer& operator=(const EpollMultiplexer&) = delete;
+ EpollMultiplexer(EpollMultiplexer&&) = delete;
+ EpollMultiplexer& operator=(EpollMultiplexer&&) = delete;
+
+ mygram::utils::Expected<void> Open() override;
+ mygram::utils::Expected<void> Add(int fd, uint8_t interest) override;
+ mygram::utils::Expected<void> Modify(int fd, uint8_t interest) override;
+ mygram::utils::Expected<void> Remove(int fd) override;
+ mygram::utils::Expected<void> Poll(int timeout_ms, std::vector<ReadyEvent>& out) override;
+ const char* Name() const override;
+
+ private:
+ /// File descriptor returned by `epoll_create1`. -1 until `Open()` succeeds.
+ int epoll_fd_{-1};
+
+ /// Reusable scratch buffer for `epoll_wait`. Sized once in the constructor
+ /// and doubled on demand (up to a fixed cap) whenever a Poll() fills it
+ /// completely, so sustained bursts do not fragment across multiple Poll
+ /// rounds. Never shrinks back down.
+ std::vector<struct epoll_event> events_;
+};
+
+} // namespace mygramdb::server::reactor
+
+#endif // defined(__linux__)
diff --git a/src/server/reactor/event_multiplexer.cpp b/src/server/reactor/event_multiplexer.cpp
new file mode 100644
index 0000000..24a055e
--- /dev/null
+++ b/src/server/reactor/event_multiplexer.cpp
@@ -0,0 +1,31 @@
+/**
+ * @file event_multiplexer.cpp
+ * @brief Factory for the platform-appropriate EventMultiplexer backend.
+ *
+ * The individual backend implementations live next to this file
+ * (`epoll_multiplexer.cpp`, `kqueue_multiplexer.cpp`). Each one is compiled
+ * only on its target platform; this factory picks the right one at
+ * compile time.
+ */
+
+#include "server/reactor/event_multiplexer.h"
+
+#if defined(__linux__)
+#include "server/reactor/epoll_multiplexer.h"
+#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
+#include "server/reactor/kqueue_multiplexer.h"
+#endif
+
+namespace mygramdb::server::reactor {
+
+std::unique_ptr CreateEventMultiplexer() {
+#if defined(__linux__)
+ return std::make_unique<EpollMultiplexer>();
+#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
+ return std::make_unique<KqueueMultiplexer>();
+#else
+ return nullptr;
+#endif
+}
+
+} // namespace mygramdb::server::reactor
diff --git a/src/server/reactor/event_multiplexer.h b/src/server/reactor/event_multiplexer.h
new file mode 100644
index 0000000..961bb6a
--- /dev/null
+++ b/src/server/reactor/event_multiplexer.h
@@ -0,0 +1,157 @@
+/**
+ * @file event_multiplexer.h
+ * @brief Platform-agnostic level-triggered event multiplexer.
+ *
+ * Inspired by Redis's `ae.c`, this module provides a minimal abstraction over
+ * platform-specific polling primitives (epoll on Linux, kqueue on BSD/macOS,
+ * and a deterministic mock for tests). `IoReactor` is written against this
+ * interface so that the event loop itself is portable and the same test suite
+ * can exercise every backend in parity.
+ *
+ * Design contracts:
+ *
+ * - **Level-triggered semantics.** If a registered fd is still readable or
+ * writable and the interest bit is armed, the next `Poll()` must re-report
+ * the event. The reactor and `ReactorConnection` rely on this to decouple
+ * event notification from consumption (a worker thread may drain the fd on
+ * its own schedule without starving the event loop).
+ *
+ * - **`Expected` for all mutating operations.** Syscalls map to
+ * `ErrorCode::kNetworkReactor*`. The `errno` captured at failure time is
+ * copied into the error message.
+ *
+ * - **Caller-owned output buffer.** `Poll()` takes a reusable
+ * `std::vector<ReadyEvent>&` so the event-loop hot path allocates nothing
+ * steady-state.
+ *
+ * - **Fd ownership.** The multiplexer never closes fds it is given; that
+ * remains `IoReactor`'s job. `Remove()` only unregisters.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "utils/error.h"
+#include "utils/expected.h"
+
+namespace mygramdb::server::reactor {
+
+/**
+ * @brief Interest / ready-event bitmask values.
+ *
+ * Stored as a plain `uint8_t` so callers can `|` them together without
+ * enum-class ceremony. The set is intentionally tiny; sharding or edge-
+ * triggered modes are deliberately out of scope and would be a separate
+ * abstraction if ever needed.
+ */
+namespace event {
+constexpr uint8_t kNone = 0;
+constexpr uint8_t kReadable = 1U << 0; ///< EPOLLIN / EVFILT_READ
+constexpr uint8_t kWritable = 1U << 1; ///< EPOLLOUT / EVFILT_WRITE
+constexpr uint8_t kError = 1U << 2; ///< EPOLLERR / EV_ERROR
+constexpr uint8_t kHangup = 1U << 3; ///< EPOLLHUP|EPOLLRDHUP / EV_EOF
+} // namespace event
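+
+// Illustrative usage (not part of the contract): interest bits are plain
+// integers, so arming both directions is a bitwise OR and testing a ready
+// mask is a bitwise AND.
+//
+//   uint8_t interest = event::kReadable | event::kWritable;
+//   bool writable = (ready_event.events & event::kWritable) != 0U;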
+
+/**
+ * @brief One fd's ready events as produced by `Poll`.
+ *
+ * `events` is a bit-OR of `event::k*` values. `kReadable`/`kWritable` only
+ * appear if the corresponding interest bit was armed.
+ */
+struct ReadyEvent {
+ int fd;
+ uint8_t events;
+};
+
+/**
+ * @brief Abstract interface for backend-specific polling primitives.
+ *
+ * Thread-safety: implementations must tolerate `IoReactor`'s usage pattern of
+ * one thread blocked in `Poll()` while other threads call `Add`/`Modify`/
+ * `Remove` concurrently (e.g. `ArmWrite` from a worker thread). No broader
+ * concurrency guarantees are required or provided.
+ */
+class EventMultiplexer {
+ public:
+ virtual ~EventMultiplexer() = default;
+
+ EventMultiplexer(const EventMultiplexer&) = delete;
+ EventMultiplexer& operator=(const EventMultiplexer&) = delete;
+ EventMultiplexer(EventMultiplexer&&) = delete;
+ EventMultiplexer& operator=(EventMultiplexer&&) = delete;
+
+ /**
+ * @brief Create the underlying poller fd (`epoll_create1`, `kqueue`, ...).
+ *
+ * Must be called exactly once before `Add`/`Poll`. Returns
+ * `kNetworkReactorInitFailed` on syscall failure, or
+ * `kNetworkReactorAlreadyOpen` if called twice.
+ */
+ virtual mygram::utils::Expected<void> Open() = 0;
+
+ /**
+ * @brief Register a new fd with the given interest set.
+ *
+ * The multiplexer does NOT take ownership of the fd. `interest` is a bit-OR
+ * of `event::kReadable` and `event::kWritable`; `kError` and `kHangup` are
+ * always reported and cannot be masked. Returns
+ * `kNetworkReactorRegisterFailed` on syscall failure.
+ */
+ virtual mygram::utils::Expected<void> Add(int fd, uint8_t interest) = 0;
+
+ /**
+ * @brief Update the interest set for an already-registered fd.
+ *
+ * This is the hot-path primitive for `ArmWrite`/`DisarmWrite`: flipping
+ * `kWritable` on or off without recreating registration state.
+ */
+ virtual mygram::utils::Expected<void> Modify(int fd, uint8_t interest) = 0;
+
+ /**
+ * @brief Remove a previously-registered fd.
+ *
+ * Idempotent from the caller's perspective: removing an unknown fd returns
+ * success because `IoReactor::Stop()` may race with connection teardown.
+ */
+ virtual mygram::utils::Expected<void> Remove(int fd) = 0;
+
+ /**
+ * @brief Block until at least one event is ready, or `timeout_ms` elapses.
+ *
+ * @param timeout_ms -1 for infinite wait, 0 for non-blocking probe, >0 for
+ * relative milliseconds.
+ * @param out Output buffer. Cleared and appended to; capacity is
+ * preserved so the hot path does not allocate steady-state.
+ * On error, `out` is left empty.
+ */
+ virtual mygram::utils::Expected<void> Poll(int timeout_ms, std::vector<ReadyEvent>& out) = 0;
+
+ /**
+ * @brief Backend identifier for metrics and log fields.
+ *
+ * Must return a stable string literal: "epoll", "kqueue", "mock".
+ */
+ virtual const char* Name() const = 0;
+
+ protected:
+ EventMultiplexer() = default;
+};
+
+/**
+ * @brief Factory: return the best multiplexer available on this build.
+ *
+ * Selection order (compile-time):
+ * 1. Linux -> `EpollMultiplexer`
+ * 2. Apple / {Free,Net,Open}BSD -> `KqueueMultiplexer`
+ * 3. Otherwise -> `nullptr`
+ *
+ * A nullptr return is how `IoReactor::Start()` decides to fail with
+ * `kNetworkReactorUnsupported` at startup; the blocking fallback path no
+ * longer exists.
+ */
+std::unique_ptr<EventMultiplexer> CreateEventMultiplexer();
+
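+// Illustrative call sequence (mirrors what IoReactor does; error handling
+// elided):
+//
+//   auto mux = CreateEventMultiplexer();
+//   if (!mux || !mux->Open()) { /* unsupported platform or init failure */ }
+//   (void)mux->Add(client_fd, event::kReadable);
+//   std::vector<ReadyEvent> ready;
+//   while (running) {
+//     if (mux->Poll(/*timeout_ms=*/100, ready)) {
+//       for (const auto& ev : ready) { /* dispatch to the connection */ }
+//     }
+//   }
+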
+} // namespace mygramdb::server::reactor
diff --git a/src/server/reactor/kqueue_multiplexer.cpp b/src/server/reactor/kqueue_multiplexer.cpp
new file mode 100644
index 0000000..b74524c
--- /dev/null
+++ b/src/server/reactor/kqueue_multiplexer.cpp
@@ -0,0 +1,332 @@
+/**
+ * @file kqueue_multiplexer.cpp
+ * @brief kqueue implementation of `EventMultiplexer` for BSD/macOS.
+ */
+
+#include "server/reactor/kqueue_multiplexer.h"
+
+#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "utils/error.h"
+#include "utils/expected.h"
+
+namespace mygramdb::server::reactor {
+
+namespace {
+
+using mygram::utils::Error;
+using mygram::utils::ErrorCode;
+using mygram::utils::Expected;
+using mygram::utils::MakeError;
+using mygram::utils::MakeUnexpected;
+
+/// Starting size of the `kevent()` output buffer. Grows on demand up to
+/// `kMaxEventBufferSize` whenever a Poll fills the buffer completely.
+constexpr std::size_t kDefaultEventBufferSize = 64;
+
+/// Upper bound on the `kevent()` output buffer. 4096 keeps the scratch
+/// allocation bounded at ~128 KiB (`sizeof(struct kevent) * 4096`) while
+/// still covering the server's expected peak concurrency of ~2000
+/// connections with comfortable headroom. kqueue is level-triggered, so
+/// excess ready events beyond the cap are harmlessly re-reported on the
+/// next Poll.
+constexpr std::size_t kMaxEventBufferSize = 4096;
+
+/// Time-unit conversion constants, named to keep the timespec conversion
+/// below readable under clang-tidy's magic-number rule.
+constexpr long kNanosPerMilli = 1'000'000L;
+constexpr int kMillisPerSecond = 1000;
+
+/// Build an errno-decorated error message suffix. Captures `captured_errno` by
+/// value to avoid TOCTOU between the failing syscall and the `strerror` call.
+std::string FormatErrno(const char* syscall_label, int captured_errno) {
+ std::string msg = syscall_label;
+ msg += " failed: ";
+ msg += std::strerror(captured_errno);
+ msg += " (errno=";
+ msg += std::to_string(captured_errno);
+ msg += ")";
+ return msg;
+}
+
+} // namespace
+
+KqueueMultiplexer::KqueueMultiplexer() {
+ // Size the output buffer once up front so the steady-state Poll() path does
+ // not allocate. We use resize() (not reserve()) because kevent() writes into
+ // [data(), data() + size()); the actual returned count is what we iterate.
+ events_.resize(kDefaultEventBufferSize);
+}
+
+KqueueMultiplexer::~KqueueMultiplexer() {
+ if (kqueue_fd_ >= 0) {
+ // Best-effort close; we are in a destructor and must not throw.
+ ::close(kqueue_fd_);
+ kqueue_fd_ = -1;
+ }
+}
+
+Expected<void> KqueueMultiplexer::Open() {
+ if (kqueue_fd_ >= 0) {
+ return MakeUnexpected(
+ MakeError(ErrorCode::kNetworkReactorAlreadyOpen, "KqueueMultiplexer::Open called on already-open multiplexer"));
+ }
+
+ const int kq = ::kqueue();
+ if (kq < 0) {
+ const int en = errno;
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorInitFailed, FormatErrno("kqueue", en)));
+ }
+
+ // Darwin / BSD `kqueue()` does not atomically set FD_CLOEXEC the way Linux's
+ // `epoll_create1(EPOLL_CLOEXEC)` does. Apply it now so that any fork/exec
+ // performed by the host process does not leak the poller fd to children.
+ if (::fcntl(kq, F_SETFD, FD_CLOEXEC) < 0) {
+ const int en = errno;
+ ::close(kq);
+ return MakeUnexpected(
+ MakeError(ErrorCode::kNetworkReactorInitFailed, FormatErrno("fcntl(F_SETFD, FD_CLOEXEC)", en)));
+ }
+
+ kqueue_fd_ = kq;
+ return {};
+}
+
+Expected<void> KqueueMultiplexer::ApplyInterest(int fd, uint8_t new_interest, uint8_t old_interest,
+ bool is_add) {
+ // kqueue has no single-call "set the interest set of this fd to X" primitive
+ // the way `epoll_ctl(MOD)` does, so we diff the new interest against the
+ // previously-armed interest and emit at most two change records: one per
+ // filter (EVFILT_READ, EVFILT_WRITE).
+ std::array changes{};
+ int nchanges = 0;
+
+ // --- EVFILT_READ ---
+ const bool want_read = (new_interest & event::kReadable) != 0U;
+ const bool had_read = (old_interest & event::kReadable) != 0U;
+ if (want_read && (is_add || !had_read)) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD, 0, 0, nullptr);
+ ++nchanges;
+ } else if (!want_read && !is_add && had_read) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+ ++nchanges;
+ }
+
+ // --- EVFILT_WRITE ---
+ const bool want_write = (new_interest & event::kWritable) != 0U;
+ const bool had_write = (old_interest & event::kWritable) != 0U;
+ if (want_write && (is_add || !had_write)) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_ADD, 0, 0, nullptr);
+ ++nchanges;
+ } else if (!want_write && !is_add && had_write) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
+ ++nchanges;
+ }
+
+ // Nothing to flush is not an error: e.g. Modify() with the same interest
+ // set, or Add() with `kNone`. Note the deliberate absence of `EV_CLEAR` on
+ // every EV_SET above: kqueue defaults to level-triggered, which matches the
+ // EventMultiplexer contract and the reactor's expectations.
+ if (nchanges == 0) {
+ return {};
+ }
+
+ if (::kevent(kqueue_fd_, changes.data(), nchanges, nullptr, 0, nullptr) < 0) {
+ const int en = errno;
+ const ErrorCode code = is_add ? ErrorCode::kNetworkReactorRegisterFailed : ErrorCode::kNetworkReactorModifyFailed;
+ return MakeUnexpected(
+ MakeError(code, FormatErrno(is_add ? "kevent(EV_ADD)" : "kevent(modify)", en), "fd=" + std::to_string(fd)));
+ }
+
+ return {};
+}
+
+Expected<void> KqueueMultiplexer::Add(int fd, uint8_t interest) {
+ if (kqueue_fd_ < 0) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorRegisterFailed,
+ "KqueueMultiplexer::Add called before Open", "fd=" + std::to_string(fd)));
+ }
+
+ auto result = ApplyInterest(fd, interest, /*old_interest=*/0U, /*is_add=*/true);
+ if (!result.has_value()) {
+ return result;
+ }
+ {
+ std::lock_guard lock(interest_mutex_);
+ interest_[fd] = interest;
+ }
+ return {};
+}
+
+Expected<void> KqueueMultiplexer::Modify(int fd, uint8_t interest) {
+ if (kqueue_fd_ < 0) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorModifyFailed,
+ "KqueueMultiplexer::Modify called before Open", "fd=" + std::to_string(fd)));
+ }
+
+ uint8_t old_interest = 0;
+ {
+ std::lock_guard lock(interest_mutex_);
+ const auto it = interest_.find(fd);
+ if (it == interest_.end()) {
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorModifyFailed,
+ "KqueueMultiplexer::Modify called with unknown fd", "fd=" + std::to_string(fd)));
+ }
+ old_interest = it->second;
+ }
+
+ auto result = ApplyInterest(fd, interest, old_interest, /*is_add=*/false);
+ if (!result.has_value()) {
+ return result;
+ }
+ {
+ std::lock_guard lock(interest_mutex_);
+ interest_[fd] = interest;
+ }
+ return {};
+}
+
+Expected<void> KqueueMultiplexer::Remove(int fd) {
+ // Drop our interest-tracking entry before touching the kernel state, so
+ // that even if the kevent teardown syscall below races with connection
+ // close we leave the map consistent.
+ uint8_t old_interest = 0;
+ {
+ std::lock_guard lock(interest_mutex_);
+ const auto it = interest_.find(fd);
+ if (it == interest_.end()) {
+ // Idempotent: never-added or already-removed fds are a no-op success.
+ return {};
+ }
+ old_interest = it->second;
+ interest_.erase(it);
+ }
+
+ if (kqueue_fd_ < 0) {
+ // Multiplexer already torn down; nothing to unregister.
+ return {};
+ }
+
+ std::array<struct kevent, 2> changes{};
+ int nchanges = 0;
+ if ((old_interest & event::kReadable) != 0U) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+ ++nchanges;
+ }
+ if ((old_interest & event::kWritable) != 0U) {
+ EV_SET(&changes[nchanges], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
+ ++nchanges;
+ }
+
+ if (nchanges == 0) {
+ return {};
+ }
+
+ if (::kevent(kqueue_fd_, changes.data(), nchanges, nullptr, 0, nullptr) < 0) {
+ const int en = errno;
+ // Idempotent teardown race: kqueue auto-removes filters on close (EBADF),
+ // and the filter may already be gone from an earlier path (ENOENT).
+ // Everything else is a real failure.
+ if (en == ENOENT || en == EBADF) {
+ return {};
+ }
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorRemoveFailed, FormatErrno("kevent(EV_DELETE)", en),
+ "fd=" + std::to_string(fd)));
+ }
+
+ return {};
+}
+
+Expected<void> KqueueMultiplexer::Poll(int timeout_ms, std::vector<ReadyEvent>& out) {
+ out.clear();
+
+ if (kqueue_fd_ < 0) {
+ return MakeUnexpected(
+ MakeError(ErrorCode::kNetworkReactorPollFailed, "KqueueMultiplexer::Poll called before Open"));
+ }
+
+ // Translate the reactor's integer timeout convention to the pointer-or-null
+ // form that `kevent` expects:
+ // timeout_ms < 0 => tsp == nullptr => block indefinitely
+ // timeout_ms == 0 => zeroed timespec => non-blocking probe
+ // timeout_ms > 0 => populated timespec => relative wait
+ struct timespec ts {};
+ struct timespec* tsp = nullptr;
+ if (timeout_ms == 0) {
+ tsp = &ts; // ts is already zero-initialised.
+ } else if (timeout_ms > 0) {
+ ts.tv_sec = timeout_ms / kMillisPerSecond;
+ ts.tv_nsec = static_cast<long>(timeout_ms % kMillisPerSecond) * kNanosPerMilli;
+ tsp = &ts;
+ }
+
+ const int n = ::kevent(kqueue_fd_, nullptr, 0, events_.data(), static_cast<int>(events_.size()), tsp);
+ if (n < 0) {
+ const int en = errno;
+ // EINTR is not an error condition: a signal interrupted the wait and the
+ // reactor loop will simply call us again. Report success with empty out.
+ if (en == EINTR) {
+ return {};
+ }
+ return MakeUnexpected(MakeError(ErrorCode::kNetworkReactorPollFailed, FormatErrno("kevent", en)));
+ }
+
+ out.reserve(static_cast<size_t>(n));
+ for (int i = 0; i < n; ++i) {
+ const struct kevent& kev = events_[static_cast<size_t>(i)];
+ ReadyEvent ready{};
+ ready.fd = static_cast<int>(kev.ident);
+ ready.events = event::kNone;
+
+ // Per kevent(2): `EV_ERROR` signals a per-change failure and `kev.data`
+ // carries the errno. A zero `kev.data` with `EV_ERROR` set is used by
+ // some BSDs as a benign ack of a change-list entry, which we ignore.
+ if ((kev.flags & EV_ERROR) != 0 && kev.data != 0) {
+ ready.events |= event::kError;
+ } else if (kev.filter == EVFILT_READ) {
+ ready.events |= event::kReadable;
+ } else if (kev.filter == EVFILT_WRITE) {
+ ready.events |= event::kWritable;
+ }
+
+ // `EV_EOF` means the peer has half-closed (or fully closed) the
+ // connection. Mirror the epoll backend's EPOLLHUP/EPOLLRDHUP semantics so
+ // the reactor can drain the read side before releasing the connection.
+ if ((kev.flags & EV_EOF) != 0) {
+ ready.events |= event::kHangup;
+ }
+
+ // Multiple kevents for the same fd (e.g. readable and writable delivered
+ // in the same Poll) are intentionally emitted as separate ReadyEvent
+ // entries. IoReactor ORs them together at dispatch time. Coalescing here
+ // would add per-poll bookkeeping for no correctness gain.
+ out.push_back(ready);
+ }
+
+ // Dynamic grow: if this Poll() filled the scratch buffer, chances are we
+ // are running behind and the next tick will need more slots. Double the
+ // capacity up to `kMaxEventBufferSize` so high-concurrency bursts are not
+ // fragmented across multiple Poll() rounds. Growth is one-shot and
+ // monotonic; the buffer never shrinks back down.
+ if (static_cast<std::size_t>(n) == events_.size() && events_.size() < kMaxEventBufferSize) {
+ const std::size_t new_size = std::min(events_.size() * 2, kMaxEventBufferSize);
+ events_.resize(new_size);
+ }
+ return {};
+}
+
+} // namespace mygramdb::server::reactor
+
+#endif // defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
diff --git a/src/server/reactor/kqueue_multiplexer.h b/src/server/reactor/kqueue_multiplexer.h
new file mode 100644
index 0000000..cd7ed47
--- /dev/null
+++ b/src/server/reactor/kqueue_multiplexer.h
@@ -0,0 +1,127 @@
+/**
+ * @file kqueue_multiplexer.h
+ * @brief kqueue-based EventMultiplexer backend for BSD and macOS.
+ *
+ * Implements the level-triggered `EventMultiplexer` abstraction on top of
+ * FreeBSD/macOS `kqueue(2)`. Compiled only on BSD-family platforms (including
+ * Darwin); on other platforms this header compiles away to nothing so that
+ * `event_multiplexer.cpp` can `#include` it unconditionally inside the
+ * factory's platform guard.
+ *
+ * Semantics:
+ *
+ * - **Level-triggered.** `EV_CLEAR` is deliberately NOT set. kqueue's default
+ * delivery mode matches epoll's level-triggered contract: while the fd is
+ * still readable/writable and the corresponding filter is armed, every
+ * `Poll()` re-reports the event. This preserves the decoupling between
+ * event notification and consumption that `ReactorConnection` depends on.
+ *
+ * - **Per-fd interest tracking.** kqueue has no single-call "set the current
+ * interest set of this fd to X" primitive the way `epoll_ctl(MOD)` does.
+ * Instead, we must diff the new interest against the previously-armed
+ * interest and emit `EV_ADD` for newly-armed filters and `EV_DELETE` for
+ * newly-disarmed ones. The mapping from fd to last-known interest lives
+ * in `interest_`.
+ *
+ * - **Idempotent teardown.** `Remove()` tolerates `ENOENT`/`EBADF` from
+ * `kevent()` because `IoReactor::Stop()` may race with connection teardown
+ * that already closed the fd (kqueue automatically drops filters on
+ * close).
+ */
+
+#pragma once
+
+#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include "server/reactor/event_multiplexer.h"
+#include "utils/error.h"
+#include "utils/expected.h"
+
+namespace mygramdb::server::reactor {
+
+/**
+ * @brief kqueue-backed `EventMultiplexer` implementation.
+ *
+ * Thread-safety: supports `IoReactor`'s pattern of one thread blocked in
+ * `Poll()` while other threads call `Add`/`Modify`/`Remove`; the internal
+ * `interest_` map is guarded by `interest_mutex_` for exactly that reason.
+ */
+class KqueueMultiplexer final : public EventMultiplexer {
+ public:
+ KqueueMultiplexer();
+ ~KqueueMultiplexer() override;
+
+ KqueueMultiplexer(const KqueueMultiplexer&) = delete;
+ KqueueMultiplexer& operator=(const KqueueMultiplexer&) = delete;
+ KqueueMultiplexer(KqueueMultiplexer&&) = delete;
+ KqueueMultiplexer& operator=(KqueueMultiplexer&&) = delete;
+
+ /// @copydoc EventMultiplexer::Open
+ mygram::utils::Expected<void> Open() override;
+
+ /// @copydoc EventMultiplexer::Add
+ mygram::utils::Expected<void> Add(int fd, uint8_t interest) override;
+
+ /// @copydoc EventMultiplexer::Modify
+ mygram::utils::Expected<void> Modify(int fd, uint8_t interest) override;
+
+ /// @copydoc EventMultiplexer::Remove
+ mygram::utils::Expected<void> Remove(int fd) override;
+
+ /// @copydoc EventMultiplexer::Poll
+ mygram::utils::Expected<void> Poll(int timeout_ms, std::vector<Event>& out) override;
+
+ /// Backend identifier for metrics and logging.
+ const char* Name() const override { return "kqueue"; }
+
+ private:
+ /**
+ * @brief Diff `old_interest` against `new_interest` and apply the delta.
+ *
+ * @param fd Target file descriptor.
+ * @param new_interest Desired interest bitmask (`event::kReadable` |
+ * `event::kWritable`).
+ * @param old_interest Previously-armed interest bitmask. Ignored when
+ * `is_add` is true.
+ * @param is_add If true, this is a fresh `Add()` and we unconditionally
+ * emit `EV_ADD` for every bit set in `new_interest`.
+ * If false, only the bits that changed are touched.
+ *
+ * Emits up to two `struct kevent` change records and flushes them in a
+ * single `kevent()` call. Returns `kNetworkReactorRegisterFailed` (on add)
+ * or `kNetworkReactorModifyFailed` (on modify) if the syscall fails.
+ */
+ mygram::utils::Expected<void> ApplyInterest(int fd, uint8_t new_interest, uint8_t old_interest,
+ bool is_add);
+
+ int kqueue_fd_ = -1;
+
+ /// Reusable output buffer for `kevent()`. Sized at construction and
+ /// doubled on demand (up to a fixed cap) whenever a Poll() fills it
+ /// completely, so sustained bursts do not fragment across multiple Poll
+ /// rounds. Touched only by the event-loop thread via `Poll()`; no locking.
+ std::vector<struct kevent> events_;
+
+ /// Mutex protecting `interest_`. IoReactor now allows concurrent Poll (on
+ /// the event-loop thread) and Add/Modify/Remove (from accept / worker
+ /// threads) because the kqueue kernel object is thread-safe for that
+ /// pattern. The `interest_` map, however, is process-level state we keep
+ /// ourselves and therefore needs its own synchronisation.
+ mutable std::mutex interest_mutex_;
+
+ /// Last-known interest mask per registered fd. Populated by `Add()`,
+ /// updated by `Modify()`, erased by `Remove()`. Required because kqueue
+ /// does not let us read back the currently-armed filter set.
+ std::unordered_map<int, uint8_t> interest_;
+};
+
+} // namespace mygramdb::server::reactor
+
+#endif // __APPLE__ || BSD family
diff --git a/src/server/reactor_connection.cpp b/src/server/reactor_connection.cpp
new file mode 100644
index 0000000..cb7c6a7
--- /dev/null
+++ b/src/server/reactor_connection.cpp
@@ -0,0 +1,451 @@
+/**
+ * @file reactor_connection.cpp
+ * @brief Per-connection state + drain-task-per-connection pattern.
+ *
+ * Implements the Phase 2 read side + Phase 3 non-blocking write queue of
+ * the reactor refactor described in docs/ja/design/reactor-io-refactor.md
+ * §4.3/§7 R3.
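+ *
+ * Rough data flow (illustrative, simplified; names as defined in this file):
+ *
+ *   OnReadable()  -> recv() into read_buf_ -> ExtractFramesLocked()
+ *                 -> pending_frames_ -> ScheduleDrainTask()
+ *   DrainTask()   -> Dispatch() -> EnqueueResponse() (inline send() first,
+ *                    ArmWrite() only on EAGAIN residue)
+ *   OnWritable()  -> DrainWriteQueueLocked() -> DisarmWrite() once empty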
+ */
+
+#include "server/reactor_connection.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <array>
+#include <atomic>
+#include <cerrno>
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include "server/io_reactor.h"
+#include "server/request_dispatcher.h"
+#include "server/server_stats.h"
+#include "server/thread_pool.h"
+#include "utils/structured_log.h"
+
+namespace mygramdb::server {
+
+namespace {
+constexpr size_t kRecvChunkBytes = 4096;
+constexpr const char kFrameDelimiter[] = "\r\n";
+constexpr size_t kFrameDelimiterLen = 2;
+constexpr const char kResponseTerminator[] = "\r\n";
+constexpr size_t kResponseTerminatorLen = 2;
+} // namespace
+
+std::shared_ptr<ReactorConnection> ReactorConnection::Create(int fd, IoReactor* reactor, RequestDispatcher* dispatcher,
+ ThreadPool* thread_pool, ServerStats* stats,
+ size_t max_write_queue_bytes) {
+ return std::make_shared<ReactorConnection>(fd, reactor, dispatcher, thread_pool, stats, max_write_queue_bytes);
+}
+
+ReactorConnection::ReactorConnection(int fd, IoReactor* reactor, RequestDispatcher* dispatcher, ThreadPool* thread_pool,
+ ServerStats* stats, size_t max_write_queue_bytes)
+ : fd_(fd),
+ max_write_queue_bytes_(max_write_queue_bytes),
+ reactor_(reactor),
+ dispatcher_(dispatcher),
+ thread_pool_(thread_pool),
+ stats_(stats) {
+ conn_ctx_.client_fd = fd_;
+ read_buf_.reserve(kDefaultReadBufferBytes);
+}
+
+ReactorConnection::~ReactorConnection() {
+ if (!closed_ && fd_ >= 0) {
+ ::close(fd_);
+ closed_ = true;
+ }
+}
+
+bool ReactorConnection::OnReadable() {
+ if (closing_.load(std::memory_order_acquire)) {
+ return false;
+ }
+
+ // If the peer already half-closed (we saw recv()==0 on a previous readable
+ // event), suppress further recv() calls. The write side may still be open
+ // while the drain task flushes responses, so we remain registered until
+ // DrainTask finishes and marks us closing_.
+ if (read_eof_.load(std::memory_order_acquire)) {
+ return true;
+ }
+
+ // 1. Drain the socket until EAGAIN / EWOULDBLOCK.
+ std::array<char, kRecvChunkBytes> chunk{};
+ while (true) {
+ ssize_t n = ::recv(fd_, chunk.data(), chunk.size(), 0);
+ if (n > 0) {
+ // Hard cap on the accumulation buffer — slow-reader / malformed frame
+ // protection.
+ if (read_buf_.size() + static_cast<size_t>(n) > kMaxReadBufferBytes) {
+ mygram::utils::StructuredLog()
+ .Event("reactor_read_buf_overflow")
+ .Field("fd", static_cast(fd_))
+ .Field("buf_bytes", static_cast(read_buf_.size() + static_cast(n)))
+ .Field("cap_bytes", static_cast(kMaxReadBufferBytes))
+ .Warn();
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+ read_buf_.insert(read_buf_.end(), chunk.data(), chunk.data() + n);
+ continue;
+ }
+ if (n == 0) {
+ // Peer performed orderly close or half-close (shutdown(SHUT_WR)). The
+ // write side of the socket may still be open, so we must not tear down
+ // the connection here — we have to finish dispatching any already
+ // framed requests and flush the response. Set read_eof_ so subsequent
+ // OnReadable calls short-circuit, then fall through to frame
+ // extraction + drain task scheduling below. The drain task closes the
+ // connection after the last response has been queued for send.
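+ // Typical half-close client sequence this path serves (illustrative; the
+ // query text is a placeholder):
+ //   send(fd, "QUERY ...\r\n", len, 0); shutdown(fd, SHUT_WR); recv(fd, ...)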
+ read_eof_.store(true, std::memory_order_release);
+ break;
+ }
+ // n < 0
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break;
+ }
+ mygram::utils::StructuredLog()
+ .Event("reactor_recv_failed")
+ .Field("fd", static_cast(fd_))
+ .Field("errno", static_cast(errno))
+ .Field("error", std::strerror(errno))
+ .Warn();
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+
+ // 2. Extract complete frames and push onto the drain queue.
+ size_t enqueued = 0;
+ {
+ std::lock_guard lock(frame_mutex_);
+ enqueued = ExtractFramesLocked();
+ }
+
+ // 3. If we parsed at least one frame, make sure a drain task is running.
+ // The drain task will close the connection on behalf of the read path
+ // once read_eof_ is set and there is nothing left to flush.
+ if (enqueued > 0) {
+ if (!ScheduleDrainTask()) {
+ return false;
+ }
+ }
+
+ // If the peer half-closed and there are no frames in flight and nothing
+ // pending in the write queue, we can tear down immediately. Otherwise the
+ // drain task (or OnWritable, after the write queue drains) will do the
+ // close for us.
+ if (read_eof_.load(std::memory_order_acquire)) {
+ bool nothing_to_dispatch = false;
+ {
+ std::lock_guard lock(frame_mutex_);
+ nothing_to_dispatch = pending_frames_.empty() && !drain_scheduled_.load(std::memory_order_acquire);
+ }
+ bool write_queue_empty = false;
+ {
+ std::lock_guard lock(write_mutex_);
+ write_queue_empty = write_queue_.empty();
+ }
+ if (nothing_to_dispatch && write_queue_empty) {
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool ReactorConnection::OnWritable() {
+ std::unique_lock lock(write_mutex_);
+
+ if (!DrainWriteQueueLocked()) {
+ // Fatal send error during drain.
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+
+ if (!write_queue_.empty()) {
+ // Partial drain: leave the queue armed, fire again on next writable event.
+ return true;
+ }
+
+ // Fully drained: disarm kWritable so the event loop stops spinning on
+ // this fd. If we had never actually armed (edge case — OnWritable fired
+ // spuriously), skip the disarm call.
+ if (write_armed_ && reactor_ != nullptr) {
+ (void)reactor_->DisarmWrite(fd_);
+ write_armed_ = false;
+ }
+
+ if (closing_.load(std::memory_order_acquire)) {
+ return false;
+ }
+
+ // Peer already half-closed and the drain task has no more work in flight:
+ // we just flushed the last response, so unregister now.
+ if (read_eof_.load(std::memory_order_acquire) && !drain_scheduled_.load(std::memory_order_acquire)) {
+ std::lock_guard frames_lock(frame_mutex_);
+ if (pending_frames_.empty()) {
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool ReactorConnection::OnError() {
+ closing_.store(true, std::memory_order_release);
+ return false;
+}
+
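+// Worked example (illustrative; command names are placeholders): with
+// read_buf_ holding "PING\r\nSTATUS\r\nSEA", two frames ("PING", "STATUS")
+// are appended to pending_frames_, the trailing "SEA" stays in read_buf_
+// until its delimiter arrives, and the function returns 2.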
+size_t ReactorConnection::ExtractFramesLocked() {
+ size_t enqueued = 0;
+ size_t scan_start = 0;
+ size_t consumed = 0;
+ while (scan_start + kFrameDelimiterLen <= read_buf_.size()) {
+ // Search for the next delimiter.
+ const char* begin = read_buf_.data() + scan_start;
+ const size_t remaining = read_buf_.size() - scan_start;
+ const char* found = static_cast<const char*>(std::memchr(begin, kFrameDelimiter[0], remaining));
+ if (found == nullptr) {
+ break;
+ }
+ const size_t found_off = static_cast<size_t>(found - read_buf_.data());
+ if (found_off + kFrameDelimiterLen > read_buf_.size()) {
+ break; // delimiter straddles the buffer end; wait for more bytes
+ }
+ if (read_buf_[found_off + 1] != kFrameDelimiter[1]) {
+ // Lone CR without LF — skip past the CR and keep scanning.
+ scan_start = found_off + 1;
+ continue;
+ }
+ // Frame is [consumed, found_off); delimiter is [found_off, found_off+2).
+ const size_t frame_len = found_off - consumed;
+ pending_frames_.emplace_back(read_buf_.data() + consumed, frame_len);
+ ++enqueued;
+ consumed = found_off + kFrameDelimiterLen;
+ scan_start = consumed;
+ }
+ if (consumed > 0) {
+ // Single splice at the end to avoid quadratic erase-per-frame cost.
+ read_buf_.erase(read_buf_.begin(), read_buf_.begin() + static_cast<std::ptrdiff_t>(consumed));
+ }
+ return enqueued;
+}
+
+bool ReactorConnection::ScheduleDrainTask() {
+ bool expected = false;
+ if (!drain_scheduled_.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
+ // A drain task is already running or queued; it will pick up the new
+ // frames when it next checks `pending_frames_`.
+ return true;
+ }
+
+ if (thread_pool_ == nullptr || dispatcher_ == nullptr) {
+ // Misconfiguration — no way to process the frames.
+ drain_scheduled_.store(false, std::memory_order_release);
+ return false;
+ }
+
+ auto self = shared_from_this();
+ const bool submitted = thread_pool_->Submit([self]() { self->DrainTask(); });
+ if (!submitted) {
+ drain_scheduled_.store(false, std::memory_order_release);
+ mygram::utils::StructuredLog().Event("reactor_drain_submit_failed").Field("fd", static_cast(fd_)).Warn();
+ closing_.store(true, std::memory_order_release);
+ return false;
+ }
+ return true;
+}
+
+void ReactorConnection::DrainTask() {
+ while (!closing_.load(std::memory_order_acquire)) {
+ std::string frame;
+ {
+ std::lock_guard lock(frame_mutex_);
+ if (pending_frames_.empty()) {
+ break;
+ }
+ frame = std::move(pending_frames_.front());
+ pending_frames_.pop_front();
+ }
+
+ // Dispatch. `Dispatch` is synchronous and returns the full response.
+ std::string response = dispatcher_->Dispatch(frame, conn_ctx_);
+ if (stats_ != nullptr) {
+ // Mirror the blocking path's per-request counter increment.
+ stats_->IncrementRequests();
+ }
+
+ // Enqueue the response for non-blocking send. The fast path in
+ // EnqueueResponse attempts an inline drain before returning; only on
+ // EAGAIN does it hand off to the event loop via ArmWrite.
+ if (!EnqueueResponse(std::move(response))) {
+ closing_.store(true, std::memory_order_release);
+ break;
+ }
+ }
+
+ // Netty/Vert.x "clear-then-recheck": before releasing the drain slot,
+ // confirm that no new frames arrived in the window between the last
+ // queue-empty check and now. If frames did arrive, reschedule ourselves.
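+ // Illustrative interleaving the recheck guards against:
+ //   worker: sees pending_frames_ empty, leaves the dispatch loop
+ //   event loop: OnReadable() queues a new frame, sees drain_scheduled_ ==
+ //               true and therefore does not submit another task
+ //   worker: clears drain_scheduled_; without the recheck that frame would
+ //           wait for the next readable event instead of being dispatched.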
+ bool reschedule = false;
+ {
+ std::lock_guard lock(frame_mutex_);
+ if (!pending_frames_.empty() && !closing_.load(std::memory_order_acquire)) {
+ reschedule = true;
+ }
+ }
+ drain_scheduled_.store(false, std::memory_order_release);
+ if (reschedule) {
+ (void)ScheduleDrainTask();
+ return;
+ }
+
+ // If the peer half-closed (recv()==0) and we just finished dispatching the
+ // last buffered frame, we own the close. Wait for the write queue to
+ // drain first — the last response may still be in flight via
+ // EnqueueResponse's EPOLLOUT fallback, in which case OnWritable will
+ // perform the unregister once the queue empties.
+ if (read_eof_.load(std::memory_order_acquire) && !closing_.load(std::memory_order_acquire)) {
+ bool write_queue_empty = false;
+ {
+ std::lock_guard lock(write_mutex_);
+ write_queue_empty = write_queue_.empty();
+ }
+ if (write_queue_empty) {
+ closing_.store(true, std::memory_order_release);
+ }
+ }
+
+ if (closing_.load(std::memory_order_acquire) && reactor_ != nullptr) {
+ // Ask the reactor to unregister us. This is safe from a worker: the
+ // IoReactor::Unregister acquires the connections_ write lock, and the
+ // event loop will observe the erase on its next Poll iteration. The
+ // shared_ptr held by this lambda capture keeps the object alive until
+ // DrainTask returns.
+ reactor_->Unregister(fd_);
+ }
+}
+
+bool ReactorConnection::EnqueueResponse(std::string response) {
+ // Payload + CRLF terminator. We hold write_mutex_ across the entire
+ // enqueue + optional inline drain + optional ArmWrite sequence so that
+ // the event loop's OnWritable cannot race and pop frames out from under
+ // us mid-drain, and so OnWritable cannot observe write_armed_ in an
+ // inconsistent state relative to the multiplexer's interest mask.
+ //
+ // Holding `write_mutex_` across `reactor_->ArmWrite` is safe: ArmWrite
+ // only acquires the reactor's `mux_lifecycle_` (shared). No IoReactor
+ // method ever takes `write_mutex_`, so there is no reverse lock order.
+ const size_t payload_bytes = response.size() + kResponseTerminatorLen;
+
+ std::unique_lock lock(write_mutex_);
+
+ if (closing_.load(std::memory_order_acquire)) {
+ return false;
+ }
+
+ // Slow-reader backpressure: cap the per-connection unsent byte budget.
+ // Design doc §7 R3: exceeding the cap means the peer cannot keep up and
+ // the server forcibly closes the connection to protect its own memory.
+ if (write_queue_bytes_ + payload_bytes > max_write_queue_bytes_) {
+ mygram::utils::StructuredLog()
+ .Event("reactor_write_queue_overflow")
+ .Field("fd", static_cast(fd_))
+ .Field("current_bytes", static_cast(write_queue_bytes_))
+ .Field("attempted_bytes", static_cast(payload_bytes))
+ .Field("cap_bytes", static_cast(max_write_queue_bytes_))
+ .Warn();
+ return false;
+ }
+
+ response.append(kResponseTerminator, kResponseTerminatorLen);
+ write_queue_.emplace_back(std::move(response));
+ write_queue_bytes_ += payload_bytes;
+ pending_write_bytes_.store(write_queue_bytes_, std::memory_order_relaxed);
+
+ // Fast path: if the queue is not currently armed for EPOLLOUT, the
+ // event loop is NOT going to drain us. Try an inline non-blocking drain
+ // right here on the worker thread (design doc §4.2 D6: attempt write
+ // immediately, register EPOLLOUT on EAGAIN).
+ if (!write_armed_) {
+ if (!DrainWriteQueueLocked()) {
+ return false; // fatal send error
+ }
+ if (write_queue_.empty()) {
+ return true; // fully drained inline — no arming required
+ }
+ // Residue remains → ask the reactor to arm kWritable so the event
+ // loop takes over.
+ if (reactor_ == nullptr) {
+ // Unit-test harness with no reactor and residue we cannot arm on.
+ return false;
+ }
+ auto arm_result = reactor_->ArmWrite(fd_);
+ if (!arm_result) {
+ mygram::utils::StructuredLog()
+ .Event("reactor_arm_write_failed")
+ .Field("fd", static_cast(fd_))
+ .Field("error", arm_result.error().to_string())
+ .Warn();
+ return false;
+ }
+ write_armed_ = true;
+ }
+ // Queue was already armed — event loop's OnWritable will pick up the
+ // new entries when it next fires.
+ return true;
+}
+
+bool ReactorConnection::DrainWriteQueueLocked() {
+ while (!write_queue_.empty()) {
+ const std::string& front = write_queue_.front();
+ const char* data = front.data() + front_offset_;
+ const size_t remaining = front.size() - front_offset_;
+
+ ssize_t n = ::send(fd_, data, remaining, MSG_NOSIGNAL);
+ if (n > 0) {
+ front_offset_ += static_cast<size_t>(n);
+ write_queue_bytes_ -= static_cast<size_t>(n);
+ pending_write_bytes_.store(write_queue_bytes_, std::memory_order_relaxed);
+ if (front_offset_ == front.size()) {
+ write_queue_.pop_front();
+ front_offset_ = 0;
+ }
+ continue;
+ }
+ if (n == 0) {
+ // send() should not return 0 for a non-zero-length buffer; treat it
+ // defensively as a fatal peer state rather than spinning on it.
+ return false;
+ }
+ // n < 0
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return true; // partial — event loop will finish via OnWritable
+ }
+ // EPIPE / ECONNRESET / ENOTCONN / etc.
+ mygram::utils::StructuredLog()
+ .Event("reactor_send_failed")
+ .Field("fd", static_cast(fd_))
+ .Field("errno", static_cast(errno))
+ .Field("error", std::strerror(errno))
+ .Debug();
+ return false;
+ }
+ return true;
+}
+
+} // namespace mygramdb::server
diff --git a/src/server/reactor_connection.h b/src/server/reactor_connection.h
new file mode 100644
index 0000000..f3943e5
--- /dev/null
+++ b/src/server/reactor_connection.h
@@ -0,0 +1,313 @@
+/**
+ * @file reactor_connection.h
+ * @brief Heap-allocated per-connection state for the reactor I/O model.
+ *
+ * This is the Phase 2/3 implementation of the per-connection state object
+ * described in docs/ja/design/reactor-io-refactor.md §4.3. An instance is
+ * created per accepted client socket and lives on the heap as a
+ * `std::shared_ptr`, jointly owned by:
+ * - `IoReactor`'s connection map (primary owner), and
+ * - any in-flight drain task captured by the thread pool.
+ *
+ * The shared ownership is deliberate (design doc §7 R5): once a worker has
+ * started draining a connection's frame queue we must keep the object alive
+ * until the worker finishes writing, even if the event loop has already
+ * observed EPOLLHUP and unregistered the fd.
+ *
+ * Naming note: the design document calls this class `ConnectionContext`, but
+ * that name is already used by `mygramdb::server::ConnectionContext` in
+ * `server_types.h` for the per-request dispatch struct passed to command
+ * handlers. To avoid a disruptive rename across ~36 files, the reactor
+ * per-connection state type is introduced here as `ReactorConnection`.
+ * Semantically it is exactly the type described in §4.3 of the design doc.
+ *
+ * -----------------------------------------------------------------------
+ * Thread-safety contract
+ * -----------------------------------------------------------------------
+ * - `read_buf_` is touched exclusively by the event-loop thread (via
+ * `OnReadable`) and requires no locking.
+ * - `pending_frames_` is shared between the event-loop thread (producer)
+ * and a worker thread (consumer) and is protected by `frame_mutex_`.
+ * - `write_queue_`, `write_queue_bytes_`, `front_offset_`, `write_armed_`
+ * are shared between the worker thread (via `EnqueueResponse` → inline
+ * drain) and the event-loop thread (via `OnWritable`) and are protected
+ * by `write_mutex_`. The contract is: holders of `write_mutex_` may
+ * call `reactor_->ArmWrite/DisarmWrite` while the mutex is held. The
+ * reverse is never done (no IoReactor method acquires `write_mutex_`).
+ * - `closing_` and `drain_scheduled_` are atomics.
+ * - `fd_` is immutable after construction; the destructor closes it
+ * exactly once via `closed_` guard.
+ * - `reactor_`, `dispatcher_`, `thread_pool_` are set once at construction
+ * and read-only thereafter.
+ */
+
+#pragma once
+
+#include <sys/types.h>
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "server/server_types.h"
+
+namespace mygramdb::server {
+
+class IoReactor;
+class RequestDispatcher;
+class ServerStats;
+class ThreadPool;
+
+/**
+ * @brief Per-connection state owned jointly by the reactor and drain tasks.
+ *
+ * Lifetime:
+ * 1. `IoReactor::Register` inserts the shared_ptr into its map and arms
+ * `kReadable` on the multiplexer.
+ * 2. The event loop calls `OnReadable`/`OnError`. When `OnReadable` parses
+ * at least one complete frame it schedules a drain task via the
+ * `ThreadPool`; the task captures a shared_ptr copy.
+ * 3. If the connection is torn down (peer close, error, write failure),
+ * either the event loop or the drain task calls
+ * `IoReactor::Unregister(fd)`, which removes the shared_ptr from the
+ * map. The object is destroyed when the last shared_ptr (typically the
+ * drain task's) drops, and the destructor closes `fd_`.
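+ *
+ * Illustrative wiring sketch (the exact IoReactor::Register signature is an
+ * assumption here, not quoted from io_reactor.h):
+ *
+ *   auto conn = ReactorConnection::Create(client_fd, &reactor, &dispatcher,
+ *                                         &thread_pool, &stats);
+ *   reactor.Register(client_fd, conn);  // arms kReadable on the multiplexer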
+ */
+class ReactorConnection : public std::enable_shared_from_this<ReactorConnection> {
+ public:
+ /// Default read buffer reservation. Grows on demand up to kMaxReadBufferBytes.
+ static constexpr size_t kDefaultReadBufferBytes = 4096;
+
+ /// Hard cap on the read accumulation buffer. This is an OOM safety rail
+ /// only — per-query size enforcement (`api.max_query_length`) is the
+ /// responsibility of the downstream query parser, which rejects oversized
+ /// requests with a structured error. 1 MiB is comfortably above the
+ /// default `max_query_length` (~64 KiB) and is deliberately decoupled
+ /// from config so that lowering `max_query_length` at runtime cannot make
+ /// the reactor drop well-formed but large requests that are still in
+ /// flight on an existing connection.
+ static constexpr size_t kMaxReadBufferBytes = 1 * 1024 * 1024; // 1 MiB
+
+ /// Hard upper bound on unsent response bytes; once exceeded the reactor
+ /// forcibly closes the connection to protect against slow-reader OOM
+ /// (see design doc §7 R3). Phase 3 enforces this cap in `EnqueueResponse`:
+ /// a push that would exceed the cap sets `closing_` and causes the drain
+ /// task to tear down the connection.
+ static constexpr size_t kDefaultMaxWriteQueueBytes = 16 * 1024 * 1024; // 16 MiB
+
+ /**
+ * @brief Factory. Must be used instead of a bare constructor because
+ * `std::enable_shared_from_this` requires the object to live inside
+ * a `shared_ptr` from the moment it is born.
+ *
+ * @param stats Optional non-owning pointer to `ServerStats`. If non-null,
+ * the drain task calls `stats->IncrementRequests()` after
+ * each successful Dispatch, matching the blocking path's
+ * per-request counter. May be null in unit tests.
+ */
+ static std::shared_ptr<ReactorConnection> Create(int fd, IoReactor* reactor, RequestDispatcher* dispatcher,
+ ThreadPool* thread_pool, ServerStats* stats = nullptr,
+ size_t max_write_queue_bytes = kDefaultMaxWriteQueueBytes);
+
+ /**
+ * @brief Public constructor (required by `std::make_shared`). Prefer
+ * `Create()` at call sites for clarity.
+ */
+ ReactorConnection(int fd, IoReactor* reactor, RequestDispatcher* dispatcher, ThreadPool* thread_pool,
+ ServerStats* stats, size_t max_write_queue_bytes);
+
+ ~ReactorConnection();
+
+ ReactorConnection(const ReactorConnection&) = delete;
+ ReactorConnection& operator=(const ReactorConnection&) = delete;
+ ReactorConnection(ReactorConnection&&) = delete;
+ ReactorConnection& operator=(ReactorConnection&&) = delete;
+
+ /// Returns the raw client fd. The reactor still owns close(2).
+ [[nodiscard]] int Fd() const { return fd_; }
+
+ // ---- Reactor event callbacks (event-loop thread) --------------------
+
+ /**
+ * @brief Handle `event::kReadable` for this connection.
+ *
+ * Drains the socket via non-blocking recv() into `read_buf_`, scans for
+ * "\r\n"-delimited frames, enqueues each complete frame onto
+ * `pending_frames_`, and schedules a single drain task on the thread pool
+ * if one is not already in flight.
+ *
+ * @return false if the reactor should close and unregister this fd.
+ */
+ bool OnReadable();
+
+ /**
+ * @brief Handle `event::kWritable` for this connection.
+ *
+ * Phase 3: drain `write_queue_` via non-blocking `send()` until EAGAIN
+ * or empty. On full drain, call `reactor_->DisarmWrite(fd_)` and return
+ * true (or false if `closing_` was also set, so the reactor tears down
+ * the fd). On partial drain, leave the queue armed and return true. On
+ * fatal send error (EPIPE / ECONNRESET / etc.), return false.
+ */
+ bool OnWritable();
+
+ /**
+ * @brief Handle `event::kError` / `event::kHangup` for this connection.
+ * Always returns false so the reactor tears the fd down.
+ */
+ bool OnError();
+
+ /// Current bytes held in the pending write accounting (for metrics / tests).
+ [[nodiscard]] size_t PendingWriteBytes() const { return pending_write_bytes_.load(std::memory_order_relaxed); }
+
+ /// Whether `closing_` has been set. Exposed for tests.
+ [[nodiscard]] bool IsClosing() const { return closing_.load(std::memory_order_acquire); }
+
+ /// Returns the number of frames currently in `pending_frames_`. Exposed for tests only.
+ [[nodiscard]] size_t PendingFrameCountForTest() const {
+ std::lock_guard lock(frame_mutex_);
+ return pending_frames_.size();
+ }
+
+ /// Current number of entries in the write queue. TEST ONLY.
+ [[nodiscard]] size_t WriteQueueDepthForTest() const {
+ std::lock_guard lock(write_mutex_);
+ return write_queue_.size();
+ }
+
+ /// Whether the reactor currently has `kWritable` armed for this fd.
+ /// TEST ONLY.
+ [[nodiscard]] bool WriteArmedForTest() const {
+ std::lock_guard