diff --git a/.gitignore b/.gitignore index 8baba63..11e03cd 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ tmp_reply.json # Internal documentation (never commit) .internalDoc/ /.windsurfrules +.full_engine_diff.patch diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e39966..afd17c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,42 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 and confined to the wrapper module; every `unsafe` block delegates immediately to the inner allocator. +### Added — Prometheus metrics feature (#60) + +- **New optional `metrics` feature flag** (default off). When + enabled, the matching core emits Prometheus-style counters and + gauges through the [`metrics`](https://docs.rs/metrics) crate's + global facade. Any compatible recorder (Prometheus exporter, + OpenTelemetry bridge, custom collector) can scrape them. +- **Surface (stable across `0.7.x`):** + - `orderbook_rejects_total{reason="..."}` — counter, + incremented exactly once per rejection. Label value is the + `RejectReason` `Display` string. + - `orderbook_depth_levels_bid` / `orderbook_depth_levels_ask` + — gauges, current count of distinct price levels per side, + refreshed on every add / cancel / modify / fill. + - `orderbook_trades_total` — counter, monotonic count of every + emitted trade transaction (one increment per `MatchResult` + transaction, summed across all listener-emitted and + internal-only matches). +- **Out-of-band emission.** Allocation-free on the happy path, + no influence on matching outcomes, no recorder dependency on + the core engine. `restore_from_snapshot_package` does **not** + rehydrate counters — operational only, process-lifetime. +- **Compile-time no-op when the feature is disabled.** Every + helper in `orderbook::metrics` compiles down to an empty + function so call-sites in the matching hot path stay + unconditional. +- **`metrics = "0.24"`** is the new optional dependency. 
+- Integration test `tests/metrics/` (its own test binary so the + global recorder isn't perturbed by the rest of the suite) + covers reject counts, trade counts, depth gauges, and a + determinism guard that proves metrics emission does not alter + byte-identical snapshots. +- Example `examples/src/bin/prometheus_export.rs` demonstrates + installing `metrics-exporter-prometheus` and dumping the + exposition payload. + ### Added — HDR-histogram tail-latency bench suite (#56) - **Six new bench binaries** under `benches/order_book/*_hdr.rs` that diff --git a/Cargo.toml b/Cargo.toml index 4b93ac4..68fbc3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ bytes = { workspace = true, optional = true } bincode = { workspace = true, optional = true } crc32fast = { workspace = true, optional = true } memmap2 = { workspace = true, optional = true } +metrics = { workspace = true, optional = true } [features] default = [] @@ -56,6 +57,7 @@ nats = ["dep:async-nats", "dep:bytes"] bincode = ["dep:bincode"] journal = ["dep:crc32fast", "dep:memmap2"] alloc-counters = [] +metrics = ["dep:metrics"] [dev-dependencies] criterion = { version = "0.8", features = ["html_reports"] } @@ -114,6 +116,11 @@ name = "alloc_budget" path = "tests/alloc_budget.rs" required-features = ["alloc-counters"] +[[test]] +name = "metrics_tests" +path = "tests/metrics/mod.rs" +required-features = ["metrics"] + [lib] name = "orderbook_rs" @@ -146,3 +153,4 @@ bytes = "1" bincode = { version = "2.0", features = ["serde"] } crc32fast = "1" memmap2 = "0.9" +metrics = "0.24" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index bba15b7..ead7c1a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [features] default = [] special_orders = ["orderbook-rs/special_orders"] +metrics = ["orderbook-rs/metrics", "dep:metrics", "dep:metrics-exporter-prometheus"] [dependencies] orderbook-rs = { workspace = true } @@ -15,7 +16,13 @@ uuid = { workspace = true } pricelevel 
= { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +metrics = { version = "0.24", optional = true } +metrics-exporter-prometheus = { version = "0.18", optional = true, default-features = false } [[bin]] name = "special_orders_demo" required-features = ["special_orders"] + +[[bin]] +name = "prometheus_export" +required-features = ["metrics"] diff --git a/examples/src/bin/prometheus_export.rs b/examples/src/bin/prometheus_export.rs new file mode 100644 index 0000000..9ec7cb1 --- /dev/null +++ b/examples/src/bin/prometheus_export.rs @@ -0,0 +1,111 @@ +// examples/src/bin/prometheus_export.rs +// +// Operator demo of the optional `metrics` feature (issue #60). +// +// Builds an OrderBook, runs a small mix of accepted and rejected +// flow, then dumps the recorded counters / gauges in Prometheus +// text format via `metrics-exporter-prometheus`. +// +// Run with: +// +// cd examples +// cargo run --features metrics --bin prometheus_export +// +// Expected on stdout: a Prometheus exposition payload containing +// * orderbook_rejects_total{reason="..."} +// * orderbook_depth_levels_bid / orderbook_depth_levels_ask +// * orderbook_trades_total + +use metrics_exporter_prometheus::PrometheusBuilder; +use orderbook_rs::{OrderBook, OrderBookError}; +use pricelevel::{Hash32, Id, Side, TimeInForce, setup_logger}; +use tracing::{info, warn}; + +fn main() { + let _ = setup_logger(); + info!("Prometheus export demo"); + + // Install the Prometheus recorder. `install_recorder` returns a + // handle whose `render()` method emits the current snapshot in + // Prometheus text exposition format. In production you'd serve + // that string over HTTP at /metrics. + let handle = PrometheusBuilder::new() + .install_recorder() + .expect("install Prometheus recorder"); + + let book = OrderBook::<()>::new("BTC/USD"); + + // 1. Seed both sides with limit orders. + seed_resting_book(&book); + + // 2. 
Cross a couple of trades to bump `orderbook_trades_total`. + cross_some_trades(&book); + + // 3. Trigger a few rejects to populate `orderbook_rejects_total`. + trigger_rejects(&book); + + // 4. Render the Prometheus exposition payload. + let scrape = handle.render(); + info!("--- Prometheus exposition (current snapshot) ---"); + println!("{scrape}"); + info!("--- end of exposition ---"); +} + +fn seed_resting_book(book: &OrderBook<()>) { + let user = Hash32::zero(); + + let resting: [(u128, u64, Side); 6] = [ + (100, 5, Side::Buy), + (99, 8, Side::Buy), + (98, 3, Side::Buy), + (101, 5, Side::Sell), + (102, 6, Side::Sell), + (103, 4, Side::Sell), + ]; + + for (price, qty, side) in resting { + if let Err(err) = book.add_limit_order_with_user( + Id::new_uuid(), + price, + qty, + side, + TimeInForce::Gtc, + user, + None, + ) { + warn!("seed add failed: {err}"); + } + } +} + +fn cross_some_trades(book: &OrderBook<()>) { + // Aggressive buys against the resting asks. + for (limit, qty) in [(102u128, 4u64), (103, 3)] { + match book.add_limit_order( + Id::new_uuid(), + limit, + qty, + Side::Buy, + TimeInForce::Gtc, + None, + ) { + Ok(_) => info!("aggressive buy filled at limit {limit} qty {qty}"), + Err(err) => warn!("aggressive buy failed: {err}"), + } + } +} + +fn trigger_rejects(book: &OrderBook<()>) { + // Engage the kill switch to force a reject from the canonical + // taxonomy. Releases immediately so the book still serves the + // last metric render correctly. + book.engage_kill_switch(); + let result = book.add_limit_order(Id::new_uuid(), 100, 1, Side::Buy, TimeInForce::Gtc, None); + match result { + Err(OrderBookError::KillSwitchActive) => { + info!("expected KillSwitchActive reject recorded as a metric") + } + other => warn!("unexpected reject result: {other:?}"), + } + book.release_kill_switch(); +} diff --git a/src/lib.rs b/src/lib.rs index f54cc90..c7e8a1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,40 @@ //! regression detection. //! 
- **`BENCH.md`** gains an "Allocation profile" section. //! +//! ### v0.7.0 — Metrics and Observability (#60) +//! +//! - **New optional `metrics` feature** wires Prometheus-style +//! counters and gauges into the matching engine. Default `off`; +//! when enabled, every increment goes through the global +//! [`metrics`](https://docs.rs/metrics) facade so any compatible +//! recorder (Prometheus exporter, OpenTelemetry bridge, etc.) +//! can collect them. +//! - **Surface (stable across `0.7.x`):** +//! - `orderbook_rejects_total{reason="..."}` — counter, one +//! increment per rejected order. Label value is the +//! [`RejectReason`] [`Display`](std::fmt::Display) string. +//! - `orderbook_depth_levels_bid` / +//! `orderbook_depth_levels_ask` — gauges, current count of +//! distinct price levels on each side. Updated on every +//! structural mutation (add, cancel, modify, fill). +//! - `orderbook_trades_total` — counter, monotonic count of +//! every emitted trade transaction (one increment per +//! `MatchResult` transaction). +//! - **Determinism preserved.** Metrics emission is out-of-band: +//! no allocation on the happy path, no influence on matching +//! outcomes, and `restore_from_snapshot_package` deliberately +//! does **not** rehydrate counters — they are operational only +//! and live for the process lifetime. The integration test +//! `tests/metrics/` proves byte-identical snapshots between two +//! books with metrics enabled. +//! - **Compile-time no-op.** When the feature is off every helper +//! in [`orderbook::metrics`] compiles to an empty function so +//! call-sites in the matching hot path stay unconditional. +//! - Example: `examples/src/bin/prometheus_export.rs` (run with +//! `cargo run --features metrics --bin prometheus_export`) +//! demonstrates installing the `metrics-exporter-prometheus` +//! recorder and dumping the exposition payload. +//! //! ### v0.7.0 — HDR-histogram tail-latency bench suite //! //! 
- **Six new `*_hdr` bench binaries** under diff --git a/src/orderbook/book.rs b/src/orderbook/book.rs index f2a4076..be5480e 100644 --- a/src/orderbook/book.rs +++ b/src/orderbook/book.rs @@ -489,6 +489,27 @@ where self.engine_seq.load(Ordering::Acquire) } + /// Refresh the operational depth gauges with the current count + /// of distinct bid / ask price levels. + /// + /// Hooked from every structural mutation site so the published + /// gauge tracks the book's true level count without affecting + /// matching latency on the happy path. + /// + /// When the `metrics` feature is disabled this compiles to an + /// empty function so the `bids.len()` / `asks.len()` reads are + /// also elided — every caller is a true zero-cost no-op. + #[cfg(feature = "metrics")] + #[inline] + pub(super) fn record_depth_metric(&self) { + super::metrics::record_depth(self.bids.len() as u64, self.asks.len() as u64); + } + + /// No-op variant when the `metrics` feature is disabled. + #[cfg(not(feature = "metrics"))] + #[inline] + pub(super) fn record_depth_metric(&self) {} + /// Engage the kill switch. While engaged, every public `submit_*`, /// `add_order`, and non-cancel `update_order` call returns /// [`OrderBookError::KillSwitchActive`] before any matching, fee, @@ -2380,17 +2401,22 @@ where let match_result = OrderBook::::match_order_with_user(self, order_id, side, quantity, None, user_id)?; - // Trigger trade listener if there are transactions - if !match_result.trades().as_vec().is_empty() - && let Some(ref listener) = self.trade_listener - { - let mut trade_result = TradeResult::with_fees( - self.symbol.clone(), - match_result.clone(), - self.fee_schedule, - ); - trade_result.engine_seq = self.next_engine_seq(); - listener(&trade_result); + // Emit trade-count metric and trigger trade listener if any + // transactions printed. The metric is independent of whether + // a listener is configured; the listener emission still gates + // on `Some(ref listener)`. 
+ let trades_emitted = match_result.trades().len() as u64; + if trades_emitted > 0 { + super::metrics::record_trades(trades_emitted); + if let Some(ref listener) = self.trade_listener { + let mut trade_result = TradeResult::with_fees( + self.symbol.clone(), + match_result.clone(), + self.fee_schedule, + ); + trade_result.engine_seq = self.next_engine_seq(); + listener(&trade_result); + } } Ok(match_result) @@ -2456,17 +2482,22 @@ where user_id, )?; - // Trigger trade listener if there are transactions - if !match_result.trades().as_vec().is_empty() - && let Some(ref listener) = self.trade_listener - { - let mut trade_result = TradeResult::with_fees( - self.symbol.clone(), - match_result.clone(), - self.fee_schedule, - ); - trade_result.engine_seq = self.next_engine_seq(); - listener(&trade_result); + // Emit trade-count metric and trigger trade listener if any + // transactions printed. The metric is independent of whether + // a listener is configured; the listener emission still gates + // on `Some(ref listener)`. + let trades_emitted = match_result.trades().len() as u64; + if trades_emitted > 0 { + super::metrics::record_trades(trades_emitted); + if let Some(ref listener) = self.trade_listener { + let mut trade_result = TradeResult::with_fees( + self.symbol.clone(), + match_result.clone(), + self.fee_schedule, + ); + trade_result.engine_seq = self.next_engine_seq(); + listener(&trade_result); + } } Ok(match_result) diff --git a/src/orderbook/mass_cancel.rs b/src/orderbook/mass_cancel.rs index 89df3c6..31b02b2 100644 --- a/src/orderbook/mass_cancel.rs +++ b/src/orderbook/mass_cancel.rs @@ -192,6 +192,8 @@ where self.special_order_tracker.clear(); self.cache.invalidate(); + // Refresh the depth gauges; both sides are now empty. 
+ self.record_depth_metric(); MassCancelResult { cancelled_count, diff --git a/src/orderbook/matching.rs b/src/orderbook/matching.rs index afca1da..30679c6 100644 --- a/src/orderbook/matching.rs +++ b/src/orderbook/matching.rs @@ -81,6 +81,9 @@ where // Early exit if the opposite side is empty if match_side.is_empty() { if limit_price.is_none() { + crate::orderbook::metrics::record_reject( + crate::orderbook::reject_reason::RejectReason::InsufficientLiquidity, + ); return Err(OrderBookError::InsufficientLiquidity { side, requested: quantity, @@ -286,9 +289,16 @@ where } // Batch remove empty price levels + let levels_removed = !empty_price_levels.is_empty(); for price in &empty_price_levels { match_side.remove(price); } + if levels_removed { + // Refresh the operational depth gauges now that levels may + // have been removed. No-op when the `metrics` feature is + // disabled. + self.record_depth_metric(); + } // Batch remove filled orders from tracking and update state for filled_id in &filled_orders { @@ -316,6 +326,9 @@ where reason: CancelReason::SelfTradePrevention, }, ); + crate::orderbook::metrics::record_reject( + crate::orderbook::reject_reason::RejectReason::SelfTradePrevention, + ); return Err(OrderBookError::SelfTradePrevented { mode: self.stp_mode, taker_order_id: order_id, @@ -325,6 +338,9 @@ where // Check for insufficient liquidity in market orders if limit_price.is_none() && remaining_quantity == quantity { + crate::orderbook::metrics::record_reject( + crate::orderbook::reject_reason::RejectReason::InsufficientLiquidity, + ); return Err(OrderBookError::InsufficientLiquidity { side, requested: quantity, diff --git a/src/orderbook/metrics.rs b/src/orderbook/metrics.rs new file mode 100644 index 0000000..af3ec62 --- /dev/null +++ b/src/orderbook/metrics.rs @@ -0,0 +1,116 @@ +//! Operational Prometheus-style metrics for the order book core. +//! +//! Issue #60 — feature-gated, additive observability hooks. When the +//! 
`metrics` feature is enabled the helpers in this module forward to +//! the `metrics` crate's global recorder; when the feature is off every +//! helper compiles down to a no-op so that call-sites in the matching +//! hot path stay unconditional and allocation-free. +//! +//! # Metrics surface +//! +//! - `orderbook_rejects_total{reason="…"}` — counter, incremented on +//! every rejection that flows through [`record_reject`]. The label +//! value is the [`RejectReason`] [`Display`] string (stable across +//! `0.7.x`). +//! - `orderbook_depth_levels_bid` / `orderbook_depth_levels_ask` — +//! gauges, updated on every book change to reflect the current count +//! of distinct price levels on each side. +//! - `orderbook_trades_total` — counter, incremented exactly once per +//! emitted trade transaction (a `MatchResult` may contain several). +//! +//! # Determinism +//! +//! Metrics emission is **out-of-band**: it does not influence matching, +//! does not allocate on the happy path, and does not cross the +//! determinism boundary. `restore_from_snapshot_package` deliberately +//! does **not** rehydrate metric counters — they are operational only +//! and live for the process lifetime. +//! +//! [`RejectReason`]: crate::orderbook::reject_reason::RejectReason +//! [`Display`]: std::fmt::Display + +use crate::orderbook::reject_reason::RejectReason; + +/// Counter name: total order rejections, labelled by reject reason. +pub const REJECTS_TOTAL: &str = "orderbook_rejects_total"; + +/// Gauge name: current count of distinct bid price levels. +pub const DEPTH_LEVELS_BID: &str = "orderbook_depth_levels_bid"; + +/// Gauge name: current count of distinct ask price levels. +pub const DEPTH_LEVELS_ASK: &str = "orderbook_depth_levels_ask"; + +/// Counter name: monotonic count of every emitted trade transaction. +pub const TRADES_TOTAL: &str = "orderbook_trades_total"; + +/// Record an order rejection. 
+/// +/// Increments `orderbook_rejects_total` by 1 with the +/// `reason="<RejectReason Display string>"` label. Compiles to a no-op when +/// the `metrics` feature is disabled. +#[inline] +#[cfg(feature = "metrics")] +pub fn record_reject(reason: RejectReason) { + let label = reason.to_string(); + metrics::counter!(REJECTS_TOTAL, "reason" => label).increment(1); +} + +/// No-op when the `metrics` feature is disabled. +#[inline] +#[cfg(not(feature = "metrics"))] +pub fn record_reject(_reason: RejectReason) {} + +/// Update the bid / ask depth gauges to the supplied counts. +/// +/// Called from book-change emission paths. Compiles to a no-op when +/// the `metrics` feature is disabled. +#[inline] +#[cfg(feature = "metrics")] +pub fn record_depth(bid_levels: u64, ask_levels: u64) { + // `gauge!` accepts an `f64`; the input is a level count that + // comfortably fits in `f64` precision for any realistic book. + metrics::gauge!(DEPTH_LEVELS_BID).set(bid_levels as f64); + metrics::gauge!(DEPTH_LEVELS_ASK).set(ask_levels as f64); +} + +/// No-op when the `metrics` feature is disabled. +#[inline] +#[cfg(not(feature = "metrics"))] +pub fn record_depth(_bid_levels: u64, _ask_levels: u64) {} + +/// Record `n` newly emitted trade transactions. +/// +/// Called once per `TradeListener` callback with the number of +/// transactions in the underlying `MatchResult`. Compiles to a no-op +/// when the `metrics` feature is disabled. +#[inline] +#[cfg(feature = "metrics")] +pub fn record_trades(n: u64) { + if n == 0 { + return; + } + metrics::counter!(TRADES_TOTAL).increment(n); +} + +/// No-op when the `metrics` feature is disabled. +#[inline] +#[cfg(not(feature = "metrics"))] +pub fn record_trades(_n: u64) {} + +#[cfg(test)] +mod tests { + use super::*; + + /// All four call-sites must compile and run without panicking + /// regardless of feature state. The actual counter behaviour is + /// covered by `tests/metrics/` (feature-gated). 
+ #[test] + fn helpers_are_callable_unconditionally() { + record_reject(RejectReason::KillSwitchActive); + record_reject(RejectReason::Other(7777)); + record_depth(0, 0); + record_depth(3, 5); + record_trades(0); + record_trades(4); + } +} diff --git a/src/orderbook/mod.rs b/src/orderbook/mod.rs index 3df9146..d8e77bc 100644 --- a/src/orderbook/mod.rs +++ b/src/orderbook/mod.rs @@ -38,6 +38,9 @@ pub mod fees; /// Mass cancel operations for bulk order removal. pub mod mass_cancel; +/// Operational Prometheus-style metrics hooks (feature-gated). +pub mod metrics; + /// Order state machine for explicit lifecycle tracking. pub mod order_state; diff --git a/src/orderbook/modifications.rs b/src/orderbook/modifications.rs index 0f81d12..31a1a9e 100644 --- a/src/orderbook/modifications.rs +++ b/src/orderbook/modifications.rs @@ -250,6 +250,11 @@ where } self.cache.invalidate(); + if is_empty { + // Refresh depth gauges now that a level was + // removed during the modification path. + self.record_depth_metric(); + } Ok(result) } else { Ok(None) // Order not found @@ -575,6 +580,10 @@ where // If the level became empty, remove it if empty_level { price_levels.remove(&price); + // Refresh the depth gauges now that a level was + // removed. No-op when the `metrics` feature is + // disabled. + self.record_depth_metric(); } } @@ -660,12 +669,24 @@ where .. 
} => { if visible_quantity.as_u64() % lot != 0 { + self.track_state( + order.id(), + OrderStatus::Rejected { + reason: RejectReason::InvalidQuantity, + }, + ); return Err(OrderBookError::InvalidLotSize { quantity: visible_quantity.as_u64(), lot_size: lot, }); } if hidden_quantity.as_u64() % lot != 0 { + self.track_state( + order.id(), + OrderStatus::Rejected { + reason: RejectReason::InvalidQuantity, + }, + ); return Err(OrderBookError::InvalidLotSize { quantity: hidden_quantity.as_u64(), lot_size: lot, @@ -674,6 +695,12 @@ where } _ => { if order.total_quantity() % lot != 0 { + self.track_state( + order.id(), + OrderStatus::Rejected { + reason: RejectReason::InvalidQuantity, + }, + ); return Err(OrderBookError::InvalidLotSize { quantity: order.total_quantity(), lot_size: lot, @@ -688,6 +715,12 @@ where if let Some(min) = self.min_order_size && qty < min { + self.track_state( + order.id(), + OrderStatus::Rejected { + reason: RejectReason::OrderSizeOutOfRange, + }, + ); return Err(OrderBookError::OrderSizeOutOfRange { quantity: qty, min: Some(min), @@ -697,6 +730,12 @@ where if let Some(max) = self.max_order_size && qty > max { + self.track_state( + order.id(), + OrderStatus::Rejected { + reason: RejectReason::OrderSizeOutOfRange, + }, + ); return Err(OrderBookError::OrderSizeOutOfRange { quantity: qty, min: self.min_order_size, @@ -743,6 +782,7 @@ where reason: CancelReason::InsufficientLiquidity, }, ); + crate::orderbook::metrics::record_reject(RejectReason::InsufficientLiquidity); return Err(OrderBookError::InsufficientLiquidity { side: order.side(), requested: order.total_quantity(), @@ -761,16 +801,18 @@ where order.user_id(), )?; - if !match_result.trades().as_vec().is_empty() - && let Some(ref listener) = self.trade_listener - { - let mut trade_result = TradeResult::with_fees( - self.symbol.clone(), - match_result.clone(), - self.fee_schedule, - ); - trade_result.engine_seq = self.next_engine_seq(); - listener(&trade_result) // emit trade events to listener + 
let trades_emitted = match_result.trades().len() as u64; + if trades_emitted > 0 { + crate::orderbook::metrics::record_trades(trades_emitted); + if let Some(ref listener) = self.trade_listener { + let mut trade_result = TradeResult::with_fees( + self.symbol.clone(), + match_result.clone(), + self.fee_schedule, + ); + trade_result.engine_seq = self.next_engine_seq(); + listener(&trade_result) // emit trade events to listener + } } // Track the incoming order's state based on matching result @@ -790,6 +832,7 @@ where reason: CancelReason::InsufficientLiquidity, }, ); + crate::orderbook::metrics::record_reject(RejectReason::InsufficientLiquidity); return Err(OrderBookError::InsufficientLiquidity { side: order.side(), requested: order.quantity(), // Now uses the trait method @@ -832,6 +875,12 @@ where self.order_locations .insert(unit_order_arc.id(), (price, side)); + // Refresh the depth gauges. The level may be brand-new + // (`get_or_insert` created it) or pre-existing — either + // way the gauge reflects current state. No-op when the + // `metrics` feature is disabled. + self.record_depth_metric(); + // Pre-trade risk hook: register the resting order with // the risk state so per-account counters are updated and // future checks see the new contribution. No-op when no diff --git a/src/orderbook/private.rs b/src/orderbook/private.rs index d13a186..cbc63f5 100644 --- a/src/orderbook/private.rs +++ b/src/orderbook/private.rs @@ -70,6 +70,10 @@ where // Track the order in the user_orders index for efficient user-based cancellation self.track_user_order(order.user_id(), order_id); + // Refresh the operational depth gauges. No-op when the + // `metrics` feature is disabled. + self.record_depth_metric(); + Ok(order) } @@ -124,15 +128,25 @@ where } } - /// Record an order state transition if a tracker is configured. + /// Record an order state transition if a tracker is configured, + /// and emit operational metrics when the transition is a + /// rejection. 
/// - /// This is a no-op when `order_state_tracker` is `None`. + /// Tracker recording is a no-op when `order_state_tracker` is + /// `None`. Metrics emission is unconditional but compiles to a + /// no-op when the `metrics` feature is disabled — see + /// [`crate::orderbook::metrics`]. Hooking the metric here keeps + /// every reject path in the engine on the same single emission + /// point. #[inline] pub(super) fn track_state( &self, order_id: pricelevel::Id, status: super::order_state::OrderStatus, ) { + if let super::order_state::OrderStatus::Rejected { reason } = &status { + super::metrics::record_reject(*reason); + } if let Some(ref tracker) = self.order_state_tracker { tracker.transition(order_id, status); } } diff --git a/tests/metrics/metrics_tests.rs b/tests/metrics/metrics_tests.rs new file mode 100644 index 0000000..4938172 --- /dev/null +++ b/tests/metrics/metrics_tests.rs @@ -0,0 +1,253 @@ +//! Integration tests for the optional Prometheus metrics feature +//! (issue #60). +//! +//! Lives in a dedicated test binary so the global `metrics` recorder +//! is not perturbed by the broader integration suite under +//! `tests/unit/` (which constructs `OrderBook`s and triggers the +//! depth gauge updates as a side effect of every add / cancel). + +use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit}; +use orderbook_rs::orderbook::metrics::{ + DEPTH_LEVELS_ASK, DEPTH_LEVELS_BID, REJECTS_TOTAL, TRADES_TOTAL, +}; +use orderbook_rs::{OrderBook, StubClock}; +use pricelevel::{Id, Side, TimeInForce}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, OnceLock}; + +/// Captured counter / gauge state, keyed by metric name with a +/// `reason=…` suffix when the labels include a reason. +#[derive(Default)] +struct Captured { + counters: HashMap<String, u64>, + gauges: HashMap<String, f64>, +} + +/// Process-wide capture storage. 
The `metrics` crate only allows the +/// global recorder to be installed once per process — every test in +/// this file shares the same recorder and reads from this storage. +fn captured() -> &'static Mutex<Captured> { + static CAPTURED: OnceLock<Mutex<Captured>> = OnceLock::new(); + CAPTURED.get_or_init(|| Mutex::new(Captured::default())) +} + +/// Build a "metric_name{label=value}" key, or just the metric name +/// when there are no labels — matches the format used in assertions. +fn label_key(key: &Key) -> String { + let labels: Vec<String> = key + .labels() + .map(|l| format!("{}={}", l.key(), l.value())) + .collect(); + if labels.is_empty() { + key.name().to_string() + } else { + format!("{}{{{}}}", key.name(), labels.join(",")) + } +} + +struct CapturingCounter { + key: String, +} + +impl metrics::CounterFn for CapturingCounter { + fn increment(&self, value: u64) { + let mut g = captured().lock().expect("captured lock"); + *g.counters.entry(self.key.clone()).or_insert(0) += value; + } + fn absolute(&self, value: u64) { + let mut g = captured().lock().expect("captured lock"); + g.counters.insert(self.key.clone(), value); + } +} + +struct CapturingGauge { + key: String, +} + +impl metrics::GaugeFn for CapturingGauge { + fn increment(&self, value: f64) { + let mut g = captured().lock().expect("captured lock"); + *g.gauges.entry(self.key.clone()).or_insert(0.0) += value; + } + fn decrement(&self, value: f64) { + let mut g = captured().lock().expect("captured lock"); + *g.gauges.entry(self.key.clone()).or_insert(0.0) -= value; + } + fn set(&self, value: f64) { + let mut g = captured().lock().expect("captured lock"); + g.gauges.insert(self.key.clone(), value); + } +} + +struct CapturingHistogram; + +impl metrics::HistogramFn for CapturingHistogram { + fn record(&self, _value: f64) {} +} + +struct CapturingRecorder; + +impl Recorder for CapturingRecorder { + fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {} + fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {} + 
    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+    fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
+        Counter::from_arc(std::sync::Arc::new(CapturingCounter {
+            key: label_key(key),
+        }))
+    }
+    fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
+        Gauge::from_arc(std::sync::Arc::new(CapturingGauge {
+            key: label_key(key),
+        }))
+    }
+    fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram {
+        Histogram::from_arc(std::sync::Arc::new(CapturingHistogram))
+    }
+}
+
+/// Install the global capturing recorder once. Calling this from every
+/// test is idempotent — the second installation attempt is a no-op.
+fn install_recorder() {
+    static INSTALLED: OnceLock<()> = OnceLock::new();
+    INSTALLED.get_or_init(|| {
+        // `set_global_recorder` only succeeds once per process.
+        let _ = metrics::set_global_recorder(CapturingRecorder);
+    });
+}
+
+fn counter_value(key: &str) -> u64 {
+    let g = captured().lock().expect("captured lock");
+    g.counters.get(key).copied().unwrap_or(0)
+}
+
+fn gauge_value(key: &str) -> f64 {
+    let g = captured().lock().expect("captured lock");
+    g.gauges.get(key).copied().unwrap_or(0.0)
+}
+
+/// All tests in this module share the global `metrics` recorder and
+/// the captured-state map. Take this lock at the top of every test
+/// to serialize them — concurrent tests would otherwise step on each
+/// other's gauge values.
+fn serialized_test_lock() -> &'static Mutex<()> {
+    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+    LOCK.get_or_init(|| Mutex::new(()))
+}
+
+#[test]
+fn counters_increment_on_rejects_and_trades() {
+    let _guard = serialized_test_lock().lock().expect("serialized lock");
+    install_recorder();
+    let book = OrderBook::<()>::new("METRICS-TEST");
+
+    // Snapshot baseline counter values — other tests in this module
+    // share the global recorder, so we reason about deltas.
+ let trades_before = counter_value(TRADES_TOTAL); + let kill_rejects_before = + counter_value(&format!("{REJECTS_TOTAL}{{reason=kill switch active}}")); + + // Reject path: engage the kill switch and submit one order. + book.engage_kill_switch(); + let rej = book.add_limit_order(Id::new_uuid(), 100, 1, Side::Buy, TimeInForce::Gtc, None); + assert!(rej.is_err(), "kill-switched add_order must Err"); + book.release_kill_switch(); + + let kill_rejects_after = + counter_value(&format!("{REJECTS_TOTAL}{{reason=kill switch active}}")); + assert_eq!( + kill_rejects_after - kill_rejects_before, + 1, + "kill-switch reject must increment orderbook_rejects_total{{reason=...}} by exactly 1" + ); + + // Happy path: cross two limit orders to print a trade. + book.add_limit_order(Id::new_uuid(), 100, 5, Side::Sell, TimeInForce::Gtc, None) + .expect("seed resting ask"); + book.add_limit_order(Id::new_uuid(), 100, 5, Side::Buy, TimeInForce::Gtc, None) + .expect("aggressive buy fills the ask"); + + let trades_after = counter_value(TRADES_TOTAL); + assert!( + trades_after > trades_before, + "orderbook_trades_total must increment after a fill (before={trades_before}, after={trades_after})" + ); +} + +#[test] +fn depth_gauges_track_distinct_price_levels() { + let _guard = serialized_test_lock().lock().expect("serialized lock"); + install_recorder(); + let book = OrderBook::<()>::new("METRICS-DEPTH"); + + // Place two distinct bid levels and one ask level. 
+ book.add_limit_order(Id::new_uuid(), 100, 1, Side::Buy, TimeInForce::Gtc, None) + .expect("bid 1"); + book.add_limit_order(Id::new_uuid(), 99, 1, Side::Buy, TimeInForce::Gtc, None) + .expect("bid 2"); + let ask_id = Id::new_uuid(); + book.add_limit_order(ask_id, 110, 1, Side::Sell, TimeInForce::Gtc, None) + .expect("ask 1"); + + assert_eq!( + gauge_value(DEPTH_LEVELS_BID) as u64, + 2, + "orderbook_depth_levels_bid must reflect two distinct bid levels" + ); + assert_eq!( + gauge_value(DEPTH_LEVELS_ASK) as u64, + 1, + "orderbook_depth_levels_ask must reflect one ask level" + ); + + // Cancel the unique ask — the ask gauge should go to 0. + book.cancel_order(ask_id).expect("cancel ask"); + + assert_eq!( + gauge_value(DEPTH_LEVELS_ASK) as u64, + 0, + "ask gauge must drop to 0 after the only ask level is removed" + ); +} + +#[test] +fn metrics_do_not_affect_order_semantics() { + // Determinism guard — issue #60 explicitly requires that metric + // emission must NOT alter matching outcomes. Build two books with + // the same symbol and identical inputs and confirm they produce + // byte-identical snapshots after the same operation sequence. + let _guard = serialized_test_lock().lock().expect("serialized lock"); + install_recorder(); + // StubClock + identical symbols + identical order ids yields a + // byte-identical state machine. If metrics emission ever bled + // back into matching, the two snapshots would diverge. + let book_a = OrderBook::<()>::with_clock("DET", Arc::new(StubClock::new())); + let book_b = OrderBook::<()>::with_clock("DET", Arc::new(StubClock::new())); + + let scenarios: [(u128, u64, Side); 6] = [ + (100, 5, Side::Sell), + (101, 3, Side::Sell), + (99, 5, Side::Buy), + (100, 4, Side::Buy), + (102, 2, Side::Sell), + (101, 3, Side::Buy), + ]; + + for (i, (price, qty, side)) in scenarios.into_iter().enumerate() { + // Use a deterministic id derived from the index so the two + // books mint structurally identical resting orders. 
+ let id = Id::from_u64(0xC0DE_0000 + i as u64); + let _ = book_a.add_limit_order(id, price, qty, side, TimeInForce::Gtc, None); + let _ = book_b.add_limit_order(id, price, qty, side, TimeInForce::Gtc, None); + } + + let snap_a = book_a.create_snapshot(10); + let snap_b = book_b.create_snapshot(10); + + let json_a = serde_json::to_string(&snap_a).expect("serialize snap_a"); + let json_b = serde_json::to_string(&snap_b).expect("serialize snap_b"); + assert_eq!( + json_a, json_b, + "metrics emission must not affect book state — snapshots differ" + ); +} diff --git a/tests/metrics/mod.rs b/tests/metrics/mod.rs new file mode 100644 index 0000000..02d4eaa --- /dev/null +++ b/tests/metrics/mod.rs @@ -0,0 +1,7 @@ +//! Standalone integration-test binary for the optional `metrics` +//! feature (issue #60). Lives in its own crate test entry point so +//! the global `metrics` recorder isn't perturbed by the broader +//! integration suite under `tests/unit/`. + +#[cfg(feature = "metrics")] +mod metrics_tests; diff --git a/tests/unit/replay_determinism.rs b/tests/unit/replay_determinism.rs index 897f728..6e53ae0 100644 --- a/tests/unit/replay_determinism.rs +++ b/tests/unit/replay_determinism.rs @@ -4,7 +4,7 @@ //! produces byte-identical execution results and snapshot consistency. #[cfg(feature = "journal")] -mod replay_determinism { +mod inner { use orderbook_rs::orderbook::sequencer::{ InMemoryJournal, Journal, ReplayEngine, SequencerCommand, SequencerEvent, SequencerResult, snapshots_match, @@ -68,7 +68,7 @@ mod replay_determinism { ); } - /// Proptest: random sequence of adds deterministically replays. + // Proptest: random sequence of adds deterministically replays. proptest! { #[test] fn prop_replay_deterministic_across_runs( @@ -77,8 +77,7 @@ mod replay_determinism { let journal: InMemoryJournal<()> = InMemoryJournal::new(); // Build deterministic journal from add_count. 
- let mut seq = 0u64; - for i in 0..add_count { + for (seq, i) in (0..add_count).enumerate() { let id = Id::new_uuid(); let price = 100 + (i as u128 * 10); let order = make_standard_order( @@ -88,13 +87,12 @@ mod replay_determinism { if i % 2 == 0 { Side::Buy } else { Side::Sell }, ); let evt = SequencerEvent { - sequence_num: seq, - timestamp_ns: seq, + sequence_num: seq as u64, + timestamp_ns: seq as u64, command: SequencerCommand::AddOrder(order), result: SequencerResult::OrderAdded { order_id: id }, }; assert!(journal.append(&evt).is_ok()); - seq += 1; } // Replay multiple times.