diff --git a/CHANGELOG.md b/CHANGELOG.md index afd17c8..032e78b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,71 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 installing `metrics-exporter-prometheus` and dumping the exposition payload. +### Added — feature-gated binary wire protocol (#59) + +- **New `wire` feature flag** in `Cargo.toml` plus an optional + dependency on `zerocopy = "0.8"` (with `derive`). Disabled by + default; the crate's existing JSON and bincode paths are + unchanged — the wire protocol is purely additive. +- **Length-prefixed framing** — every frame on the wire is + `[len:u32 LE | kind:u8 | payload]`. `len` covers `kind + payload` + (it does NOT include the 4-byte `len` prefix itself). All + multi-byte integers are little-endian. Implementation in + `src/wire/framing.rs` with `encode_frame` / `decode_frame`. +- **`MessageKind` enum** (`#[repr(u8)]`, `#[non_exhaustive]`) with + stable explicit discriminants documented as stable across + `0.7.x`: + + | Code | Direction | Message | Payload size | + |--------|-----------|-----------------|-------------:| + | `0x01` | inbound | `NewOrder` | 48 B | + | `0x02` | inbound | `CancelOrder` | 24 B | + | `0x03` | inbound | `CancelReplace` | 40 B | + | `0x04` | inbound | `MassCancel` | 24 B | + | `0x81` | outbound | `ExecReport` | 44 B | + | `0x82` | outbound | `TradePrint` | 48 B | + | `0x83` | outbound | `BookUpdate` | 32 B | + +- **Inbound messages** are `#[repr(C, packed)]` and derive the + `zerocopy` traits (`FromBytes`, `IntoBytes`, `Unaligned`, + `Immutable`, `KnownLayout`). Decoding is safe — the crate keeps + `#![deny(unsafe_code)]` on the lib root. Each struct ships a + compile-time `const _: () = assert!(size_of::<…>() == N)` size + guard. Exposed: `NewOrderWire`, `CancelOrderWire`, + `CancelReplaceWire`, `MassCancelWire` and the matching + `decode_*` helpers. +- **Outbound messages** use explicit byte-cursor encoders + (`Vec::extend_from_slice`) rather than packed structs. + Outbound is I/O-dominated so the cost of a few dozen bytes of + field-by-field copy is dwarfed by socket overhead, and the + layout is free to evolve. Exposed: `ExecReport` + + `encode_exec_report` + `status_to_wire`, + `TradePrintWire` + `encode_trade_print`, + `BookUpdateWire` + `encode_book_update`. +- **Wire ↔ domain mapping** at the boundary — + `impl TryFrom<&NewOrderWire> for OrderType<()>` performs the + conversion, copies each packed field into a local first + (taking a reference to a packed field is undefined behaviour), + and returns `WireError::InvalidPayload` on unknown + side / TIF / order_type bytes or a negative price. +- **Errors** routed through a manual-`Display` + `#[non_exhaustive] WireError` (no `thiserror`, matches the + crate's existing manual style for the wire surface): variants + `Truncated`, `UnknownKind(u8)`, `InvalidPayload(&'static str)`. +- **`doc/wire-protocol.md`** with per-message offset / size / + field / type / notes tables, the `MessageKind` discriminant + table, the framing rule, and the LE-endianness statement. +- **Round-trip `proptest` tests** in every + `src/wire/{inbound,outbound}/*.rs` module — encode through the + framer, decode back, assert byte-for-byte equality. +- **Crate-root re-exports** under `#[cfg(feature = "wire")]` — + callers reach types via `orderbook_rs::wire::*`. +- **Example** `examples/src/bin/wire_roundtrip.rs` (gated by + `required-features = ["wire"]`) — builds a `NewOrderWire`, + encodes it through the framer, decodes it back, converts to a + domain `OrderType<()>`, and prints every field via + `tracing::info!`. + ### Added — HDR-histogram tail-latency bench suite (#56) - **Six new bench binaries** under `benches/order_book/*_hdr.rs` that diff --git a/Cargo.toml b/Cargo.toml index 68fbc3c..48edd6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ bincode = { workspace = true, optional = true } crc32fast = { workspace = true, optional = true } memmap2 = { workspace = true, optional = true } metrics = { workspace = true, optional = true } +zerocopy = { version = "0.8", features = ["derive"], optional = true } [features] default = [] @@ -58,6 +59,7 @@ bincode = ["dep:bincode"] journal = ["dep:crc32fast", "dep:memmap2"] alloc-counters = [] metrics = ["dep:metrics"] +wire = ["dep:zerocopy"] [dev-dependencies] criterion = { version = "0.8", features = ["html_reports"] } diff --git a/README.md b/README.md index 8e99024..cf6efe2 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,41 @@ This order book engine is built with the following design principles: regression detection. - **`BENCH.md`** gains an "Allocation profile" section. +#### v0.7.0 — Feature-gated binary wire protocol + +- **New `wire` feature flag** behind which a small, + length-prefixed binary protocol lives — every frame is + `[len:u32 LE | kind:u8 | payload]`, `len` covers + `kind + payload`, and all multi-byte integers are + little-endian. Disabled by default; the existing JSON and + bincode paths are unchanged. The protocol is additive. +- **`MessageKind`** — `#[repr(u8)]` enum with stable explicit + discriminants. Inbound: `NewOrder = 0x01`, + `CancelOrder = 0x02`, `CancelReplace = 0x03`, + `MassCancel = 0x04`. Outbound: `ExecReport = 0x81`, + `TradePrint = 0x82`, `BookUpdate = 0x83`. +- **Zero-copy inbound** — `NewOrderWire`, `CancelOrderWire`, + `CancelReplaceWire`, `MassCancelWire` are + `#[repr(C, packed)]` with `zerocopy::{FromBytes, IntoBytes, + Unaligned, Immutable, KnownLayout}` derives. Each ships a + `const _: () = assert!(size_of::<…>() == N)` guard. Decoding + is safe — `#![deny(unsafe_code)]` stays on. +- **Byte-cursor outbound** — `ExecReport`, `TradePrintWire`, + `BookUpdateWire` are encoded via explicit + `extend_from_slice` calls. Outbound is I/O-dominated; this + keeps the layout free to evolve. +- **`TryFrom<&NewOrderWire> for OrderType<()>`** — boundary + mapping that copies each packed field into a stack local + first (taking a reference to a packed field is UB), validates + the side / TIF / order_type discriminants, and rejects + negative prices via `WireError::InvalidPayload`. +- **`doc/wire-protocol.md`** with per-message layout tables, + discriminant table, framing rule, and endianness statement. +- **Round-trip `proptest` coverage** in every + `src/wire/{inbound,outbound}/*.rs` module. +- Example: `examples/src/bin/wire_roundtrip.rs` + (`required-features = ["wire"]`). + #### v0.7.0 — HDR-histogram tail-latency bench suite - **Six new `*_hdr` bench binaries** under diff --git a/doc/wire-protocol.md b/doc/wire-protocol.md new file mode 100644 index 0000000..80865db --- /dev/null +++ b/doc/wire-protocol.md @@ -0,0 +1,199 @@ +# Binary Wire Protocol (feature `wire`) + +> Status: MVP / additive. JSON and bincode paths are unchanged. Enable +> with `--features wire`. + +The binary wire protocol is a small, fixed-layout, little-endian framing +used by gateways to talk to the engine without going through +`serde_json`. It is intentionally lean — the MVP covers four inbound +order-entry messages and three outbound execution / market-data +messages. A full TCP gateway is out of scope. + +## Framing + +Every frame on the wire has the layout: + +``` ++-------------------+--------+--------------------------+ +| len (u32 LE) | kind | payload | +| 4 B | 1 B | len - 1 B | ++-------------------+--------+--------------------------+ +``` + +- `len` is the byte length of `kind + payload`. **It does NOT include + the 4-byte `len` prefix itself.** The minimum legal `len` is `1` + (kind byte present, zero-byte payload). +- All multi-byte integers on the wire are **little-endian**. +- Frames have no separator and no trailer — the next frame begins + immediately after the previous one. Decoders should advance their + read cursor by the `bytes_consumed` value returned from + [`decode_frame`](../src/wire/framing.rs). + +## `MessageKind` discriminants + +Wire codes are stable across `0.7.x` patch releases. Inbound messages +occupy the low half of the byte (`0x01..=0x7F`); outbound messages +occupy the high half (`0x80..=0xFF`). Code `0x00` is reserved as a +"no-message" sentinel. + +| Code | Direction | Message | Fixed payload size | +|--------|-----------|-----------------|-------------------:| +| `0x01` | inbound | `NewOrder` | 48 B | +| `0x02` | inbound | `CancelOrder` | 24 B | +| `0x03` | inbound | `CancelReplace` | 40 B | +| `0x04` | inbound | `MassCancel` | 24 B | +| `0x81` | outbound | `ExecReport` | 44 B | +| `0x82` | outbound | `TradePrint` | 48 B | +| `0x83` | outbound | `BookUpdate` | 32 B | + +## Inbound layouts + +Inbound messages are `#[repr(C, packed)]` and derive +`zerocopy::{FromBytes, IntoBytes, Unaligned, Immutable, KnownLayout}`, +so the gateway can validate-and-cast `&[u8]` into a typed reference +without copying. Decoding is safe — `zerocopy` performs the layout +validation, no `unsafe` is required at any wire call site. + +### `NewOrder` (`0x01`) — 48 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|-----------------|------|--------------------------------------| +| 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +| 8 | 8 | `order_id` | u64 | unique order id | +| 16 | 8 | `account_id` | u64 | numeric account id | +| 24 | 8 | `price` | i64 | tick-scaled limit price | +| 32 | 8 | `qty` | u64 | quantity | +| 40 | 1 | `side` | u8 | `0` Buy, `1` Sell | +| 41 | 1 | `time_in_force` | u8 | `0` GTC, `1` IOC, `2` FOK, `3` DAY | +| 42 | 1 | `order_type` | u8 | `0` Standard (only one in MVP) | +| 43 | 5 | `_pad` | u8×5 | reserved, must be zero | +| **48** | | **total** | | | + +`TryFrom<&NewOrderWire> for OrderType<()>` performs the wire → domain +conversion. `account_id` is encoded into the low 8 bytes of the +domain `Hash32` `user_id` so the field round-trips across the +boundary; gateways performing STP must use a non-zero `account_id`. + +### `CancelOrder` (`0x02`) — 24 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|--------------|------|----------------------------| +| 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +| 8 | 8 | `order_id` | u64 | order id to cancel | +| 16 | 8 | `account_id` | u64 | numeric account id | +| **24** | | **total** | | | + +### `CancelReplace` (`0x03`) — 40 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|--------------|------|-----------------------------| +| 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +| 8 | 8 | `order_id` | u64 | original order id | +| 16 | 8 | `account_id` | u64 | numeric account id | +| 24 | 8 | `new_price` | i64 | replacement limit price | +| 32 | 8 | `new_qty` | u64 | replacement quantity | +| **40** | | **total** | | | + +### `MassCancel` (`0x04`) — 24 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|--------------|------|--------------------------------------| +| 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +| 8 | 8 | `account_id` | u64 | numeric account id | +| 16 | 1 | `scope` | u8 | `0` All, `1` ByAccount, `2` BySide | +| 17 | 7 | `_pad` | u8×7 | for `BySide`, `_pad[0] & 1` = side | +| **24** | | **total** | | | + +For `scope == BySide`, the low bit of `_pad[0]` encodes the side +(`0` = Buy, `1` = Sell). Other padding bits must be zero. + +## Outbound layouts + +Outbound messages use byte-cursor encoders rather than packed structs. +Outbound is I/O-dominated, so the cost of a few dozen bytes of explicit +field-by-field copying into a `Vec` is dwarfed by socket overhead, +and the layout stays free to evolve without exposing a packed type to +callers. + +### `ExecReport` (`0x81`) — 44 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|------------------|------|----------------------------------| +| 0 | 8 | `engine_seq` | u64 | global engine sequence | +| 8 | 8 | `order_id` | u64 | order id | +| 16 | 1 | `status` | u8 | see `STATUS_*` constants below | +| 17 | 8 | `filled_qty` | u64 | cumulative filled quantity | +| 25 | 8 | `remaining_qty` | u64 | quantity still resting | +| 33 | 8 | `price` | i64 | tick-scaled price | +| 41 | 2 | `reject_reason` | u16 | reject code, `0` if not rejected | +| 43 | 1 | `_pad` | u8 | reserved, must be zero | +| **44** | | **total** | | | + +`status` discriminants (mirror of `OrderStatus`): + +| Code | `OrderStatus` | +|-----:|---------------------| +| 0 | `Open` | +| 1 | `PartiallyFilled` | +| 2 | `Filled` | +| 3 | `Cancelled` | +| 4 | `Rejected` | + +The `reject_reason` field carries the `RejectReason` numeric code +(stable across `0.7.x`); see `src/orderbook/reject_reason.rs`. + +### `TradePrint` (`0x82`) — 48 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|---------------|------|------------------------------| +| 0 | 8 | `engine_seq` | u64 | global engine sequence | +| 8 | 8 | `maker_id` | u64 | maker order id (resting) | +| 16 | 8 | `taker_id` | u64 | taker order id (incoming) | +| 24 | 8 | `price` | i64 | tick-scaled fill price | +| 32 | 8 | `qty` | u64 | matched quantity | +| 40 | 8 | `ts` | u64 | engine timestamp (ms) | +| **48** | | **total** | | | + +### `BookUpdate` (`0x83`) — 32 B + +| Offset | Size | Field | Type | Notes | +|-------:|-----:|--------------|------|------------------------------------| +| 0 | 8 | `engine_seq` | u64 | global engine sequence | +| 8 | 1 | `side` | u8 | `0` Buy, `1` Sell | +| 9 | 8 | `price` | i64 | tick-scaled level price | +| 17 | 8 | `qty` | u64 | new total quantity at level (`0` = wiped) | +| 25 | 7 | `_pad` | u8×7 | reserved, must be zero | +| **32** | | **total** | | (rounded to 32 B; trailing pad) | + +The trailing 7-byte pad rounds the message to a comfortable 32 B block +and leaves room for forward-compatible field additions without bumping +the wire code. + +## Endianness + +All multi-byte integers are little-endian. The packed inbound structs +use native-endian primitives, so their memory layout matches the +on-wire byte order only on little-endian targets — accordingly, +`feature = "wire"` is currently restricted to little-endian platforms +via a `compile_error!` in `src/wire/inbound/mod.rs`. Big-endian +support would require switching the packed inbound fields to +endian-aware types (e.g. `zerocopy::little_endian::*`) and is not +implemented in `0.7.x`. + +## Round-trip tests + +Every inbound and outbound message has a `proptest` round-trip test +that builds a representative shape, encodes through the framer, and +decodes back. See: + +- `src/wire/inbound/new_order.rs` +- `src/wire/inbound/cancel.rs` +- `src/wire/inbound/cancel_replace.rs` +- `src/wire/inbound/mass_cancel.rs` +- `src/wire/outbound/exec_report.rs` +- `src/wire/outbound/trade_print.rs` +- `src/wire/outbound/book_update.rs` + +A runnable end-to-end demo lives in +`examples/src/bin/wire_roundtrip.rs` (gated on +`required-features = ["wire"]`). diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ead7c1a..8809364 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" default = [] special_orders = ["orderbook-rs/special_orders"] metrics = ["orderbook-rs/metrics", "dep:metrics", "dep:metrics-exporter-prometheus"] +wire = ["orderbook-rs/wire"] [dependencies] orderbook-rs = { workspace = true } @@ -26,3 +27,7 @@ required-features = ["special_orders"] [[bin]] name = "prometheus_export" required-features = ["metrics"] + +[[bin]] +name = "wire_roundtrip" +required-features = ["wire"] diff --git a/examples/src/bin/wire_roundtrip.rs b/examples/src/bin/wire_roundtrip.rs new file mode 100644 index 0000000..e9f8835 --- /dev/null +++ b/examples/src/bin/wire_roundtrip.rs @@ -0,0 +1,99 @@ +// examples/src/bin/wire_roundtrip.rs +// +// Demonstrates encoding and decoding of the binary wire protocol (issue #59). +// +// 1. Build a `NewOrderWire` with realistic field values. +// 2. Encode it inside a length-prefixed frame via `encode_frame`. +// 3. Decode the frame, validate the kind byte, and decode the payload. +// 4. Convert the decoded `NewOrderWire` into a domain `OrderType<()>` and +// print every field via `tracing::info!`. + +use orderbook_rs::wire::{ + MessageKind, NewOrderWire, decode_frame, decode_new_order, encode_frame, + inbound::new_order::{ORDER_TYPE_STANDARD, SIDE_BUY, TIF_GTC}, +}; +use pricelevel::{OrderType, setup_logger}; +use tracing::info; + +fn main() { + let _ = setup_logger(); + info!("Wire roundtrip example"); + + let original = NewOrderWire { + client_ts: 1_716_000_000_000, + order_id: 4242, + account_id: 7, + price: 100_500, + qty: 25, + side: SIDE_BUY, + time_in_force: TIF_GTC, + order_type: ORDER_TYPE_STANDARD, + _pad: [0u8; 5], + }; + + // 1. Encode the message into a length-prefixed frame. + let mut frame = Vec::new(); + encode_payload(&original, &mut frame); + + info!( + bytes = frame.len(), + "encoded NewOrder frame ({} bytes)", + frame.len() + ); + + // 2. Decode the frame back. + let (kind_byte, payload, consumed) = decode_frame(&frame).expect("decode frame"); + let kind = MessageKind::from_u8(kind_byte).expect("known kind"); + info!( + kind = ?kind, + kind_byte = format!("0x{kind_byte:02x}"), + consumed, + "decoded frame header" + ); + assert_eq!(kind, MessageKind::NewOrder); + assert_eq!(consumed, frame.len()); + + let decoded = decode_new_order(payload).expect("decode NewOrder payload"); + + // 3. Mirror packed fields into stack locals (taking a reference to a + // packed field is undefined behaviour). + let ts = { decoded.client_ts }; + let oid = { decoded.order_id }; + let acct = { decoded.account_id }; + let px = { decoded.price }; + let qty = { decoded.qty }; + info!( + ts, + oid, acct, px, qty, "decoded NewOrderWire fields (round-trip OK)" + ); + + // 4. Convert wire → domain. + let domain: OrderType<()> = (&decoded).try_into().expect("convert to OrderType"); + match domain { + OrderType::Standard { + price, + quantity, + side, + time_in_force, + .. + } => { + info!( + price = %price, + quantity = %quantity, + side = %side, + time_in_force = %time_in_force, + "domain OrderType::Standard built from wire" + ); + } + _ => { + tracing::error!("expected Standard variant from MVP wire decoder"); + } + } + + info!("Wire roundtrip example complete"); +} + +fn encode_payload(order: &NewOrderWire, out: &mut Vec) { + encode_frame(MessageKind::NewOrder.as_u8(), order.as_payload_bytes(), out) + .expect("encode_frame should not fail on Vec"); +} diff --git a/src/lib.rs b/src/lib.rs index c7e8a1d..98c54f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,6 +84,42 @@ //! demonstrates installing the `metrics-exporter-prometheus` //! recorder and dumping the exposition payload. //! +//! ### v0.7.0 — Feature-gated binary wire protocol +//! +//! - **New `wire` feature flag** behind which a small, +//! length-prefixed binary protocol lives — every frame is +//! `[len:u32 LE | kind:u8 | payload]`, `len` covers +//! `kind + payload`, and all multi-byte integers are +//! little-endian. Disabled by default; the existing JSON and +//! bincode paths are unchanged. The protocol is additive. +//! - **`MessageKind`** — `#[repr(u8)]` enum with stable explicit +//! discriminants. Inbound: `NewOrder = 0x01`, +//! `CancelOrder = 0x02`, `CancelReplace = 0x03`, +//! `MassCancel = 0x04`. Outbound: `ExecReport = 0x81`, +//! `TradePrint = 0x82`, `BookUpdate = 0x83`. +//! - **Zero-copy inbound** — `NewOrderWire`, `CancelOrderWire`, +//! `CancelReplaceWire`, `MassCancelWire` are +//! `#[repr(C, packed)]` with `zerocopy::{FromBytes, IntoBytes, +//! Unaligned, Immutable, KnownLayout}` derives. Each ships a +//! `const _: () = assert!(size_of::<…>() == N)` guard. Decoding +//! is safe — `zerocopy` performs the layout validation, no +//! `unsafe` is required at any wire call site. +//! - **Byte-cursor outbound** — `ExecReport`, `TradePrintWire`, +//! `BookUpdateWire` are encoded via explicit +//! `extend_from_slice` calls. Outbound is I/O-dominated; this +//! keeps the layout free to evolve. +//! - **`TryFrom<&NewOrderWire> for OrderType<()>`** — boundary +//! mapping that copies each packed field into a stack local +//! first (taking a reference to a packed field is UB), validates +//! the side / TIF / order_type discriminants, and rejects +//! negative prices via `WireError::InvalidPayload`. +//! - **`doc/wire-protocol.md`** with per-message layout tables, +//! discriminant table, framing rule, and endianness statement. +//! - **Round-trip `proptest` coverage** in every +//! `src/wire/{inbound,outbound}/*.rs` module. +//! - Example: `examples/src/bin/wire_roundtrip.rs` +//! (`required-features = ["wire"]`). +//! //! ### v0.7.0 — HDR-histogram tail-latency bench suite //! //! - **Six new `*_hdr` bench binaries** under @@ -447,6 +483,19 @@ pub mod orderbook; pub mod prelude; pub mod utils; +/// Feature-gated binary wire protocol. +/// +/// Length-prefixed `[len:u32 LE | kind:u8 | payload]` framing with +/// fixed-size, little-endian payloads. Inbound messages are +/// `#[repr(C, packed)]` and decode via `zerocopy`; outbound messages +/// use explicit byte-cursor encoders. See `doc/wire-protocol.md` for +/// the canonical layout tables. +/// +/// Enabled with `--features wire`. The protocol is additive — JSON and +/// bincode paths are unchanged. +#[cfg(feature = "wire")] +pub mod wire; + #[cfg(feature = "bincode")] pub use orderbook::BincodeEventSerializer; #[cfg(feature = "journal")] diff --git a/src/wire/error.rs b/src/wire/error.rs new file mode 100644 index 0000000..36ffb14 --- /dev/null +++ b/src/wire/error.rs @@ -0,0 +1,42 @@ +//! Errors raised by the binary wire protocol codec. +//! +//! Manual `Display` implementation to avoid pulling in `thiserror` for the +//! `wire` feature surface — keeps the dependency footprint minimal. + +/// Errors that can be raised when framing, decoding, or validating a binary +/// wire message. +/// +/// `WireError` is `#[non_exhaustive]` — additional variants may be added in +/// future minor releases without a breaking change. +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum WireError { + /// The buffer is shorter than the framing or payload length declares. + Truncated, + /// The frame's `kind` byte does not map to a known [`MessageKind`]. + /// + /// The raw byte is preserved for telemetry and rejection reporting. + /// + /// [`MessageKind`]: super::MessageKind + UnknownKind(u8), + /// The payload's length does not match the fixed size required by the + /// declared `MessageKind`, or a packed field carries an invalid + /// discriminant. + /// + /// The static string is a stable, tracing-friendly description of the + /// failure site (e.g. `"NewOrder: payload size mismatch"`). + InvalidPayload(&'static str), +} + +impl std::fmt::Display for WireError { + #[cold] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WireError::Truncated => f.write_str("wire frame truncated"), + WireError::UnknownKind(byte) => write!(f, "wire unknown kind: 0x{byte:02x}"), + WireError::InvalidPayload(reason) => write!(f, "wire invalid payload: {reason}"), + } + } +} + +impl std::error::Error for WireError {} diff --git a/src/wire/framing.rs b/src/wire/framing.rs new file mode 100644 index 0000000..c886b65 --- /dev/null +++ b/src/wire/framing.rs @@ -0,0 +1,169 @@ +//! Length-prefixed binary frame codec. +//! +//! Every frame on the wire has the layout: +//! +//! ```text +//! [len: u32 LE][kind: u8][payload: …] +//! ``` +//! +//! `len` is the byte length of `kind + payload` — it does NOT include the +//! `len` field itself. All multi-byte integers on the wire are little-endian. +//! +//! Framing is symmetric for inbound and outbound traffic; the only thing that +//! differs is which `kind` discriminants are valid in each direction (see +//! [`super::MessageKind`]). + +use super::error::WireError; +use std::io::{self, Write}; + +/// Size in bytes of the length prefix. +const LEN_PREFIX: usize = 4; +/// Size in bytes of the kind byte. +const KIND_SIZE: usize = 1; +/// Minimum frame size: a `len` prefix plus a single `kind` byte (zero-byte +/// payload). +const MIN_FRAME_SIZE: usize = LEN_PREFIX + KIND_SIZE; + +/// Encodes a frame into `out`. +/// +/// Writes `len` (4 bytes, little-endian, value `1 + payload.len()`), the +/// `kind` byte, and the payload, in that order. +/// +/// # Errors +/// +/// Propagates any [`io::Error`] returned by the underlying writer, and +/// returns [`io::ErrorKind::InvalidInput`] when `kind + payload` does not +/// fit in the wire-format `u32` length prefix — guarantees the declared +/// frame length always matches the bytes written. +/// +/// # Panics +/// +/// Does not panic. +#[inline] +pub fn encode_frame(kind: u8, payload: &[u8], out: &mut W) -> io::Result<()> { + // `len` is the size of `kind + payload`. Reject payloads whose encoded + // body length cannot be represented in the wire-format `u32` prefix so we + // never emit a frame whose declared length disagrees with the bytes + // written. + let body_len_usize = payload + .len() + .checked_add(KIND_SIZE) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "frame payload too large"))?; + let body_len = u32::try_from(body_len_usize) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame payload too large"))?; + out.write_all(&body_len.to_le_bytes())?; + out.write_all(&[kind])?; + out.write_all(payload)?; + Ok(()) +} + +/// Decodes a single frame from the start of `buf`. +/// +/// On success returns `(kind, payload, bytes_consumed)`. `bytes_consumed` +/// includes the `len` prefix and the `kind` byte, so callers can advance +/// their read cursor by exactly that many bytes. +/// +/// # Errors +/// +/// Returns [`WireError::Truncated`] if `buf` is shorter than the framing +/// header or shorter than the body length declared by the header. +#[inline] +pub fn decode_frame(buf: &[u8]) -> Result<(u8, &[u8], usize), WireError> { + if buf.len() < MIN_FRAME_SIZE { + return Err(WireError::Truncated); + } + // SAFETY-style note: the bounds check above guarantees `buf[..4]` and + // `buf[4]` are in bounds. We avoid `[..]` indexing in production by + // using `get` everywhere; clippy::indexing_slicing is treated as a hard + // rule in this crate. + let len_bytes = buf.get(..LEN_PREFIX).ok_or(WireError::Truncated)?; + let mut len_arr = [0u8; LEN_PREFIX]; + len_arr.copy_from_slice(len_bytes); + let body_len = u32::from_le_bytes(len_arr) as usize; + + if body_len < KIND_SIZE { + return Err(WireError::InvalidPayload("frame body shorter than kind")); + } + + let total = LEN_PREFIX + .checked_add(body_len) + .ok_or(WireError::Truncated)?; + if buf.len() < total { + return Err(WireError::Truncated); + } + + let kind = *buf.get(LEN_PREFIX).ok_or(WireError::Truncated)?; + let payload_start = LEN_PREFIX + KIND_SIZE; + let payload_end = LEN_PREFIX + body_len; + let payload = buf + .get(payload_start..payload_end) + .ok_or(WireError::Truncated)?; + Ok((kind, payload, total)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_empty_payload() { + let mut buf = Vec::new(); + encode_frame(0x01, &[], &mut buf).expect("encode empty payload"); + let (kind, payload, consumed) = decode_frame(&buf).expect("decode empty payload"); + assert_eq!(kind, 0x01); + assert!(payload.is_empty()); + assert_eq!(consumed, buf.len()); + } + + #[test] + fn roundtrip_with_payload() { + let mut buf = Vec::new(); + let payload = [1u8, 2, 3, 4, 5]; + encode_frame(0x42, &payload, &mut buf).expect("encode payload"); + let (kind, decoded, consumed) = decode_frame(&buf).expect("decode payload"); + assert_eq!(kind, 0x42); + assert_eq!(decoded, &payload); + assert_eq!(consumed, buf.len()); + } + + #[test] + fn truncated_header_returns_truncated() { + // Only 3 bytes — shorter than the 5-byte minimum frame. + let buf = [0x05, 0x00, 0x00]; + assert_eq!(decode_frame(&buf), Err(WireError::Truncated)); + } + + #[test] + fn truncated_payload_returns_truncated() { + // Body length declares 10 bytes but we only have the 5-byte header. + let buf = [0x0A, 0x00, 0x00, 0x00, 0x01]; + assert_eq!(decode_frame(&buf), Err(WireError::Truncated)); + } + + #[test] + fn zero_body_length_is_invalid() { + // `len = 0` means there isn't even a kind byte — protocol violation. + let buf = [0x00, 0x00, 0x00, 0x00, 0x00]; + assert!(matches!( + decode_frame(&buf), + Err(WireError::InvalidPayload(_)) + )); + } + + #[test] + fn decode_consumes_only_one_frame_at_a_time() { + let mut buf = Vec::new(); + encode_frame(0x01, &[0xAA, 0xBB], &mut buf).expect("encode frame 1"); + encode_frame(0x02, &[0xCC], &mut buf).expect("encode frame 2"); + + let (k1, p1, used1) = decode_frame(&buf).expect("decode frame 1"); + assert_eq!(k1, 0x01); + assert_eq!(p1, &[0xAA, 0xBB]); + + let rest = buf.get(used1..).expect("rest of buffer"); + let (k2, p2, used2) = decode_frame(rest).expect("decode frame 2"); + assert_eq!(k2, 0x02); + assert_eq!(p2, &[0xCC]); + assert_eq!(used1 + used2, buf.len()); + } +} diff --git a/src/wire/inbound/cancel.rs b/src/wire/inbound/cancel.rs new file mode 100644 index 0000000..bb21ce6 --- /dev/null +++ b/src/wire/inbound/cancel.rs @@ -0,0 +1,94 @@ +//! `CancelOrder` inbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; + +/// Inbound `CancelOrder` packed wire layout. +/// +/// All fields are little-endian primitives. Total size: **24 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|--------------|------|----------------------------| +/// | 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +/// | 8 | 8 | `order_id` | u64 | order id to cancel | +/// | 16 | 8 | `account_id` | u64 | numeric account id | +#[derive( + Debug, Clone, Copy, PartialEq, Eq, FromBytes, IntoBytes, Unaligned, Immutable, KnownLayout, +)] +#[repr(C, packed)] +pub struct CancelOrderWire { + /// Client-supplied timestamp (milliseconds since the Unix epoch). + pub client_ts: u64, + /// Order id to cancel. + pub order_id: u64, + /// Numeric account identifier supplied by the client. + pub account_id: u64, +} + +const _: () = assert!(core::mem::size_of::() == 24); + +impl CancelOrderWire { + /// Returns the packed byte representation of `self`. + #[must_use] + #[inline] + pub fn as_payload_bytes(&self) -> &[u8] { + ::as_bytes(self) + } +} + +/// Decodes a `CancelOrder` payload (24 bytes). +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// 24 bytes. +#[inline] +pub fn decode_cancel_order(payload: &[u8]) -> Result { + let view = CancelOrderWire::ref_from_bytes(payload) + .map_err(|_| WireError::InvalidPayload("CancelOrder: payload size mismatch"))?; + Ok(*view) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + use zerocopy::IntoBytes; + + proptest! { + #[test] + fn roundtrip_through_frame( + client_ts in any::(), + order_id in any::(), + account_id in any::(), + ) { + let original = CancelOrderWire { client_ts, order_id, account_id }; + let mut framed = Vec::new(); + encode_frame(0x02, original.as_bytes(), &mut framed).expect("encode_frame"); + + let (kind, payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x02u8); + let decoded = decode_cancel_order(payload).expect("decode_cancel_order"); + prop_assert_eq!({ decoded.client_ts }, client_ts); + prop_assert_eq!({ decoded.order_id }, order_id); + prop_assert_eq!({ decoded.account_id }, account_id); + } + } + + #[test] + fn rejects_wrong_size() { + let buf = [0u8; 23]; + assert!(matches!( + decode_cancel_order(&buf), + Err(WireError::InvalidPayload(_)) + )); + let buf = [0u8; 25]; + assert!(matches!( + decode_cancel_order(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +} diff --git a/src/wire/inbound/cancel_replace.rs b/src/wire/inbound/cancel_replace.rs new file mode 100644 index 0000000..160a54d --- /dev/null +++ b/src/wire/inbound/cancel_replace.rs @@ -0,0 +1,105 @@ +//! `CancelReplace` inbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; + +/// Inbound `CancelReplace` packed wire layout. +/// +/// All fields are little-endian primitives. Total size: **40 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|--------------|------|-----------------------------| +/// | 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +/// | 8 | 8 | `order_id` | u64 | original order id | +/// | 16 | 8 | `account_id` | u64 | numeric account id | +/// | 24 | 8 | `new_price` | i64 | replacement limit price | +/// | 32 | 8 | `new_qty` | u64 | replacement quantity | +#[derive( + Debug, Clone, Copy, PartialEq, Eq, FromBytes, IntoBytes, Unaligned, Immutable, KnownLayout, +)] +#[repr(C, packed)] +pub struct CancelReplaceWire { + /// Client-supplied timestamp (milliseconds since the Unix epoch). + pub client_ts: u64, + /// Original order id to replace. + pub order_id: u64, + /// Numeric account identifier supplied by the client. + pub account_id: u64, + /// Replacement limit price (tick-scaled). + pub new_price: i64, + /// Replacement quantity. + pub new_qty: u64, +} + +const _: () = assert!(core::mem::size_of::() == 40); + +impl CancelReplaceWire { + /// Returns the packed byte representation of `self`. + #[must_use] + #[inline] + pub fn as_payload_bytes(&self) -> &[u8] { + ::as_bytes(self) + } +} + +/// Decodes a `CancelReplace` payload (40 bytes). +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// 40 bytes. +#[inline] +pub fn decode_cancel_replace(payload: &[u8]) -> Result { + let view = CancelReplaceWire::ref_from_bytes(payload) + .map_err(|_| WireError::InvalidPayload("CancelReplace: payload size mismatch"))?; + Ok(*view) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + use zerocopy::IntoBytes; + + proptest! { + #[test] + fn roundtrip_through_frame( + client_ts in any::(), + order_id in any::(), + account_id in any::(), + new_price in any::(), + new_qty in any::(), + ) { + let original = CancelReplaceWire { + client_ts, + order_id, + account_id, + new_price, + new_qty, + }; + let mut framed = Vec::new(); + encode_frame(0x03, original.as_bytes(), &mut framed).expect("encode_frame"); + + let (kind, payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x03u8); + let decoded = decode_cancel_replace(payload).expect("decode_cancel_replace"); + prop_assert_eq!({ decoded.client_ts }, client_ts); + prop_assert_eq!({ decoded.order_id }, order_id); + prop_assert_eq!({ decoded.account_id }, account_id); + prop_assert_eq!({ decoded.new_price }, new_price); + prop_assert_eq!({ decoded.new_qty }, new_qty); + } + } + + #[test] + fn rejects_wrong_size() { + let buf = [0u8; 39]; + assert!(matches!( + decode_cancel_replace(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +} diff --git a/src/wire/inbound/mass_cancel.rs b/src/wire/inbound/mass_cancel.rs new file mode 100644 index 0000000..3a32cc8 --- /dev/null +++ b/src/wire/inbound/mass_cancel.rs @@ -0,0 +1,159 @@ +//! `MassCancel` inbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; + +/// Cancel-all scope. Wire code `0x00`. +pub const SCOPE_ALL: u8 = 0; +/// Cancel by account scope. Wire code `0x01`. +pub const SCOPE_BY_ACCOUNT: u8 = 1; +/// Cancel by side scope. Wire code `0x02`. The side itself is encoded in the +/// low bit of `_pad[0]` — `0` = Buy, `1` = Sell. +pub const SCOPE_BY_SIDE: u8 = 2; + +/// Inbound `MassCancel` packed wire layout. +/// +/// All fields are little-endian primitives. Total size: **24 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|--------------|--------|------------------------------------| +/// | 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +/// | 8 | 8 | `account_id` | u64 | numeric account id | +/// | 16 | 1 | `scope` | u8 | `0` All, `1` ByAccount, `2` BySide | +/// | 17 | 7 | `_pad` | u8×7 | for `BySide`, `_pad[0] & 1` = side | +#[derive( + Debug, Clone, Copy, PartialEq, Eq, FromBytes, IntoBytes, Unaligned, Immutable, KnownLayout, +)] +#[repr(C, packed)] +pub struct MassCancelWire { + /// Client-supplied timestamp (milliseconds since the Unix epoch). + pub client_ts: u64, + /// Numeric account identifier supplied by the client. + pub account_id: u64, + /// Cancellation scope: `0` All, `1` ByAccount, `2` BySide. + pub scope: u8, + /// Reserved padding. For `scope == BySide`, the low bit of `_pad[0]` + /// encodes the side (`0` = Buy, `1` = Sell). Other bits must be zero. + pub _pad: [u8; 7], +} + +const _: () = assert!(core::mem::size_of::() == 24); + +impl MassCancelWire { + /// Returns the packed byte representation of `self`. + #[must_use] + #[inline] + pub fn as_payload_bytes(&self) -> &[u8] { + ::as_bytes(self) + } +} + +/// Decodes a `MassCancel` payload (24 bytes). +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// 24 bytes, when the `scope` byte is outside the documented range, or when +/// reserved padding bits are non-zero (for `BySide` only the low bit of +/// `_pad[0]` is allowed). +#[inline] +pub fn decode_mass_cancel(payload: &[u8]) -> Result { + let view = MassCancelWire::ref_from_bytes(payload) + .map_err(|_| WireError::InvalidPayload("MassCancel: payload size mismatch"))?; + let scope = { view.scope }; + let pad = { view._pad }; + match scope { + SCOPE_ALL | SCOPE_BY_ACCOUNT => { + if pad.iter().any(|&byte| byte != 0) { + return Err(WireError::InvalidPayload( + "MassCancel: non-zero reserved padding", + )); + } + } + SCOPE_BY_SIDE => { + // Only the low bit of `_pad[0]` carries the side; every other + // padding bit must be zero. + let head = *pad.first().ok_or(WireError::Truncated)?; + if head & !1 != 0 { + return Err(WireError::InvalidPayload( + "MassCancel: reserved bits set in BySide pad[0]", + )); + } + if pad.iter().skip(1).any(|&byte| byte != 0) { + return Err(WireError::InvalidPayload( + "MassCancel: non-zero reserved padding", + )); + } + } + _ => return Err(WireError::InvalidPayload("MassCancel: unknown scope")), + } + Ok(*view) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + use zerocopy::IntoBytes; + + proptest! { + #[test] + fn roundtrip_through_frame( + client_ts in any::(), + account_id in any::(), + scope in 0u8..=2u8, + side_bit in 0u8..=1u8, + ) { + let mut pad = [0u8; 7]; + if scope == SCOPE_BY_SIDE + && let Some(slot) = pad.get_mut(0) + { + *slot = side_bit; + } + let original = MassCancelWire { + client_ts, + account_id, + scope, + _pad: pad, + }; + + let mut framed = Vec::new(); + encode_frame(0x04, original.as_bytes(), &mut framed).expect("encode_frame"); + + let (kind, payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x04u8); + let decoded = decode_mass_cancel(payload).expect("decode_mass_cancel"); + prop_assert_eq!({ decoded.client_ts }, client_ts); + prop_assert_eq!({ decoded.account_id }, account_id); + prop_assert_eq!({ decoded.scope }, scope); + prop_assert_eq!({ decoded._pad }, pad); + } + } + + #[test] + fn rejects_unknown_scope() { + let bad = MassCancelWire { + client_ts: 0, + account_id: 0, + scope: 9, + _pad: [0u8; 7], + }; + let bytes = bad.as_bytes(); + assert!(matches!( + decode_mass_cancel(bytes), + Err(WireError::InvalidPayload(_)) + )); + } + + #[test] + fn rejects_wrong_size() { + let buf = [0u8; 23]; + assert!(matches!( + decode_mass_cancel(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +} diff --git a/src/wire/inbound/mod.rs b/src/wire/inbound/mod.rs new file mode 100644 index 0000000..424164f --- /dev/null +++ b/src/wire/inbound/mod.rs @@ -0,0 +1,33 @@ +//! Inbound (gateway → engine) wire messages. +//! +//! Each message is a fixed-size, `#[repr(C, packed)]` struct that derives +//! the `zerocopy` traits needed to validate-and-cast `&[u8]` into a typed +//! reference without copying. The decoder helpers (`decode_*`) verify the +//! payload length and return an owned, packed copy of the wire struct. +//! +//! All fields are little-endian primitives — the packed struct memory +//! layout matches the on-wire byte order only on little-endian targets. +//! See `doc/wire-protocol.md` for the canonical layout tables. + +#[cfg(target_endian = "big")] +compile_error!( + "feature `wire` requires a little-endian target; the inbound zerocopy \ + structs are interpreted directly from protocol bytes and would decode \ + incorrectly on big-endian platforms. Use endian-aware field types \ + before enabling this feature on big-endian hosts." +); + +pub mod cancel; +pub mod cancel_replace; +pub mod mass_cancel; +pub mod new_order; + +pub use cancel::{CancelOrderWire, decode_cancel_order}; +pub use cancel_replace::{CancelReplaceWire, decode_cancel_replace}; +pub use mass_cancel::{ + MassCancelWire, SCOPE_ALL, SCOPE_BY_ACCOUNT, SCOPE_BY_SIDE, decode_mass_cancel, +}; +pub use new_order::{ + NewOrderWire, ORDER_TYPE_STANDARD, SIDE_BUY, SIDE_SELL, TIF_DAY, TIF_FOK, TIF_GTC, TIF_IOC, + decode_new_order, +}; diff --git a/src/wire/inbound/new_order.rs b/src/wire/inbound/new_order.rs new file mode 100644 index 0000000..2cc34e6 --- /dev/null +++ b/src/wire/inbound/new_order.rs @@ -0,0 +1,294 @@ +//! `NewOrder` inbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; +use pricelevel::{Hash32, Id, OrderType, Price, Quantity, Side, TimeInForce, TimestampMs}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; + +/// Wire codes for the `side` field of [`NewOrderWire`]. +pub const SIDE_BUY: u8 = 0; +/// Wire codes for the `side` field of [`NewOrderWire`]. +pub const SIDE_SELL: u8 = 1; + +/// Wire codes for the `time_in_force` field. +pub const TIF_GTC: u8 = 0; +/// Wire codes for the `time_in_force` field. +pub const TIF_IOC: u8 = 1; +/// Wire codes for the `time_in_force` field. +pub const TIF_FOK: u8 = 2; +/// Wire codes for the `time_in_force` field. +pub const TIF_DAY: u8 = 3; + +/// Wire codes for the `order_type` field. +pub const ORDER_TYPE_STANDARD: u8 = 0; + +/// Inbound `NewOrder` packed wire layout. +/// +/// All fields are little-endian primitives. Total size: **48 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|-----------------|------|--------------------------------| +/// | 0 | 8 | `client_ts` | u64 | client-side timestamp (ms) | +/// | 8 | 8 | `order_id` | u64 | unique order id | +/// | 16 | 8 | `account_id` | u64 | numeric account id | +/// | 24 | 8 | `price` | i64 | tick-scaled limit price | +/// | 32 | 8 | `qty` | u64 | quantity | +/// | 40 | 1 | `side` | u8 | `0` Buy, `1` Sell | +/// | 41 | 1 | `time_in_force` | u8 | `0` GTC, `1` IOC, `2` FOK, `3` DAY | +/// | 42 | 1 | `order_type` | u8 | `0` Standard (only one in MVP) | +/// | 43 | 5 | `_pad` | u8×5 | reserved, must be zero | +#[derive( + Debug, Clone, Copy, PartialEq, Eq, FromBytes, IntoBytes, Unaligned, Immutable, KnownLayout, +)] +#[repr(C, packed)] +pub struct NewOrderWire { + /// Client-supplied timestamp (milliseconds since the Unix epoch). + pub client_ts: u64, + /// Unique order identifier supplied by the client. + pub order_id: u64, + /// Numeric account identifier supplied by the client. + pub account_id: u64, + /// Tick-scaled limit price. + pub price: i64, + /// Order quantity. + pub qty: u64, + /// Side: `0` = Buy, `1` = Sell. + pub side: u8, + /// Time-in-force: `0` GTC / `1` IOC / `2` FOK / `3` DAY. + pub time_in_force: u8, + /// Order type: `0` Standard (only Standard is supported in the MVP). + pub order_type: u8, + /// Reserved padding. Must be zero. + pub _pad: [u8; 5], +} + +const _: () = assert!(core::mem::size_of::() == 48); + +impl NewOrderWire { + /// Returns the packed byte representation of `self`. + /// + /// Equivalent to `::as_bytes(&self)` but + /// callable without importing `zerocopy` at the call site. + #[must_use] + #[inline] + pub fn as_payload_bytes(&self) -> &[u8] { + ::as_bytes(self) + } +} + +/// Decodes a `NewOrder` payload (48 bytes). +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// 48 bytes. +#[inline] +pub fn decode_new_order(payload: &[u8]) -> Result { + let view = NewOrderWire::ref_from_bytes(payload) + .map_err(|_| WireError::InvalidPayload("NewOrder: payload size mismatch"))?; + Ok(*view) +} + +impl TryFrom<&NewOrderWire> for OrderType<()> { + type Error = WireError; + + fn try_from(value: &NewOrderWire) -> Result { + // Copy each packed field into a local first — taking a reference to a + // packed field is undefined behavior. The `{ value.field }` syntax + // forces a copy. + let order_id = { value.order_id }; + let account_id = { value.account_id }; + let client_ts = { value.client_ts }; + let price_raw = { value.price }; + let qty = { value.qty }; + let side_byte = { value.side }; + let tif_byte = { value.time_in_force }; + let kind_byte = { value.order_type }; + let pad = { value._pad }; + + if pad.iter().any(|&byte| byte != 0) { + return Err(WireError::InvalidPayload( + "NewOrder: non-zero reserved padding", + )); + } + if price_raw < 0 { + return Err(WireError::InvalidPayload("NewOrder: negative price")); + } + + let side = match side_byte { + SIDE_BUY => Side::Buy, + SIDE_SELL => Side::Sell, + _ => return Err(WireError::InvalidPayload("NewOrder: unknown side")), + }; + let time_in_force = match tif_byte { + TIF_GTC => TimeInForce::Gtc, + TIF_IOC => TimeInForce::Ioc, + TIF_FOK => TimeInForce::Fok, + TIF_DAY => TimeInForce::Day, + _ => { + return Err(WireError::InvalidPayload("NewOrder: unknown time_in_force")); + } + }; + if kind_byte != ORDER_TYPE_STANDARD { + return Err(WireError::InvalidPayload( + "NewOrder: unsupported order_type", + )); + } + + // Encode the numeric account_id into the low 8 bytes of a Hash32 so + // it is preserved across the wire/domain boundary without colliding + // with `Hash32::zero()` (which is the "no STP" sentinel). + let mut user_bytes = [0u8; 32]; + if let Some(slot) = user_bytes.get_mut(0..8) { + slot.copy_from_slice(&account_id.to_le_bytes()); + } + let user_id = Hash32::new(user_bytes); + + Ok(OrderType::Standard { + id: Id::from_u64(order_id), + price: Price::new(u128::from(price_raw as u64)), + quantity: Quantity::new(qty), + side, + user_id, + timestamp: TimestampMs::new(client_ts), + time_in_force, + extra_fields: (), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + use zerocopy::IntoBytes; + + proptest! { + #[test] + fn roundtrip_through_frame( + client_ts in any::(), + order_id in any::(), + account_id in any::(), + price in 0i64..i64::MAX, + qty in any::(), + side in 0u8..=1u8, + tif in 0u8..=3u8, + ) { + let original = NewOrderWire { + client_ts, + order_id, + account_id, + price, + qty, + side, + time_in_force: tif, + order_type: ORDER_TYPE_STANDARD, + _pad: [0u8; 5], + }; + + let mut framed = Vec::new(); + encode_frame(0x01, original.as_bytes(), &mut framed).expect("encode_frame"); + + let (kind, payload, used) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x01u8); + prop_assert_eq!(used, framed.len()); + + let decoded = decode_new_order(payload).expect("decode_new_order"); + // Read packed fields via copy. + prop_assert_eq!({ decoded.client_ts }, client_ts); + prop_assert_eq!({ decoded.order_id }, order_id); + prop_assert_eq!({ decoded.account_id }, account_id); + prop_assert_eq!({ decoded.price }, price); + prop_assert_eq!({ decoded.qty }, qty); + prop_assert_eq!({ decoded.side }, side); + prop_assert_eq!({ decoded.time_in_force }, tif); + prop_assert_eq!({ decoded.order_type }, ORDER_TYPE_STANDARD); + prop_assert_eq!({ decoded._pad }, [0u8; 5]); + } + } + + #[test] + fn rejects_short_payload() { + let buf = [0u8; 47]; + assert!(matches!( + decode_new_order(&buf), + Err(WireError::InvalidPayload(_)) + )); + } + + #[test] + fn rejects_long_payload() { + let buf = [0u8; 49]; + assert!(matches!( + decode_new_order(&buf), + Err(WireError::InvalidPayload(_)) + )); + } + + #[test] + fn try_from_rejects_unknown_side() { + let wire = NewOrderWire { + client_ts: 0, + order_id: 1, + account_id: 2, + price: 100, + qty: 5, + side: 9, + time_in_force: TIF_GTC, + order_type: ORDER_TYPE_STANDARD, + _pad: [0u8; 5], + }; + let res: Result, _> = (&wire).try_into(); + assert!(matches!(res, Err(WireError::InvalidPayload(_)))); + } + + #[test] + fn try_from_rejects_negative_price() { + let wire = NewOrderWire { + client_ts: 0, + order_id: 1, + account_id: 2, + price: -1, + qty: 5, + side: SIDE_BUY, + time_in_force: TIF_GTC, + order_type: ORDER_TYPE_STANDARD, + _pad: [0u8; 5], + }; + let res: Result, _> = (&wire).try_into(); + assert!(matches!(res, Err(WireError::InvalidPayload(_)))); + } + + #[test] + fn try_from_builds_standard_order() { + let wire = NewOrderWire { + client_ts: 1_700_000_000_000, + order_id: 42, + account_id: 7, + price: 9_999, + qty: 10, + side: SIDE_SELL, + time_in_force: TIF_IOC, + order_type: ORDER_TYPE_STANDARD, + _pad: [0u8; 5], + }; + let order: OrderType<()> = (&wire).try_into().expect("convert to OrderType"); + match order { + OrderType::Standard { + price, + quantity, + side, + time_in_force, + .. + } => { + assert_eq!(price.as_u128(), 9_999); + assert_eq!(quantity.as_u64(), 10); + assert_eq!(side, Side::Sell); + assert_eq!(time_in_force, TimeInForce::Ioc); + } + _ => panic!("expected Standard variant"), + } + } +} diff --git a/src/wire/mod.rs b/src/wire/mod.rs new file mode 100644 index 0000000..ad6821f --- /dev/null +++ b/src/wire/mod.rs @@ -0,0 +1,178 @@ +//! Feature-gated binary wire protocol. +//! +//! Enabled via `--features wire`. The protocol is **additive** — `JSON` +//! and `bincode` paths are unchanged; existing callers see no behaviour +//! change. +//! +//! # Framing +//! +//! Every frame is `[len:u32 LE][kind:u8][payload …]`. `len` is the byte +//! length of `kind + payload` (it does NOT include the 4-byte `len` prefix +//! itself). All multi-byte integers are little-endian. +//! +//! # Direction +//! +//! Inbound (`0x01..=0x7F`) is gateway → engine. Outbound (`0x80..=0xFF`) +//! is engine → gateway. +//! +//! | Code | Direction | Message | Fixed payload size | +//! |---------|-----------|-----------------|-------------------:| +//! | `0x01` | inbound | `NewOrder` | 48 B | +//! | `0x02` | inbound | `CancelOrder` | 24 B | +//! | `0x03` | inbound | `CancelReplace` | 40 B | +//! | `0x04` | inbound | `MassCancel` | 24 B | +//! | `0x81` | outbound | `ExecReport` | 44 B | +//! | `0x82` | outbound | `TradePrint` | 48 B | +//! | `0x83` | outbound | `BookUpdate` | 32 B | +//! +//! # Inbound zero-copy +//! +//! Inbound messages are `#[repr(C, packed)]` and derive the `zerocopy` +//! traits required to validate-and-cast a `&[u8]` into a typed reference +//! without copying. Decoding is safe (no `unsafe` is required at this +//! layer); it returns [`WireError::InvalidPayload`] on size mismatch. +//! +//! # Outbound byte-cursor +//! +//! Outbound messages are encoded via explicit byte-cursor (`Vec` + +//! `extend_from_slice`). Outbound is I/O-dominated, so the marginal cost +//! of copying a few dozen bytes is negligible compared to socket +//! overhead, and we keep the layout free to evolve without exposing a +//! packed type to callers. +//! +//! See `doc/wire-protocol.md` for the canonical layout tables. + +pub mod error; +pub mod framing; +pub mod inbound; +pub mod outbound; + +pub use error::WireError; +pub use framing::{decode_frame, encode_frame}; +pub use inbound::{ + CancelOrderWire, CancelReplaceWire, MassCancelWire, NewOrderWire, decode_cancel_order, + decode_cancel_replace, decode_mass_cancel, decode_new_order, +}; +pub use outbound::{ + BookUpdateWire, ExecReport, TradePrintWire, decode_book_update, decode_exec_report, + decode_trade_print, encode_book_update, encode_exec_report, encode_trade_print, status_to_wire, +}; + +/// Kind discriminants for every binary wire message. +/// +/// Wire codes are stable across `0.7.x` patch releases. Inbound messages +/// occupy the low half of the byte (`0x01..=0x7F`); outbound messages +/// occupy the high half (`0x80..=0xFF`). Variant `0x00` is reserved as a +/// "no-message" sentinel and is intentionally absent from the enum. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +#[non_exhaustive] +pub enum MessageKind { + /// Inbound: submit a new order. Payload: [`NewOrderWire`] (48 B). + NewOrder = 0x01, + /// Inbound: cancel an existing order. Payload: [`CancelOrderWire`] (24 B). + CancelOrder = 0x02, + /// Inbound: cancel-and-replace an existing order. Payload: + /// [`CancelReplaceWire`] (40 B). + CancelReplace = 0x03, + /// Inbound: mass cancel by scope. Payload: [`MassCancelWire`] (24 B). + MassCancel = 0x04, + /// Outbound: execution report for an order's lifecycle event. Payload: + /// [`ExecReport`] (44 B). + ExecReport = 0x81, + /// Outbound: trade print announcing a fill. Payload: [`TradePrintWire`] + /// (48 B). + TradePrint = 0x82, + /// Outbound: incremental book level update. Payload: [`BookUpdateWire`] + /// (32 B). + BookUpdate = 0x83, +} + +impl MessageKind { + /// Resolves a raw kind byte to a typed [`MessageKind`]. + /// + /// # Errors + /// + /// Returns [`WireError::UnknownKind`] for any byte outside the + /// documented set. + #[inline] + pub fn from_u8(byte: u8) -> Result { + match byte { + 0x01 => Ok(Self::NewOrder), + 0x02 => Ok(Self::CancelOrder), + 0x03 => Ok(Self::CancelReplace), + 0x04 => Ok(Self::MassCancel), + 0x81 => Ok(Self::ExecReport), + 0x82 => Ok(Self::TradePrint), + 0x83 => Ok(Self::BookUpdate), + other => Err(WireError::UnknownKind(other)), + } + } + + /// Returns the raw kind byte for this variant. + #[must_use] + #[inline] + pub const fn as_u8(self) -> u8 { + self as u8 + } + + /// Returns `true` if this is an inbound (gateway → engine) message. + #[must_use] + #[inline] + pub const fn is_inbound(self) -> bool { + (self as u8) < 0x80 + } + + /// Returns `true` if this is an outbound (engine → gateway) message. + #[must_use] + #[inline] + pub const fn is_outbound(self) -> bool { + (self as u8) >= 0x80 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_u8_round_trip() { + for kind in [ + MessageKind::NewOrder, + MessageKind::CancelOrder, + MessageKind::CancelReplace, + MessageKind::MassCancel, + MessageKind::ExecReport, + MessageKind::TradePrint, + MessageKind::BookUpdate, + ] { + let byte = kind.as_u8(); + let resolved = MessageKind::from_u8(byte).expect("resolve known kind"); + assert_eq!(resolved, kind); + } + } + + #[test] + fn from_u8_rejects_unknown() { + assert_eq!( + MessageKind::from_u8(0x00), + Err(WireError::UnknownKind(0x00)) + ); + assert_eq!( + MessageKind::from_u8(0x05), + Err(WireError::UnknownKind(0x05)) + ); + assert_eq!( + MessageKind::from_u8(0xFF), + Err(WireError::UnknownKind(0xFF)) + ); + } + + #[test] + fn direction_classification() { + assert!(MessageKind::NewOrder.is_inbound()); + assert!(!MessageKind::NewOrder.is_outbound()); + assert!(MessageKind::ExecReport.is_outbound()); + assert!(!MessageKind::ExecReport.is_inbound()); + } +} diff --git a/src/wire/outbound/book_update.rs b/src/wire/outbound/book_update.rs new file mode 100644 index 0000000..680b164 --- /dev/null +++ b/src/wire/outbound/book_update.rs @@ -0,0 +1,158 @@ +//! `BookUpdate` outbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; + +/// Wire codes for the `side` field. +pub const SIDE_BUY: u8 = 0; +/// Wire codes for the `side` field. +pub const SIDE_SELL: u8 = 1; + +/// Fixed payload size in bytes for a `BookUpdateWire` (with trailing pad). +pub const BOOK_UPDATE_SIZE: usize = 32; + +/// Outbound `BookUpdate` message body. +/// +/// Total payload size: **32 bytes** (26 bytes of fields + 6 bytes of trailing +/// pad to round to a 32-byte block — keeps the message a comfortable +/// cache-line slice and leaves room for forward-compatible additions). +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|--------------|------|-----------------------------| +/// | 0 | 8 | `engine_seq` | u64 | global engine sequence | +/// | 8 | 1 | `side` | u8 | `0` Buy, `1` Sell | +/// | 9 | 8 | `price` | i64 | tick-scaled level price | +/// | 17 | 8 | `qty` | u64 | new total quantity at level | +/// | 25 | 1 | `_pad0` | u8 | reserved | +/// | 26 | 6 | `_pad` | u8×6 | reserved, must be zero | +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BookUpdateWire { + /// Global engine sequence (monotonic across outbound streams). + pub engine_seq: u64, + /// Side of the level: `0` = Buy, `1` = Sell. + pub side: u8, + /// Tick-scaled level price. + pub price: i64, + /// New total quantity resting at this level (`0` if the level was wiped). + pub qty: u64, +} + +/// Encodes a `BookUpdate` payload (32 bytes) into `out`. The trailing 7-byte +/// pad is zero-filled. +#[inline] +pub fn encode_book_update(update: &BookUpdateWire, out: &mut Vec) { + out.reserve(BOOK_UPDATE_SIZE); + out.extend_from_slice(&update.engine_seq.to_le_bytes()); + out.push(update.side); + out.extend_from_slice(&update.price.to_le_bytes()); + out.extend_from_slice(&update.qty.to_le_bytes()); + // 7 bytes of trailing pad to round to 32. + out.extend_from_slice(&[0u8; 7]); +} + +/// Decodes a `BookUpdate` payload. +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// [`BOOK_UPDATE_SIZE`]. +#[inline] +pub fn decode_book_update(payload: &[u8]) -> Result { + if payload.len() != BOOK_UPDATE_SIZE { + return Err(WireError::InvalidPayload( + "BookUpdate: payload size mismatch", + )); + } + let read_u64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(u64::from_le_bytes(arr)) + }; + let read_i64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(i64::from_le_bytes(arr)) + }; + + let engine_seq = read_u64(0)?; + let side = *payload.get(8).ok_or(WireError::Truncated)?; + if side != SIDE_BUY && side != SIDE_SELL { + return Err(WireError::InvalidPayload("BookUpdate: unknown side")); + } + let price = read_i64(9)?; + let qty = read_u64(17)?; + let pad = payload.get(25..32).ok_or(WireError::Truncated)?; + if pad.iter().any(|&byte| byte != 0) { + return Err(WireError::InvalidPayload( + "BookUpdate: non-zero reserved padding", + )); + } + Ok(BookUpdateWire { + engine_seq, + side, + price, + qty, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + + #[test] + fn payload_size_constant() { + let upd = BookUpdateWire { + engine_seq: 0, + side: SIDE_BUY, + price: 0, + qty: 0, + }; + let mut buf = Vec::new(); + encode_book_update(&upd, &mut buf); + assert_eq!(buf.len(), BOOK_UPDATE_SIZE); + } + + proptest! { + #[test] + fn roundtrip_through_frame( + engine_seq in any::(), + side in 0u8..=1u8, + price in any::(), + qty in any::(), + ) { + let original = BookUpdateWire { + engine_seq, + side, + price, + qty, + }; + let mut payload = Vec::new(); + encode_book_update(&original, &mut payload); + let mut framed = Vec::new(); + encode_frame(0x83, &payload, &mut framed).expect("encode_frame"); + + let (kind, decoded_payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x83u8); + let decoded = decode_book_update(decoded_payload).expect("decode_book_update"); + prop_assert_eq!(decoded, original); + } + } + + #[test] + fn rejects_wrong_size() { + let buf = [0u8; BOOK_UPDATE_SIZE - 1]; + assert!(matches!( + decode_book_update(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +} diff --git a/src/wire/outbound/exec_report.rs b/src/wire/outbound/exec_report.rs new file mode 100644 index 0000000..5c2d4b6 --- /dev/null +++ b/src/wire/outbound/exec_report.rs @@ -0,0 +1,252 @@ +//! `ExecReport` outbound message. +//! +//! Outbound encoders use an explicit byte-cursor (`Vec::extend_from_slice`) +//! rather than `#[repr(C, packed)]`. This is I/O-dominated traffic — the cost +//! of a few dozen bytes of explicit copying is dwarfed by socket overhead, and +//! we get freedom to evolve the layout without exposing a packed type. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::orderbook::order_state::OrderStatus; +use crate::wire::error::WireError; + +/// Wire code for `OrderStatus::Open`. +pub const STATUS_OPEN: u8 = 0; +/// Wire code for `OrderStatus::PartiallyFilled`. +pub const STATUS_PARTIALLY_FILLED: u8 = 1; +/// Wire code for `OrderStatus::Filled`. +pub const STATUS_FILLED: u8 = 2; +/// Wire code for `OrderStatus::Cancelled`. +pub const STATUS_CANCELLED: u8 = 3; +/// Wire code for `OrderStatus::Rejected`. +pub const STATUS_REJECTED: u8 = 4; + +/// Fixed payload size in bytes for an `ExecReport`. +pub const EXEC_REPORT_SIZE: usize = 44; + +/// Outbound `ExecReport` message body. +/// +/// Total payload size: **44 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|------------------|------|----------------------------------| +/// | 0 | 8 | `engine_seq` | u64 | global engine sequence | +/// | 8 | 8 | `order_id` | u64 | order id | +/// | 16 | 1 | `status` | u8 | see `STATUS_*` constants | +/// | 17 | 8 | `filled_qty` | u64 | cumulative filled quantity | +/// | 25 | 8 | `remaining_qty` | u64 | quantity still resting | +/// | 33 | 8 | `price` | i64 | tick-scaled price | +/// | 41 | 2 | `reject_reason` | u16 | reject code, `0` if not rejected | +/// | 43 | 1 | `_pad` | u8 | reserved, must be zero | +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ExecReport { + /// Global engine sequence (monotonic across outbound streams). + pub engine_seq: u64, + /// Order id. + pub order_id: u64, + /// Status discriminant — see `STATUS_*` constants. + pub status: u8, + /// Cumulative filled quantity for this order. + pub filled_qty: u64, + /// Quantity still resting on the book. + pub remaining_qty: u64, + /// Tick-scaled price. + pub price: i64, + /// Numeric reject code. `0` when the report is not a rejection. + pub reject_reason: u16, + /// Reserved. Must be zero. + pub _pad: u8, +} + +/// Maps an [`OrderStatus`] to its wire-side discriminant. +/// +/// The mapping is stable across `0.7.x` patch releases. +#[must_use] +#[inline] +pub fn status_to_wire(status: &OrderStatus) -> u8 { + match status { + OrderStatus::Open => STATUS_OPEN, + OrderStatus::PartiallyFilled { .. } => STATUS_PARTIALLY_FILLED, + OrderStatus::Filled { .. } => STATUS_FILLED, + OrderStatus::Cancelled { .. } => STATUS_CANCELLED, + OrderStatus::Rejected { .. } => STATUS_REJECTED, + } +} + +/// Encodes an `ExecReport` payload (44 bytes) into `out`. +#[inline] +pub fn encode_exec_report(report: &ExecReport, out: &mut Vec) { + out.reserve(EXEC_REPORT_SIZE); + out.extend_from_slice(&report.engine_seq.to_le_bytes()); + out.extend_from_slice(&report.order_id.to_le_bytes()); + out.push(report.status); + out.extend_from_slice(&report.filled_qty.to_le_bytes()); + out.extend_from_slice(&report.remaining_qty.to_le_bytes()); + out.extend_from_slice(&report.price.to_le_bytes()); + out.extend_from_slice(&report.reject_reason.to_le_bytes()); + out.push(report._pad); +} + +/// Decodes an `ExecReport` payload. +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// [`EXEC_REPORT_SIZE`]. +#[inline] +pub fn decode_exec_report(payload: &[u8]) -> Result { + if payload.len() != EXEC_REPORT_SIZE { + return Err(WireError::InvalidPayload( + "ExecReport: payload size mismatch", + )); + } + let read_u64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(u64::from_le_bytes(arr)) + }; + let read_i64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(i64::from_le_bytes(arr)) + }; + let read_u16 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 2) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 2]; + arr.copy_from_slice(slot); + Ok(u16::from_le_bytes(arr)) + }; + + let engine_seq = read_u64(0)?; + let order_id = read_u64(8)?; + let status = *payload.get(16).ok_or(WireError::Truncated)?; + if status > STATUS_REJECTED { + return Err(WireError::InvalidPayload("ExecReport: unknown status")); + } + let filled_qty = read_u64(17)?; + let remaining_qty = read_u64(25)?; + let price = read_i64(33)?; + let reject_reason = read_u16(41)?; + let pad = *payload.get(43).ok_or(WireError::Truncated)?; + if pad != 0 { + return Err(WireError::InvalidPayload( + "ExecReport: non-zero reserved padding", + )); + } + Ok(ExecReport { + engine_seq, + order_id, + status, + filled_qty, + remaining_qty, + price, + reject_reason, + _pad: pad, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::orderbook::reject_reason::RejectReason; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + + #[test] + fn payload_size_constant() { + let report = ExecReport { + engine_seq: 0, + order_id: 0, + status: STATUS_OPEN, + filled_qty: 0, + remaining_qty: 0, + price: 0, + reject_reason: 0, + _pad: 0, + }; + let mut buf = Vec::new(); + encode_exec_report(&report, &mut buf); + assert_eq!(buf.len(), EXEC_REPORT_SIZE); + } + + #[test] + fn status_to_wire_covers_all_variants() { + assert_eq!(status_to_wire(&OrderStatus::Open), STATUS_OPEN); + assert_eq!( + status_to_wire(&OrderStatus::PartiallyFilled { + original_quantity: 10, + filled_quantity: 4 + }), + STATUS_PARTIALLY_FILLED + ); + assert_eq!( + status_to_wire(&OrderStatus::Filled { + filled_quantity: 10 + }), + STATUS_FILLED + ); + assert_eq!( + status_to_wire(&OrderStatus::Cancelled { + filled_quantity: 0, + reason: crate::orderbook::order_state::CancelReason::UserRequested, + }), + STATUS_CANCELLED + ); + assert_eq!( + status_to_wire(&OrderStatus::Rejected { + reason: RejectReason::KillSwitchActive + }), + STATUS_REJECTED + ); + } + + proptest! { + #[test] + fn roundtrip_through_frame( + engine_seq in any::(), + order_id in any::(), + status in 0u8..=4u8, + filled_qty in any::(), + remaining_qty in any::(), + price in any::(), + reject_reason in any::(), + ) { + let original = ExecReport { + engine_seq, + order_id, + status, + filled_qty, + remaining_qty, + price, + reject_reason, + _pad: 0, + }; + let mut payload = Vec::new(); + encode_exec_report(&original, &mut payload); + let mut framed = Vec::new(); + encode_frame(0x81, &payload, &mut framed).expect("encode_frame"); + + let (kind, decoded_payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x81u8); + let decoded = decode_exec_report(decoded_payload).expect("decode_exec_report"); + prop_assert_eq!(decoded, original); + } + } + + #[test] + fn rejects_short_payload() { + let buf = [0u8; EXEC_REPORT_SIZE - 1]; + assert!(matches!( + decode_exec_report(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +} diff --git a/src/wire/outbound/mod.rs b/src/wire/outbound/mod.rs new file mode 100644 index 0000000..cb3e5a9 --- /dev/null +++ b/src/wire/outbound/mod.rs @@ -0,0 +1,22 @@ +//! Outbound (engine → gateway) wire messages. +//! +//! Outbound messages use byte-cursor encoders rather than packed structs. +//! Outbound traffic is I/O-dominated, so the cost of explicit field-by-field +//! copying into a `Vec` is negligible compared to the socket overhead, +//! and we keep the layout free to evolve without exposing a packed type to +//! callers. +//! +//! All fields are little-endian primitives. See `doc/wire-protocol.md` for +//! the canonical layout tables. + +pub mod book_update; +pub mod exec_report; +pub mod trade_print; + +pub use book_update::{BOOK_UPDATE_SIZE, BookUpdateWire, decode_book_update, encode_book_update}; +pub use exec_report::{ + EXEC_REPORT_SIZE, ExecReport, STATUS_CANCELLED, STATUS_FILLED, STATUS_OPEN, + STATUS_PARTIALLY_FILLED, STATUS_REJECTED, decode_exec_report, encode_exec_report, + status_to_wire, +}; +pub use trade_print::{TRADE_PRINT_SIZE, TradePrintWire, decode_trade_print, encode_trade_print}; diff --git a/src/wire/outbound/trade_print.rs b/src/wire/outbound/trade_print.rs new file mode 100644 index 0000000..862e982 --- /dev/null +++ b/src/wire/outbound/trade_print.rs @@ -0,0 +1,149 @@ +//! `TradePrint` outbound message. +//! +//! See `doc/wire-protocol.md` for the canonical layout. + +use crate::wire::error::WireError; + +/// Fixed payload size in bytes for a `TradePrintWire`. +pub const TRADE_PRINT_SIZE: usize = 48; + +/// Outbound `TradePrint` message body. +/// +/// Total payload size: **48 bytes**. +/// +/// | Offset | Size | Field | Type | Notes | +/// |-------:|-----:|---------------|------|------------------------------| +/// | 0 | 8 | `engine_seq` | u64 | global engine sequence | +/// | 8 | 8 | `maker_id` | u64 | maker order id | +/// | 16 | 8 | `taker_id` | u64 | taker order id | +/// | 24 | 8 | `price` | i64 | tick-scaled fill price | +/// | 32 | 8 | `qty` | u64 | matched quantity | +/// | 40 | 8 | `ts` | u64 | engine timestamp (ms) | +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TradePrintWire { + /// Global engine sequence (monotonic across outbound streams). + pub engine_seq: u64, + /// Maker order id (the resting side of the match). + pub maker_id: u64, + /// Taker order id (the incoming side of the match). + pub taker_id: u64, + /// Tick-scaled fill price. + pub price: i64, + /// Matched quantity. + pub qty: u64, + /// Engine timestamp in milliseconds. + pub ts: u64, +} + +/// Encodes a `TradePrint` payload (48 bytes) into `out`. +#[inline] +pub fn encode_trade_print(trade: &TradePrintWire, out: &mut Vec) { + out.reserve(TRADE_PRINT_SIZE); + out.extend_from_slice(&trade.engine_seq.to_le_bytes()); + out.extend_from_slice(&trade.maker_id.to_le_bytes()); + out.extend_from_slice(&trade.taker_id.to_le_bytes()); + out.extend_from_slice(&trade.price.to_le_bytes()); + out.extend_from_slice(&trade.qty.to_le_bytes()); + out.extend_from_slice(&trade.ts.to_le_bytes()); +} + +/// Decodes a `TradePrint` payload. +/// +/// # Errors +/// +/// Returns [`WireError::InvalidPayload`] when the buffer length differs from +/// [`TRADE_PRINT_SIZE`]. +#[inline] +pub fn decode_trade_print(payload: &[u8]) -> Result { + if payload.len() != TRADE_PRINT_SIZE { + return Err(WireError::InvalidPayload( + "TradePrint: payload size mismatch", + )); + } + let read_u64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(u64::from_le_bytes(arr)) + }; + let read_i64 = |offset: usize| -> Result { + let slot = payload + .get(offset..offset + 8) + .ok_or(WireError::Truncated)?; + let mut arr = [0u8; 8]; + arr.copy_from_slice(slot); + Ok(i64::from_le_bytes(arr)) + }; + + Ok(TradePrintWire { + engine_seq: read_u64(0)?, + maker_id: read_u64(8)?, + taker_id: read_u64(16)?, + price: read_i64(24)?, + qty: read_u64(32)?, + ts: read_u64(40)?, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::wire::framing::{decode_frame, encode_frame}; + use proptest::prelude::*; + + #[test] + fn payload_size_constant() { + let trade = TradePrintWire { + engine_seq: 0, + maker_id: 0, + taker_id: 0, + price: 0, + qty: 0, + ts: 0, + }; + let mut buf = Vec::new(); + encode_trade_print(&trade, &mut buf); + assert_eq!(buf.len(), TRADE_PRINT_SIZE); + } + + proptest! { + #[test] + fn roundtrip_through_frame( + engine_seq in any::(), + maker_id in any::(), + taker_id in any::(), + price in any::(), + qty in any::(), + ts in any::(), + ) { + let original = TradePrintWire { + engine_seq, + maker_id, + taker_id, + price, + qty, + ts, + }; + let mut payload = Vec::new(); + encode_trade_print(&original, &mut payload); + let mut framed = Vec::new(); + encode_frame(0x82, &payload, &mut framed).expect("encode_frame"); + + let (kind, decoded_payload, _) = decode_frame(&framed).expect("decode_frame"); + prop_assert_eq!(kind, 0x82u8); + let decoded = decode_trade_print(decoded_payload).expect("decode_trade_print"); + prop_assert_eq!(decoded, original); + } + } + + #[test] + fn rejects_wrong_size() { + let buf = [0u8; TRADE_PRINT_SIZE - 1]; + assert!(matches!( + decode_trade_print(&buf), + Err(WireError::InvalidPayload(_)) + )); + } +}