diff --git a/docs/architecture/ADR-001-static-allocations.md b/docs/architecture/ADR-001-static-allocations.md
new file mode 100644
index 00000000..df65003c
--- /dev/null
+++ b/docs/architecture/ADR-001-static-allocations.md
@@ -0,0 +1,80 @@
+# ADR-001: Static Allocations via Box::leak
+
+## Status
+
+Accepted
+
+## Context
+
+The graph-gateway uses Axum as its HTTP framework. Axum's state management requires types to implement `Clone` and have `'static` lifetime. Several gateway components are heavyweight singletons that:
+
+1. Are initialized once at startup
+2. Never need to be deallocated (process lifetime)
+3. Are expensive to clone (contain channels, cryptographic keys, etc.)
+
+These components include:
+
+- `ReceiptSigner` - TAP receipt signing with private keys
+- `Budgeter` - PID controller state for fee management
+- `Chains` - Chain head tracking with per-chain state
+- `Eip712Domain` (attestation domains) - EIP-712 signing domains
+
+## Decision
+
+Use `Box::leak()` to convert an owned `Box<T>` into a `&'static T` reference for singleton components.
+
+```rust
+// Example from main.rs
+let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(ReceiptSigner::new(...)));
+
+let chains: &'static Chains = Box::leak(Box::new(Chains::new(...)));
+```
+
+## Consequences
+
+### Positive
+
+1. **Zero-cost sharing**: `&'static T` is `Copy`, so passing it to handlers has no overhead
+2. **No Arc overhead**: Avoids atomic reference counting on every request
+3. **Simpler lifetimes**: No need to propagate lifetime parameters through handler types
+4. **Explicit intent**: Makes it clear these are process-lifetime singletons
+
+### Negative
+
+1. **Memory never freed**: The leaked memory is never reclaimed. Acceptable because:
+   - Components live for the entire process lifetime anyway
+   - Total leaked memory is small and bounded (< 1 KB)
+   - Process termination reclaims all memory
+
+2. **Not suitable for tests**: Tests that need fresh state must use different patterns.
Currently mitigated by limited test coverage.
+
+## Alternatives Considered
+
+### `Arc<T>` (Rejected)
+
+```rust
+let receipt_signer: Arc<ReceiptSigner> = Arc::new(ReceiptSigner::new(...));
+```
+
+Problems:
+
+- Atomic operations on every clone (per-request overhead)
+- More complex to share across Axum handlers
+- Implies shared ownership when sole ownership is the intent
+
+### `once_cell::sync::Lazy` (Rejected)
+
+```rust
+static RECEIPT_SIGNER: Lazy<ReceiptSigner> = Lazy::new(|| ...);
+```
+
+Problems:
+
+- Requires initialization logic in static context
+- Cannot use async initialization
+- Configuration not available at static init time
+
+## References
+
+- [Axum State Documentation](https://docs.rs/axum/latest/axum/extract/struct.State.html)
+- [Box::leak documentation](https://doc.rust-lang.org/std/boxed/struct.Box.html#method.leak)
diff --git a/docs/architecture/ADR-002-type-state-pattern.md b/docs/architecture/ADR-002-type-state-pattern.md
new file mode 100644
index 00000000..2b906ea2
--- /dev/null
+++ b/docs/architecture/ADR-002-type-state-pattern.md
@@ -0,0 +1,131 @@
+# ADR-002: Type-State Pattern for Indexer Processing
+
+## Status
+
+Accepted
+
+## Context
+
+Indexer information flows through multiple processing stages, with each stage enriching the data:
+
+1. **Raw** - Basic indexer info from network subgraph
+2. **Version resolved** - After fetching indexer-service version
+3. **Progress resolved** - After fetching indexing progress (block height)
+4. **Cost resolved** - After fetching cost model/fee info
+
+Processing order matters: we need version info before we can query for progress (different API versions), and we need progress before cost resolution makes sense (stale indexers are filtered).
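As a compilable, self-contained illustration of the guarantee this ADR is after (types simplified and hypothetical — the real definitions live in `src/network/indexer_processing.rs`), each resolution step consumes the previous stage and returns the next:

```rust
// Simplified sketch (hypothetical types): a later stage is unreachable
// without the earlier one, so ordering bugs become compile errors.
struct Unresolved;
struct VersionResolved { version: u32 }
struct ProgressResolved { version: u32, block: u64 }

struct Info<Stage> { indexer: &'static str, stage: Stage }

impl Info<Unresolved> {
    fn resolve_version(self, version: u32) -> Info<VersionResolved> {
        Info { indexer: self.indexer, stage: VersionResolved { version } }
    }
}

impl Info<VersionResolved> {
    fn resolve_progress(self, block: u64) -> Info<ProgressResolved> {
        let VersionResolved { version } = self.stage;
        Info { indexer: self.indexer, stage: ProgressResolved { version, block } }
    }
}
```

Calling `resolve_progress` on an `Info<Unresolved>` value does not type-check, so the ordering is enforced by the compiler rather than by runtime checks.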
A naive approach would use `Option` fields that get populated:
+
+```rust
+struct IndexingInfo {
+    indexer: IndexerId,
+    deployment: DeploymentId,
+    version: Option<Version>,      // Filled in stage 2
+    progress: Option<BlockNumber>, // Filled in stage 3
+    fee: Option<GRT>,              // Filled in stage 4
+}
+```
+
+This leads to `unwrap()` calls throughout the codebase and runtime errors when accessing fields before they're populated.
+
+## Decision
+
+Use the type-state pattern with generic parameters to encode the processing stage at compile time.
+
+```rust
+// Type markers for processing stages
+struct Unresolved;
+struct VersionResolved(Version);
+struct ProgressResolved { version: Version, block: BlockNumber }
+struct FullyResolved { version: Version, block: BlockNumber, fee: GRT }
+
+// Generic struct parameterized by stage
+struct IndexingInfo<Stage> {
+    indexer: IndexerId,
+    deployment: DeploymentId,
+    stage: Stage,
+}
+
+// Stage transitions are explicit methods
+impl IndexingInfo<Unresolved> {
+    fn resolve_version(self, version: Version) -> IndexingInfo<VersionResolved> {
+        IndexingInfo {
+            indexer: self.indexer,
+            deployment: self.deployment,
+            stage: VersionResolved(version),
+        }
+    }
+}
+```
+
+See `src/network/indexer_processing.rs` for the actual implementation.
+
+## Consequences
+
+### Positive
+
+1. **Compile-time safety**: Impossible to access version info before it's resolved
+2. **Self-documenting**: Function signatures show the required processing stage
+3. **No runtime overhead**: Type parameters are erased at compile time
+4. **Explicit transitions**: Stage changes are visible method calls, not silent mutations
+
+### Negative
+
+1. **Verbose types**: `IndexingInfo<FullyResolved>` is longer than `IndexingInfo`
+2. **Learning curve**: Pattern is less common, may confuse new contributors
+3. 
**More boilerplate**: Stage transition methods must be written explicitly
+
+## Pattern Usage
+
+```rust
+// Functions declare their required stage in the signature
+fn select_candidate(info: &IndexingInfo<FullyResolved>) -> Score {
+    // Safe to access info.stage.fee - compiler guarantees it exists
+    calculate_score(info.stage.fee, info.stage.block)
+}
+
+// Processing pipeline
+async fn process_indexer(raw: IndexingInfo<Unresolved>) -> Result<IndexingInfo<FullyResolved>> {
+    let with_version = raw.resolve_version(fetch_version(&raw.indexer).await?);
+    let with_progress = with_version.resolve_progress(fetch_progress(&with_version).await?);
+    let fully_resolved = with_progress.resolve_cost(fetch_cost(&with_progress).await?);
+    Ok(fully_resolved)
+}
+```
+
+## Alternatives Considered
+
+### Builder Pattern (Rejected)
+
+```rust
+IndexingInfoBuilder::new(indexer, deployment)
+    .version(v)
+    .progress(p)
+    .fee(f)
+    .build()
+```
+
+Problems:
+
+- Runtime validation only
+- `build()` must check all fields are set
+- No compile-time guarantee of processing order
+
+### Separate Structs (Rejected)
+
+```rust
+struct RawIndexingInfo { ... }
+struct ResolvedIndexingInfo { ... }
+```
+
+Problems:
+
+- Code duplication across struct definitions
+- Harder to share common logic
+- Type relationships not explicit
+
+## References
+
+- [Typestate Pattern in Rust](https://cliffle.com/blog/rust-typestate/)
+- [Parse, don't validate](https://lexi-lambda.github.io/blog/2019/11/05/parse-don-t-validate/)
diff --git a/docs/architecture/ADR-003-pid-budget-controller.md b/docs/architecture/ADR-003-pid-budget-controller.md
new file mode 100644
index 00000000..657bc999
--- /dev/null
+++ b/docs/architecture/ADR-003-pid-budget-controller.md
@@ -0,0 +1,149 @@
+# ADR-003: PID Controller for Fee Budget Management
+
+## Status
+
+Accepted
+
+## Context
+
+The gateway must manage query fee budgets to balance:
+
+1. **Cost efficiency** - Minimize fees paid to indexers
+2. **Query success rate** - Ensure queries succeed by offering competitive fees
+3. 
**Responsiveness** - Adapt quickly to market conditions
+
+Static fee budgets fail because:
+
+- Too low: Indexers reject queries, degraded service
+- Too high: Overpaying, wasted budget
+- Market conditions change: Indexer fees fluctuate based on demand
+
+We need a dynamic system that automatically adjusts fee budgets based on observed success rates.
+
+## Decision
+
+Implement a PID (Proportional-Integral-Derivative) controller to dynamically adjust fee budgets based on query success rate.
+
+### PID Controller Overview
+
+The PID controller continuously adjusts the fee budget using three terms:
+
+```
+adjustment = Kp * error + Ki * integral + Kd * derivative
+
+where:
+  error      = target_success_rate - actual_success_rate
+  integral   = sum of past errors
+  derivative = rate of error change
+```
+
+- **P (Proportional)**: Immediate response to current error
+- **I (Integral)**: Corrects persistent bias over time
+- **D (Derivative)**: Dampens oscillations, smooths response
+
+### Implementation
+
+See `src/budgets.rs` for implementation:
+
+```rust
+pub struct Budgeter {
+    controller: PidController,
+    decay_buffer: DecayBuffer,
+    budget_per_query: f64,
+}
+
+impl Budgeter {
+    pub fn feedback(&mut self, success: bool) {
+        self.decay_buffer.record(success);
+        let success_rate = self.decay_buffer.success_rate();
+        let adjustment = self.controller.update(success_rate);
+        self.budget_per_query *= adjustment;
+    }
+}
+```
+
+### Decay Buffer
+
+Success rate is calculated using exponential decay to weight recent observations more heavily:
+
+```
+weighted_sum   = sum(success_i * decay^i)
+weighted_count = sum(decay^i)
+success_rate   = weighted_sum / weighted_count
+```
+
+This provides:
+
+- Fast response to changing conditions
+- Natural forgetting of stale data
+- Bounded memory usage
+
+## Consequences
+
+### Positive
+
+1. **Self-tuning**: Budget automatically converges to optimal level
+2. **Adaptive**: Responds to market changes without manual intervention
+3. 
**Stable**: PID controllers are well-understood and tuneable +4. **Observable**: Budget changes can be monitored via metrics + +### Negative + +1. **Tuning required**: PID gains (Kp, Ki, Kd) must be tuned for the system +2. **Oscillation risk**: Poorly tuned controller can oscillate +3. **Complexity**: More complex than static budgets +4. **Cold start**: Initial budget must be set heuristically + +## Tuning Parameters + +Current parameters (may need adjustment based on production data): + +| Parameter | Value | Purpose | +| --------- | ----- | ----------------------------------------- | +| Kp | 0.1 | Proportional gain - immediate response | +| Ki | 0.01 | Integral gain - bias correction | +| Kd | 0.05 | Derivative gain - oscillation damping | +| Target | 0.95 | Target success rate (95%) | +| Decay | 0.99 | Decay factor for success rate calculation | + +## Alternatives Considered + +### Static Budget (Rejected) + +```rust +const BUDGET_PER_QUERY: GRT = GRT::from_wei(1_000_000); +``` + +Problems: + +- Cannot adapt to market conditions +- Requires manual intervention to change +- Either overpays or fails queries + +### Threshold-based Adjustment (Rejected) + +```rust +if success_rate < 0.9 { budget *= 1.1; } +if success_rate > 0.95 { budget *= 0.9; } +``` + +Problems: + +- Oscillates around thresholds +- Step changes cause instability +- No derivative term to dampen oscillations + +### Machine Learning Model (Rejected) + +Train a model to predict optimal budget based on features. + +Problems: + +- Requires training data +- Black box behavior +- Overkill for this use case + +## References + +- [PID Controller (Wikipedia)](https://en.wikipedia.org/wiki/PID_controller) +- [Control Theory for Software Engineers](https://blog.acolyer.org/2015/05/01/feedback-control-for-computer-systems/) diff --git a/src/auth.rs b/src/auth.rs index d54e47a7..5258c407 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,3 +1,32 @@ +//! API Key Authentication +//! +//! 
Handles API key validation, payment status checks, and domain authorization.
+//!
+//! # Authentication Flow
+//!
+//! 1. Extract API key from `Authorization: Bearer <api-key>` header
+//! 2. Parse and validate key format (32-char hex string → 16 bytes)
+//! 3. Look up key in `api_keys` map (from Studio API or Kafka)
+//! 4. Check payment status (`QueryStatus::Active`, `ServiceShutoff`, `MonthlyCapReached`)
+//! 5. Verify origin domain against authorized domains list
+//! 6. Return [`AuthSettings`] with user address and authorized subgraphs
+//!
+//! # Special API Keys
+//!
+//! Keys in `special_api_keys` bypass payment checks. Used for admin/monitoring.
+//!
+//! # Domain Authorization
+//!
+//! The `domains` field supports wildcards:
+//! - `"example.com"` → exact match only
+//! - `"*.example.com"` → matches `foo.example.com`, `bar.example.com`
+//! - Empty list → all domains authorized
+//!
+//! # API Key Sources
+//!
+//! - [`studio_api`]: Poll HTTP endpoint periodically
+//! - [`kafka`]: Stream updates from Kafka topic
+
 pub mod kafka;
 pub mod studio_api;
diff --git a/src/auth/README.md b/src/auth/README.md
new file mode 100644
index 00000000..e8d1f8ab
--- /dev/null
+++ b/src/auth/README.md
@@ -0,0 +1,78 @@
+# Auth Module
+
+API key authentication and authorization.
+
+## Module Overview
+
+| File            | Purpose                                           |
+| --------------- | ------------------------------------------------- |
+| `mod.rs`        | `AuthContext`, `AuthSettings`, API key validation |
+| `studio_api.rs` | Fetch API keys from HTTP endpoint                 |
+| `kafka.rs`      | Stream API keys from Kafka topic                  |
+
+## Authentication Flow
+
+```
+Request
+  |
+  v
+Extract API key from Authorization header
+  |
+  v
+Validate format (32-char hex string)
+  |
+  v
+Special key? 
--> Skip payment check + | + v +Look up in api_keys map + | + v +Check QueryStatus (Active/ServiceShutoff/MonthlyCapReached) + | + v +Verify domain authorization + | + v +Return AuthSettings +``` + +## Key Types + +- `AuthContext` - Shared auth state (api_keys map, special keys, payment_required flag) +- `AuthSettings` - Per-request auth info (key, user, authorized subgraphs) +- `ApiKey` - API key definition with permissions +- `QueryStatus` - Payment status enum + +## API Key Sources + +### Studio API (`studio_api.rs`) + +Polls an HTTP endpoint periodically to fetch API keys. + +```json +{ + "url": "https://api.example.com/gateway-api-keys", + "auth": "Bearer ", + "special": ["admin-key-1"] +} +``` + +### Kafka (`kafka.rs`) + +Streams API key updates from a Kafka topic. + +```json +{ + "topic": "gateway-api-keys", + "special": ["admin-key-1"] +} +``` + +## Domain Authorization + +The `domains` field on API keys supports: + +- Exact match: `"example.com"` +- Wildcard: `"*.example.com"` (matches `foo.example.com`) +- Empty list: all domains authorized diff --git a/src/block_constraints.rs b/src/block_constraints.rs index c501603a..191c4c77 100644 --- a/src/block_constraints.rs +++ b/src/block_constraints.rs @@ -1,3 +1,59 @@ +//! Block Constraint Resolution +//! +//! Parses GraphQL queries to extract block constraints and rewrites queries +//! to include gateway probe metadata for chain head tracking. +//! +//! # Block Constraint Types +//! +//! GraphQL queries can specify which block to query against using the `block` argument: +//! +//! | Constraint | GraphQL Example | Behavior | +//! |------------|-----------------|----------| +//! | `Unconstrained` | `{ tokens { id } }` | Use any recent block | +//! | `Number` | `{ tokens(block: {number: 100}) { ... } }` | Exact block number required | +//! | `Hash` | `{ tokens(block: {hash: "0x..."}) { ... } }` | Exact block by hash required | +//! | `NumberGTE` | `{ tokens(block: {number_gte: 100}) { ... 
} }` | Block >= N | +//! +//! # Query Rewriting +//! +//! The gateway injects a `_gateway_probe_` field into queries to capture the actual +//! block used by the indexer. This probe is used for: +//! +//! - Chain head tracking (updating known block numbers) +//! - Indexer performance feedback +//! - Block constraint validation +//! +//! ```text +//! # Original query +//! { tokens { id } } +//! +//! # Rewritten query (via rewrite_query) +//! { +//! tokens { id } +//! _gateway_probe_: _meta { block { hash number timestamp } } +//! } +//! ``` +//! +//! The probe response is stripped from the client response by [`IndexerClient`]. +//! +//! # Block Requirements +//! +//! The [`resolve_block_requirements`] function analyzes a query to determine: +//! +//! - `range`: For exact constraints (`number`, `hash`), the min/max block range needed +//! - `number_gte`: Maximum `number_gte` constraint (if any) +//! - `latest`: Whether the query benefits from using the latest block +//! +//! This information is used by indexer selection to filter candidates that don't +//! have the required blocks indexed. +//! +//! # Validation +//! +//! Queries are rejected if they request blocks before the subgraph's `startBlock` +//! from the manifest (for exact constraints only - `number_gte: 0` is always allowed). +//! +//! [`IndexerClient`]: crate::indexer_client::IndexerClient + use std::{ collections::{BTreeMap, BTreeSet}, fmt::Write as _, diff --git a/src/blocks.rs b/src/blocks.rs index 738f9fff..6840b3c8 100644 --- a/src/blocks.rs +++ b/src/blocks.rs @@ -1,3 +1,7 @@ +//! Block Type Definitions +//! +//! Core types for representing blockchain blocks and query constraints. + use serde::Deserialize; use thegraph_core::alloy::primitives::{BlockHash, BlockNumber, BlockTimestamp}; diff --git a/src/budgets.rs b/src/budgets.rs index 71fdb324..5859235c 100644 --- a/src/budgets.rs +++ b/src/budgets.rs @@ -1,3 +1,62 @@ +//! Query Fee Budget Management +//! +//! 
Implements a PID controller to dynamically adjust minimum indexer fees
+//! to hit a target average fee per query.
+//!
+//! # Overview
+//!
+//! The gateway has a `query_fees_target` configuration (e.g., $0.0001 per query).
+//! This module adjusts the `min_indexer_fees` parameter in real time to achieve
+//! that target across all queries.
+//!
+//! # PID Controller
+//!
+//! Uses an integral-only controller (a PID controller with k_p = 0 and k_d = 0):
+//!
+//! ```text
+//! error = (target - actual) / target
+//! integral = sum of error over recent history (with decay)
+//! control_variable = integral * k_i * target
+//! ```
+//!
+//! Where:
+//! - `k_i = 0.2` (integral gain)
+//! - `target` = configured `query_fees_target`
+//! - `actual` = average fees from recent queries
+//!
+//! The control variable (`min_indexer_fees`) is clamped to `[10e-6, target]`.
+//!
+//! # Decay Buffer
+//!
+//! Historical error values are stored in a 6-frame [`DecayBuffer`] with exponential
+//! decay factor 4. This weights recent samples more heavily while maintaining
+//! some memory of past behavior.
+//!
+//! The decay formula for frame `i`:
+//! ```text
+//! frame[i] = frame[i] * (1 - 4^(-i)) * decay + frame[i-1] * 4^(-(i-1)) * decay
+//! ```
+//!
+//! # Actor Model
+//!
+//! The budgeter runs as a background actor that:
+//!
+//! 1. Receives fee feedback via [`Budgeter::feedback`] channel after each query
+//! 2. Every second, calculates the new `min_indexer_fees` value
+//! 3. Publishes the new value via [`Budgeter::min_indexer_fees`] watch channel
+//!
+//! # Usage
+//!
+//! ```ignore
+//! let budgeter = Budgeter::new(USD(query_fees_target));
+//!
+//! // After each query, report the fees paid
+//! budgeter.feedback.send(USD(fees_paid));
+//!
+//! // Read current minimum fees when selecting indexers
+//! let min_fee = *budgeter.min_indexer_fees.borrow();
+//! 
``` + use std::time::Duration; use ordered_float::NotNan; diff --git a/src/bytes.rs b/src/bytes.rs index 1967944e..b9d60746 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -1,3 +1,13 @@ +//! Byte Manipulation Utilities +//! +//! Provides the [`concat_bytes!`] macro for concatenating byte slices at compile time. +//! +//! # Example +//! +//! ```ignore +//! let signature = concat_bytes!(65, [&[v], &r[..], &s[..]]); +//! ``` + // See https://doc.rust-lang.org/std/macro.concat_bytes.html #[macro_export] macro_rules! concat_bytes { diff --git a/src/chain.rs b/src/chain.rs index 63670918..38ac0842 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -1,3 +1,27 @@ +//! Single Chain Head Tracking +//! +//! Tracks recent blocks for a single blockchain, determining consensus among indexers. +//! +//! # Consensus Algorithm +//! +//! The [`Chain::consensus_blocks`] method returns blocks with simple majority consensus: +//! - At each block number, count indexers reporting each block hash +//! - If one hash has strictly more indexers than all other hashes at that number, +//! it's considered the consensus block +//! - Used to determine the "true" chain head among potentially forking indexers +//! +//! # Block Storage +//! +//! - Stores up to 512 blocks ([`MAX_LEN`]) +//! - When full, evicts blocks with the lowest block number +//! - Each block tracks which indexers reported it +//! +//! # Block Production Rate +//! +//! The [`Chain::blocks_per_minute`] method estimates block production rate from +//! the timestamp difference between oldest and newest consensus blocks. +//! Defaults to 6 blocks/minute if insufficient data. + use std::{ collections::{BTreeMap, BTreeSet}, iter, diff --git a/src/chains.rs b/src/chains.rs index e65d6e1e..8d1c258c 100644 --- a/src/chains.rs +++ b/src/chains.rs @@ -1,3 +1,47 @@ +//! Multi-Chain Head Tracking +//! +//! Manages chain head tracking across all indexed blockchains. +//! +//! # Architecture +//! +//! ```text +//! ┌──────────────┐ +//! 
│ Chains │ Main container, maps chain name → ChainReader +//! │ (with │ +//! │ aliases) │ +//! └──────┬───────┘ +//! │ +//! ▼ +//! ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +//! │ ChainReader │ │ ChainReader │ │ ChainReader │ +//! │ "ethereum" │ │ "polygon" │ │ "arbitrum" │ +//! └──────┬───────┘ └──────────────┘ └──────────────┘ +//! │ +//! ▼ +//! ┌──────────────┐ +//! │ Actor │ Background task updating Chain state +//! │ (per chain) │ +//! └──────────────┘ +//! ``` +//! +//! # Chain Aliases +//! +//! Supports aliases like `"mainnet"` → `"ethereum"` configured in gateway config. +//! +//! # Usage +//! +//! ```ignore +//! let chains = Chains::new(aliases); +//! let eth = chains.chain("ethereum"); +//! +//! // Report a block from an indexer +//! eth.notify(block, indexer); +//! +//! // Read chain state +//! let chain = eth.read(); +//! let head = chain.latest(); +//! ``` + use std::{ collections::{BTreeMap, HashMap}, time::Duration, diff --git a/src/client_query.rs b/src/client_query.rs index 393ecb24..334e2624 100644 --- a/src/client_query.rs +++ b/src/client_query.rs @@ -1,3 +1,67 @@ +//! Client Query Handler +//! +//! This module handles GraphQL queries from clients, orchestrating the full +//! query lifecycle from request to response. +//! +//! # Query Flow +//! +//! 1. **Authorization**: Verify API key and check subgraph/deployment permissions +//! 2. **Resolution**: Map subgraph/deployment ID to available indexers via [`NetworkService`] +//! 3. **Block Requirements**: Parse query to extract block constraints (specific block, range, latest) +//! 4. **Candidate Selection**: Build list of eligible indexers filtered by: +//! - Block requirements (indexer must have required blocks) +//! - Indexing progress (how far behind chain head) +//! - Historical performance (success rate, latency) +//! - Fee constraints (within budget) +//! 5. **Query Execution**: Send queries to up to 3 indexers in parallel +//! 6. 
**Response Handling**: Return first successful response, or aggregate errors +//! +//! # Endpoints +//! +//! | Path | Handler | Description | +//! |------|---------|-------------| +//! | `/api/subgraphs/id/{id}` | [`handle_query`] | Query by subgraph ID (resolves to latest deployment) | +//! | `/api/deployments/id/{id}` | [`handle_query`] | Query by deployment ID directly | +//! | `/api/deployments/id/{id}/indexers/id/{indexer}` | [`handle_indexer_query`] | Query specific indexer (for cross-checking) | +//! +//! # Indexer Selection Algorithm +//! +//! The [`build_candidates_list`] function selects indexers using these criteria: +//! +//! 1. **Version Selection**: Choose the latest subgraph version where at least one indexer +//! is within 30 blocks of chain head. Falls back to latest version if none qualify. +//! +//! 2. **Block Range Filtering**: For queries with block constraints, exclude indexers +//! whose indexing progress doesn't cover the required range. +//! +//! 3. **Freshness Filtering**: For "latest block" queries, exclude indexers more than +//! 30 minutes behind chain head (if any indexers are within that threshold). +//! +//! 4. **Final Selection**: The `indexer_selection` crate scores candidates by: +//! - Success rate (historical) +//! - Latency (response time) +//! - Fee (lower is better) +//! - Stake (higher is better) +//! +//! # Fee Calculation +//! +//! ```text +//! budget = query_fees_target * grt_per_usd * 1e18 (in GRT wei) +//! min_fee = min_indexer_fees / num_selections (spread across parallel queries) +//! actual_fee = max(indexer_advertised_fee, min_fee) +//! ``` +//! +//! The gateway "over-pays" to hit the target average fee across all indexer attempts. +//! +//! # Error Handling +//! +//! - [`Error::NoIndexers`]: No indexers allocated to the subgraph/deployment +//! - [`Error::BadIndexers`]: Indexers exist but all failed (aggregates per-indexer errors) +//! - [`Error::BadQuery`]: GraphQL parsing or validation failed +//! 
- [`Error::SubgraphNotFound`]: Subgraph/deployment ID not found or no valid versions +//! +//! [`NetworkService`]: crate::network::NetworkService + use std::{ cmp::max, collections::{BTreeMap, HashMap}, @@ -142,7 +206,9 @@ async fn resolve_subgraph_info( Err(Error::SubgraphNotFound(anyhow!("no valid versions",))) } Ok(None) => Err(Error::SubgraphNotFound(anyhow!("{selector}",))), - Ok(Some(info)) if info.indexings.is_empty() => Err(Error::NoIndexers), + Ok(Some(info)) if info.indexings.is_empty() => { + Err(Error::NoIndexers(selector.to_string())) + } Ok(Some(info)) => Ok(info), } } @@ -154,7 +220,9 @@ async fn resolve_subgraph_info( Err(Error::SubgraphNotFound(anyhow!("no allocations",))) } Ok(None) => Err(Error::SubgraphNotFound(anyhow!("{selector}",))), - Ok(Some(info)) if info.indexings.is_empty() => Err(Error::NoIndexers), + Ok(Some(info)) if info.indexings.is_empty() => { + Err(Error::NoIndexers(selector.to_string())) + } Ok(Some(info)) => { if !auth.is_any_deployment_subgraph_authorized(&info.subgraphs) { Err(Error::Auth(anyhow!("deployment not authorized by user"))) @@ -672,7 +740,7 @@ pub async fn handle_indexer_query( let indexing = subgraph .indexings .get(&indexing_id) - .ok_or_else(|| Error::NoIndexers)? + .ok_or_else(|| Error::NoIndexers(deployment.to_string()))? .as_ref() .map_err(|err| bad_indexers(IndexerError::Unavailable(err.clone())))?; diff --git a/src/client_query/README.md b/src/client_query/README.md new file mode 100644 index 00000000..49ff8034 --- /dev/null +++ b/src/client_query/README.md @@ -0,0 +1,75 @@ +# Client Query Module + +Client GraphQL query handling and indexer selection. 
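Before the details below, the fee arithmetic from the `client_query` module docs can be sketched in a few lines (a rough sketch — the names and plain `f64` units here are illustrative, not the crate's actual API):

```rust
// Hedged sketch of the documented fee calculation; names and plain f64
// units are illustrative, not the gateway's actual types.

// Budget in GRT wei: query_fees_target (USD) * grt_per_usd * 1e18.
fn budget_grt_wei(query_fees_target_usd: f64, grt_per_usd: f64) -> f64 {
    query_fees_target_usd * grt_per_usd * 1e18
}

// Fee for one indexer: its advertised fee, raised to a floor that spreads
// min_indexer_fees across the parallel selections.
fn actual_fee(advertised_fee: f64, min_indexer_fees: f64, num_selections: u32) -> f64 {
    let min_fee = min_indexer_fees / num_selections as f64;
    advertised_fee.max(min_fee)
}
```

Raising cheap indexers up to the floor is how the gateway "over-pays" toward the target average fee across parallel attempts.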
+ +## Module Overview + +| File | Purpose | +| ------------------- | ------------------------------------------------------ | +| `mod.rs` | Query handlers, indexer selection, response processing | +| `context.rs` | `Context` struct - shared services for handlers | +| `query_selector.rs` | Axum extractor for subgraph/deployment IDs | + +## Query Endpoints + +| Path | Handler | Description | +| ------------------------------------------------ | ---------------------- | ---------------------- | +| `/api/subgraphs/id/{id}` | `handle_query` | Query by subgraph ID | +| `/api/deployments/id/{id}` | `handle_query` | Query by deployment ID | +| `/api/deployments/id/{id}/indexers/id/{indexer}` | `handle_indexer_query` | Query specific indexer | + +Legacy paths (with API key in URL) are also supported via middleware. + +## Query Flow + +``` +1. Authorization + - Validate API key + - Check subgraph permissions + +2. Resolution + - NetworkService.resolve_with_*() + - Get available indexers + +3. Block Requirements + - Parse query for block constraints + - Filter indexers by block availability + +4. Candidate Selection + - Score by: success_rate, latency, fee, stake + - Select top 3 candidates + +5. Query Execution + - Send to up to 3 indexers in parallel + - Return first successful response + +6. Response Handling + - Strip _gateway_probe_ field + - Report to Kafka + - Update performance tracking +``` + +## Indexer Selection Algorithm + +See `build_candidates_list` function: + +1. Choose deployment version where indexers are within 30 blocks of chain head +2. Filter by block requirements (exact number, hash, or number_gte) +3. Filter by freshness (exclude indexers >30min behind for "latest" queries) +4. 
Score and select via `indexer_selection` crate + +## Context Struct + +The `Context` struct holds all services needed for query processing: + +| Field | Type | Purpose | +| -------------------- | ---------------------------- | ------------------------------- | +| `indexer_client` | `IndexerClient` | HTTP client for indexer queries | +| `receipt_signer` | `&'static ReceiptSigner` | TAP receipt signing | +| `budgeter` | `&'static Budgeter` | Fee budget management | +| `grt_per_usd` | `watch::Receiver<...>` | Exchange rate | +| `chains` | `&'static Chains` | Chain head tracking | +| `network` | `NetworkService` | Subgraph resolution | +| `indexing_perf` | `IndexingPerformance` | Performance tracking | +| `attestation_domain` | `&'static Eip712Domain` | Attestation verification | +| `reporter` | `mpsc::UnboundedSender<...>` | Kafka reporting | diff --git a/src/client_query/context.rs b/src/client_query/context.rs index 644c67f9..c5fa2bdb 100644 --- a/src/client_query/context.rs +++ b/src/client_query/context.rs @@ -1,3 +1,15 @@ +//! Query Handler Context +//! +//! Shared context passed to all query handlers via Axum state. +//! +//! # Lifetime Requirements +//! +//! Several fields use `&'static` references because Axum's state must be +//! `Clone + Send + Sync + 'static`. These are singletons initialized once +//! at startup and never deallocated (via `Box::leak`). +//! +//! See [`main`](crate::main) module documentation for rationale. + use ordered_float::NotNan; use thegraph_core::alloy::dyn_abi::Eip712Domain; use tokio::sync::{mpsc, watch}; diff --git a/src/client_query/query_selector.rs b/src/client_query/query_selector.rs index 54c5097b..6d993666 100644 --- a/src/client_query/query_selector.rs +++ b/src/client_query/query_selector.rs @@ -1,3 +1,16 @@ +//! Query Selector Extraction +//! +//! Axum extractor for parsing subgraph/deployment IDs from URL paths. +//! +//! # Supported Paths +//! +//! - `/api/subgraphs/id/{subgraph_id}` → `QuerySelector::Subgraph` +//! 
- `/api/deployments/id/{deployment_id}` → `QuerySelector::Deployment` +//! +//! # Error Handling +//! +//! Invalid IDs return a GraphQL error response (HTTP 200 with error body). + use std::collections::HashMap; use anyhow::anyhow; diff --git a/src/config.rs b/src/config.rs index f9244072..9ae6ab07 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,72 @@ +//! Gateway Configuration +//! +//! JSON-based configuration loaded at startup from a file path passed as a CLI argument. +//! +//! # Example Configuration +//! +//! ```json +//! { +//! "api_keys": { +//! "url": "https://api.example.com/gateway-api-keys", +//! "auth": "Bearer ", +//! "special": ["admin-key-1"] +//! }, +//! "attestations": { +//! "chain_id": "42161", +//! "dispute_manager": "0x...", +//! "legacy_dispute_manager": "0x..." +//! }, +//! "blocklist": [ +//! { "deployment": "Qm...", "public_poi": "0x...", "block": 12345678 }, +//! { "deployment": "Qm...", "indexer": "0x..." } +//! ], +//! "chain_aliases": { "mainnet": "ethereum" }, +//! "exchange_rate_provider": "https://eth-mainnet.g.alchemy.com/v2/", +//! "graph_env_id": "production", +//! "ip_blocker_db": "/path/to/ip-blocklist.csv", +//! "kafka": { +//! "bootstrap.servers": "localhost:9092", +//! "security.protocol": "SASL_SSL" +//! }, +//! "log_json": true, +//! "min_graph_node_version": "0.35.0", +//! "min_indexer_version": "1.0.0", +//! "trusted_indexers": [ +//! { "url": "https://indexer.example.com/", "auth": "Bearer " } +//! ], +//! "payment_required": true, +//! "port_api": 8000, +//! "port_metrics": 8001, +//! "query_fees_target": 0.0001, +//! "receipts": { +//! "chain_id": "42161", +//! "payer": "0x...", +//! "signer": "0x", +//! "verifier": "0x...", +//! "legacy_verifier": "0x..." +//! }, +//! "subgraph_service": "0x..." +//! } +//! ``` +//! +//! # API Keys Configuration +//! +//! The `api_keys` field supports three variants: +//! +//! 1. **Endpoint**: Fetch from HTTP endpoint (polls periodically) +//! 2. 
**KafkaTopic**: Stream from Kafka topic (note: typo `KakfaTopic` preserved for compatibility) +//! 3. **Fixed**: Static list of API keys for testing +//! +//! # IP Blocklist +//! +//! The optional `ip_blocker_db` field points to a CSV file with format: +//! ```csv +//! 192.168.1.0/24,US +//! 10.0.0.0/8,Internal +//! ``` +//! +//! Only the IP network (first column) is used; the country/label is ignored. + use std::{ collections::{BTreeMap, HashSet}, path::{Path, PathBuf}, @@ -35,7 +104,7 @@ pub struct Config { pub graph_env_id: String, /// File path of CSV containing rows of `IpNetwork,Country` pub ip_blocker_db: Option, - /// See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + /// See #[serde(default)] pub kafka: KafkaConfig, /// Format log output as JSON diff --git a/src/errors.rs b/src/errors.rs index feaef679..29bea56e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,36 @@ +//! Client-Facing Error Types +//! +//! Defines error types returned to GraphQL clients and internal error tracking. +//! +//! # Error Hierarchy +//! +//! ```text +//! Error (client-facing) +//! ├── Internal(anyhow::Error) -- Unexpected server errors +//! ├── Auth(anyhow::Error) -- Authentication/authorization failures +//! ├── SubgraphNotFound(anyhow::Error) -- Subgraph/deployment not found +//! ├── BadQuery(anyhow::Error) -- Invalid GraphQL query +//! ├── NoIndexers(String) -- No indexers allocated (includes query selector) +//! └── BadIndexers(IndexerErrors) -- All indexers failed +//! │ +//! └── IndexerError +//! ├── Unavailable(UnavailableReason) +//! ├── Timeout +//! └── BadResponse(String) +//! ``` +//! +//! # Response Format +//! +//! All errors implement [`IntoResponse`] and are serialized as GraphQL errors: +//! +//! ```json +//! { +//! "errors": [{ "message": "auth error: API key not found" }] +//! } +//! ``` +//! +//! 
[`IntoResponse`]: axum::response::IntoResponse + use std::{ collections::BTreeMap, fmt::{self, Write as _}, @@ -24,8 +57,8 @@ pub enum Error { #[error("bad query: {0:#}")] BadQuery(anyhow::Error), /// There are no indexers allocated to the requested subgraph or deployment. - #[error("no indexers found")] - NoIndexers, + #[error("no indexers found for {0}")] + NoIndexers(String), /// Indexers are available, but failed to return a suitable result. #[error("bad indexers: {0}")] BadIndexers(IndexerErrors), diff --git a/src/exchange_rate.rs b/src/exchange_rate.rs index 05d2a73b..9f423f7a 100644 --- a/src/exchange_rate.rs +++ b/src/exchange_rate.rs @@ -1,3 +1,25 @@ +//! GRT/USD Exchange Rate +//! +//! Fetches the GRT/USD exchange rate from Chainlink price feeds. +//! +//! # Data Source +//! +//! Uses the Chainlink GRT/USD price feed on Arbitrum: +//! - Contract: `0x0F38D86FceF4955B705F35c9e41d1A16e0637c73` +//! - Feed: +//! +//! # Update Frequency +//! +//! Polls the price feed every 60 seconds. The rate is inverted from USD/GRT +//! (Chainlink format) to GRT/USD (used for fee calculations). +//! +//! # Usage +//! +//! ```ignore +//! let grt_per_usd = exchange_rate::grt_per_usd(rpc_url).await; +//! let rate = *grt_per_usd.borrow(); +//! ``` + use std::time::Duration; use ChainlinkPriceFeed::ChainlinkPriceFeedInstance; diff --git a/src/graphql.rs b/src/graphql.rs index e03c2d0b..a3f51a82 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -1,3 +1,21 @@ +//! GraphQL Response Utilities +//! +//! Helpers for creating GraphQL-compliant error responses. +//! +//! # Response Format +//! +//! All errors are returned as HTTP 200 with GraphQL error body: +//! +//! ```json +//! { +//! "data": null, +//! "errors": [{ "message": "error message here" }] +//! } +//! ``` +//! +//! This follows the GraphQL spec where transport errors (HTTP 4xx/5xx) are +//! reserved for network issues, while query errors are returned as GraphQL errors. 
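The HTTP-200-with-GraphQL-error-body convention described above can be sketched in plain Rust. This is an illustrative stand-in only — the crate actually builds the body through `thegraph_graphql_http`'s `ResponseBody` type, not a hand-rolled string:

```rust
/// Hedged sketch of the error-body shape described above.
/// `graphql_error_body` is a hypothetical helper, not the crate's real API.
fn graphql_error_body(message: &str) -> String {
    // Minimal JSON string escaping, sufficient for illustration only.
    let escaped = message.replace('\\', "\\\\").replace('"', "\\\"");
    format!(r#"{{"data":null,"errors":[{{"message":"{escaped}"}}]}}"#)
}

fn main() {
    // Transport succeeds (HTTP 200); the failure travels in `errors`.
    let body = graphql_error_body("auth error: API key not found");
    println!("{body}");
}
```

The key design point is that clients always parse one response shape: a missing `data` plus a populated `errors` array signals a query-level failure, while non-200 status codes remain reserved for genuine transport problems.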
+ use axum::http::{Response, StatusCode}; use headers::ContentType; use thegraph_graphql_http::http::response::{IntoError as IntoGraphqlResponseError, ResponseBody}; diff --git a/src/indexer_client.rs b/src/indexer_client.rs index 104ae2dc..8294a2ec 100644 --- a/src/indexer_client.rs +++ b/src/indexer_client.rs @@ -1,3 +1,31 @@ +//! Indexer HTTP Client +//! +//! Sends GraphQL queries to indexers and processes their responses. +//! +//! # Query Flow +//! +//! 1. Build HTTP POST request to `{indexer_url}/subgraphs/id/{deployment}` +//! 2. Attach auth header (`tap-receipt` for paid, `Authorization` for free) +//! 3. Send request with 20s timeout (configured in main.rs) +//! 4. Parse response payload containing `graphQLResponse` and `attestation` +//! 5. Rewrite response: strip `_gateway_probe_` field, extract block info +//! 6. Verify attestation signature (for paid queries) +//! 7. Return [`IndexerResponse`] with client response and metadata +//! +//! # Response Rewriting +//! +//! The gateway injects a `_gateway_probe_` field into queries (see [`block_constraints`]). +//! This field is stripped from the client response but used to extract the actual +//! block number/hash/timestamp for chain head tracking. +//! +//! # Error Detection +//! +//! - **Timeout**: Request exceeded timeout +//! - **BadResponse**: HTTP error, parse error, or unattestable response +//! - **MissingBlock**: Indexer doesn't have the requested block (parsed from error message) +//! +//! [`block_constraints`]: crate::block_constraints + use http::{StatusCode, header::CONTENT_TYPE}; use reqwest::header::AUTHORIZATION; use serde::{Deserialize, Serialize}; diff --git a/src/indexing_performance.rs b/src/indexing_performance.rs index 9f9e48aa..a08145c8 100644 --- a/src/indexing_performance.rs +++ b/src/indexing_performance.rs @@ -1,3 +1,51 @@ +//! Indexer Performance Tracking +//! +//! Tracks success rates and latency for each indexer+deployment pair to inform +//! indexer selection decisions. +//! 
+//! # Data Model +//! +//! For each `(IndexerId, DeploymentId)` pair, tracks: +//! - `response`: Success rate and latency statistics (via `indexer_selection::Performance`) +//! - `latest_block`: Most recent block number seen from this indexer +//! +//! # Double Buffer Pattern +//! +//! Uses a double-buffer ([`DoubleBuffer`]) to allow lock-free reads: +//! +//! ```text +//! ┌──────────────┐ ┌──────────────┐ +//! │ Buffer A │◄────│ Readers │ (try_read on both) +//! │ (RwLock) │ │ │ +//! └──────────────┘ └──────────────┘ +//! ┌──────────────┐ +//! │ Buffer B │◄────Writer (updates both sequentially) +//! │ (RwLock) │ +//! └──────────────┘ +//! ``` +//! +//! The writer updates both buffers, readers try to acquire a read lock on either. +//! This guarantees readers never block for long. +//! +//! # Update Sources +//! +//! 1. **Query Feedback**: After each indexer query, report success/failure and latency +//! 2. **Network Updates**: When `NetworkService` publishes new indexing progress +//! 3. **Decay Timer**: Every second, decay historical statistics +//! +//! # Usage +//! +//! ```ignore +//! let perf = IndexingPerformance::new(network); +//! +//! // Report query result +//! perf.feedback(indexer, deployment, success, latency_ms, latest_block); +//! +//! // Read latest performance data +//! let snapshots = perf.latest(); +//! let snapshot = snapshots.get(&(indexer, deployment)); +//! ``` + use std::{collections::HashMap, ops::Deref, time::Duration}; use parking_lot::RwLock; diff --git a/src/kafka.rs b/src/kafka.rs index c6195d1a..f2fd4f42 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -1,3 +1,16 @@ +//! Kafka Consumer Utilities +//! +//! Helpers for consuming data from Kafka topics. +//! +//! # Functions +//! +//! - [`assign_partitions`]: Assign all partitions of a topic to a consumer +//! - [`latest_timestamp`]: Get the timestamp of the most recent message +//! +//! Used by [`auth::kafka`] for API key streaming. +//! +//! 
[`auth::kafka`]: crate::auth::kafka + use std::time::Duration; use anyhow::{Context as _, anyhow}; diff --git a/src/main.rs b/src/main.rs index 21716633..ffaa6904 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,46 @@ +//! Graph Gateway Entry Point +//! +//! Initializes and starts the graph-gateway service, routing GraphQL queries +//! from clients to Graph Network indexers. +//! +//! # Initialization Sequence +//! +//! 1. Load configuration from JSON file (path from CLI argument) +//! 2. Initialize receipt signer from config and set up logging +//! 3. Set up HTTP client with 20-second timeout +//! 4. Initialize exchange rate provider (RPC or fixed value) +//! 5. Create attestation EIP-712 domains for signature verification +//! 6. Initialize network topology service and wait for initial data +//! 7. Initialize auth service from API keys source (endpoint, Kafka, or fixed) +//! 8. Create query budgeter with PID controller for fee management +//! 9. Start metrics server on separate port +//! 10. Start main API server with CORS and auth middleware +//! +//! # Static Allocations +//! +//! Several components use `Box::leak()` to create `&'static` references: +//! +//! - `attestation_domain` / `legacy_attestation_domain`: EIP-712 domains for attestation +//! verification. Static because they're immutable config derived from chain ID and +//! dispute manager address. +//! +//! - `receipt_signer`: TAP receipt signing service. Static because it holds the signing +//! key and is used by all query handlers. +//! +//! - `budgeter`: Fee budget controller. Static because it maintains state across all +//! requests and runs a background task. +//! +//! - `chains`: Chain head tracking. Static because it aggregates block info from all +//! query responses. +//! +//! This pattern is intentional: these are singletons that must outlive Axum's state +//! lifetime requirements and are never deallocated during the gateway's lifetime. +//! +//! # Graceful Shutdown +//! +//! 
The gateway handles SIGINT and SIGTERM for graceful shutdown. The Axum server +//! stops accepting new connections and waits for in-flight requests to complete. + mod auth; mod block_constraints; mod blocks; @@ -64,93 +107,51 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[tokio::main] async fn main() { - let conf_path = env::args() - .nth(1) - .expect("Missing argument for config path") - .parse::() - .unwrap(); - let conf = config::load_from_file(&conf_path).expect("Failed to load config"); + // Phase 1: Load configuration + let conf = load_configuration(); - let receipt_signer = PrivateKeySigner::from_bytes(&conf.receipts.signer) + // Phase 2: Initialize signing key and logging + let receipt_key = PrivateKeySigner::from_bytes(&conf.receipts.signer) .expect("failed to prepare receipt signer"); - let signer_address = receipt_signer.address(); - + let signer_address = receipt_key.address(); init_logging("graph-gateway", conf.log_json); - tracing::info!("gateway ID: {:?}", signer_address); + tracing::info!("gateway ID: {signer_address:?}"); + // Set up termination handler (will be replaced by graceful shutdown) let setup_termination = tokio::spawn(async { await_shutdown_signals().await; tracing::warn!("shutdown"); std::process::exit(1); }); - let http_client = reqwest::Client::builder() - .timeout(Duration::from_secs(20)) - .build() - .unwrap(); + // Phase 3: Initialize core services + let http_client = init_http_client(); let kafka_consumer = KafkaConsumer::new(conf.kafka.clone()); + let grt_per_usd = init_exchange_rate(&conf.exchange_rate_provider).await; - let grt_per_usd = match conf.exchange_rate_provider { - ExchangeRateProvider::Fixed(grt_per_usd) => watch::channel(grt_per_usd).1, - ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await, - }; - - let attestation_domain: &'static Eip712Domain = - Box::leak(Box::new(attestation::eip712_domain( - conf.attestations - .chain_id - .parse::() - .expect("failed to parse attestation 
domain chain_id"), - conf.attestations.dispute_manager, - ))); - - let legacy_attestation_domain: &'static Eip712Domain = - Box::leak(Box::new(attestation::eip712_domain( - conf.attestations - .chain_id - .parse::() - .expect("failed to parse attestation domain chain_id"), - conf.attestations.legacy_dispute_manager, - ))); + // Phase 4: Create attestation domains (static for Axum state lifetime) + let (attestation_domain, legacy_attestation_domain) = + init_attestation_domains(&conf.attestations); + // Phase 5: Initialize network topology service let indexer_client = IndexerClient { client: http_client.clone(), }; - let network_subgraph_client = SubgraphClient { - client: indexer_client.clone(), - indexers: conf.trusted_indexers, - latest_block: None, - page_size: 500, - max_lag_seconds: conf.network_subgraph_max_lag_seconds, - }; - let indexer_host_blocklist = match &conf.ip_blocker_db { - Some(path) => { - config::load_ip_blocklist_from_file(path).expect("failed to load IP blocker DB") - } - None => Default::default(), - }; - let indexer_blocklist = - indexer_blocklist::Blocklist::spawn(conf.blocklist, kafka_consumer.clone()); - let mut network = network::service::spawn( - http_client.clone(), - network_subgraph_client, - indexer_blocklist.clone(), - conf.min_indexer_version, - conf.min_graph_node_version, - indexer_host_blocklist, - ); - let indexing_perf = IndexingPerformance::new(network.clone()); + let (mut network, indexer_blocklist, indexing_perf) = + init_network_service(&http_client, &indexer_client, &conf, kafka_consumer.clone()); network.wait_until_ready().await; + // Phase 6: Create receipt signer (static for Axum state lifetime) let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(ReceiptSigner::new( conf.receipts.payer, - receipt_signer, + receipt_key, conf.receipts.chain_id, conf.receipts.verifier, conf.receipts.legacy_verifier, conf.subgraph_service, ))); + // Phase 7: Initialize auth service let auth_service = init_auth_service( 
http_client.clone(), &kafka_consumer, @@ -160,9 +161,9 @@ async fn main() { .await .expect("failed to start auth service"); + // Phase 8: Create budget controller and reporter (static for Axum state lifetime) let budgeter: &'static Budgeter = Box::leak(Box::new(Budgeter::new(USD(conf.query_fees_target)))); - let reporter = reports::Reporter::create( signer_address, conf.graph_env_id, @@ -174,6 +175,7 @@ async fn main() { ) .unwrap(); + // Phase 9: Build query handler context let ctx = Context { indexer_client, receipt_signer, @@ -187,22 +189,130 @@ async fn main() { reporter, }; + // Phase 10: Start servers let blocklist: watch::Receiver> = indexer_blocklist.blocklist; + start_metrics_server(conf.port_metrics); + let router = build_router(ctx, auth_service, signer_address, blocklist); - // Host metrics on a separate server with a port that isn't open to public requests. - tokio::spawn(async move { - let router = Router::new().route("/metrics", routing::get(handle_metrics)); - let metrics_listener = TcpListener::bind(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - conf.port_metrics, - )) - .await - .expect("Failed to bind metrics server"); - axum::serve(metrics_listener, router.into_make_service()) - .await - .expect("Failed to start metrics server"); - }); + // Switch to graceful shutdown via Axum + setup_termination.abort(); + start_api_server(router, conf.port_api).await; + tracing::warn!("shutdown"); +} + +// ============================================================================= +// Initialization Helpers +// ============================================================================= + +/// Load configuration from CLI argument path. +fn load_configuration() -> config::Config { + let conf_path = env::args() + .nth(1) + .expect("Missing argument for config path") + .parse::() + .unwrap(); + config::load_from_file(&conf_path).expect("Failed to load config") +} +/// Create HTTP client with 20-second timeout. 
+fn init_http_client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .unwrap() +} + +/// Initialize exchange rate provider (RPC or fixed value). +async fn init_exchange_rate( + provider: &ExchangeRateProvider, +) -> watch::Receiver> { + match provider { + ExchangeRateProvider::Fixed(grt_per_usd) => watch::channel(*grt_per_usd).1, + ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url.clone()).await, + } +} + +/// Create EIP-712 attestation domains. +/// +/// Returns static references because Axum state requires `'static` lifetime. +/// These domains are immutable config and never deallocated. +fn init_attestation_domains( + config: &config::AttestationConfig, +) -> (&'static Eip712Domain, &'static Eip712Domain) { + let chain_id = config + .chain_id + .parse::() + .expect("failed to parse attestation domain chain_id"); + + let attestation_domain: &'static Eip712Domain = Box::leak(Box::new( + attestation::eip712_domain(chain_id, config.dispute_manager), + )); + + let legacy_attestation_domain: &'static Eip712Domain = Box::leak(Box::new( + attestation::eip712_domain(chain_id, config.legacy_dispute_manager), + )); + + (attestation_domain, legacy_attestation_domain) +} + +/// Initialize network topology service and related components. 
+/// +/// Returns: +/// - `NetworkService`: Handle for subgraph/deployment resolution +/// - `Blocklist`: Indexer blocklist with Kafka updates +/// - `IndexingPerformance`: Performance tracking for indexer selection +fn init_network_service( + http_client: &reqwest::Client, + indexer_client: &IndexerClient, + conf: &config::Config, + kafka_consumer: KafkaConsumer, +) -> ( + network::NetworkService, + indexer_blocklist::Blocklist, + IndexingPerformance, +) { + let network_subgraph_client = SubgraphClient { + client: indexer_client.clone(), + indexers: conf.trusted_indexers.clone(), + latest_block: None, + page_size: 500, + max_lag_seconds: conf.network_subgraph_max_lag_seconds, + }; + + let indexer_host_blocklist = match &conf.ip_blocker_db { + Some(path) => { + config::load_ip_blocklist_from_file(path).expect("failed to load IP blocker DB") + } + None => Default::default(), + }; + + let blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist.clone(), kafka_consumer); + + let network = network::service::spawn( + http_client.clone(), + network_subgraph_client, + blocklist.clone(), + conf.min_indexer_version.clone(), + conf.min_graph_node_version.clone(), + indexer_host_blocklist, + ); + + let indexing_perf = IndexingPerformance::new(network.clone()); + + (network, blocklist, indexing_perf) +} + +// ============================================================================= +// Router Building +// ============================================================================= + +/// Build the main API router with all middleware layers. 
+fn build_router( + ctx: Context, + auth_service: AuthContext, + signer_address: thegraph_core::alloy::primitives::Address, + blocklist: watch::Receiver>, +) -> Router { let api = Router::new() .route( "/deployments/id/{deployment_id}", @@ -226,8 +336,7 @@ async fn main() { ) .with_state(ctx) .layer( - // ServiceBuilder works by composing all layers into one such that they run top to - // bottom, and then the response would bubble back up through the layers in reverse + // ServiceBuilder composes layers top-to-bottom, responses bubble up in reverse tower::ServiceBuilder::new() .layer( CorsLayer::new() @@ -240,37 +349,53 @@ async fn main() { .layer(RequireAuthorizationLayer::new(auth_service)), ); - let router = Router::new() + Router::new() .route("/", routing::get(|| async { "Ready to roll!" })) - // This path is required by NGINX ingress controller .route("/ready", routing::get(|| async { "Ready" })) .route( "/blocklist", routing::get(move || async move { axum::Json(blocklist.borrow().clone()) }), ) - .nest("/api", api); + .nest("/api", api) +} - // handle graceful shutdown via the axum server instead - setup_termination.abort(); +// ============================================================================= +// Server Startup +// ============================================================================= - let app_listener = TcpListener::bind(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - conf.port_api, - )) - .await - .expect("Failed to bind API server") - // disable Nagle's algorithm - .tap_io(|stream| { - let _ = stream.set_nodelay(true); +/// Start the metrics server on a separate port. +/// +/// Runs in a background task. The metrics endpoint is not exposed publicly. 
+fn start_metrics_server(port: u16) { + tokio::spawn(async move { + let router = Router::new().route("/metrics", routing::get(handle_metrics)); + let listener = + TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)) + .await + .expect("Failed to bind metrics server"); + axum::serve(listener, router.into_make_service()) + .await + .expect("Failed to start metrics server"); }); +} + +/// Start the main API server with graceful shutdown support. +async fn start_api_server(router: Router, port: u16) { + let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)) + .await + .expect("Failed to bind API server") + // Disable Nagle's algorithm for lower latency + .tap_io(|stream| { + let _ = stream.set_nodelay(true); + }); + axum::serve( - app_listener, + listener, router.into_make_service_with_connect_info::(), ) .with_graceful_shutdown(await_shutdown_signals()) .await .expect("Failed to start API server"); - tracing::warn!("shutdown"); } async fn await_shutdown_signals() { diff --git a/src/metrics.rs b/src/metrics.rs index 540f0d95..ed29a05a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,3 +1,24 @@ +//! Prometheus Metrics +//! +//! Defines and registers all Prometheus metrics for the gateway. +//! +//! # Metrics +//! +//! | Metric | Type | Description | +//! |--------|------|-------------| +//! | `gw_client_query_ok` | Counter | Successful client queries | +//! | `gw_client_query_err` | Counter | Failed client queries | +//! | `gw_client_query_duration` | Histogram | Client query latency | +//! | `gw_avg_query_fees` | Gauge | Average indexer fees per query (USD) | +//! | `gw_indexer_query_ok` | Counter (vec) | Successful indexer queries by deployment/indexer | +//! | `gw_indexer_query_err` | Counter (vec) | Failed indexer queries by deployment/indexer | +//! | `gw_indexer_query_duration` | Histogram (vec) | Indexer query latency by deployment/indexer | +//! 
| `gw_blocks_per_minute` | Gauge (vec) | Chain blocks per minute by chain name | +//! +//! # Endpoint +//! +//! Metrics are exposed at `http://localhost:{port_metrics}/metrics` in Prometheus text format. + use lazy_static::lazy_static; use prometheus::{ Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, diff --git a/src/middleware.rs b/src/middleware.rs index 39de9f7b..1345d924 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,3 +1,38 @@ +//! HTTP Middleware Layers +//! +//! Axum middleware for request processing, authentication, and tracing. +//! +//! # Middleware Stack (top to bottom) +//! +//! ```text +//! Request +//! │ +//! ▼ +//! ┌─────────────────────────┐ +//! │ CorsLayer │ Allow cross-origin requests +//! └───────────┬─────────────┘ +//! ▼ +//! ┌─────────────────────────┐ +//! │ RequestTracingLayer │ Add request_id span, log requests +//! └───────────┬─────────────┘ +//! ▼ +//! ┌─────────────────────────┐ +//! │ legacy_auth_adapter │ Extract API key from /{api_key}/... path +//! └───────────┬─────────────┘ +//! ▼ +//! ┌─────────────────────────┐ +//! │ RequireAuthorizationLayer│ Validate API key, add AuthSettings extension +//! └───────────┬─────────────┘ +//! ▼ +//! Handler +//! ``` +//! +//! # Modules +//! +//! - [`request_tracing`]: Adds request ID and tracing span +//! - [`require_auth`]: Enforces API key authentication +//! - [`legacy_auth`]: Adapts legacy `/{api_key}/...` URL scheme + mod legacy_auth; mod request_tracing; mod require_auth; diff --git a/src/middleware/README.md b/src/middleware/README.md new file mode 100644 index 00000000..64c345ec --- /dev/null +++ b/src/middleware/README.md @@ -0,0 +1,75 @@ +# Middleware Module + +Axum HTTP middleware layers for request processing. 
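The `legacy_auth` adaptation named above — moving an API key from the URL path into an `Authorization` header — can be sketched with plain string handling. This is a simplified illustration under stated assumptions; the real adapter rewrites Axum request parts and header maps rather than bare strings, and `adapt_legacy_path` is a hypothetical name:

```rust
/// Hedged sketch: rewrite "/api/{api_key}/subgraphs/id/{id}" into the modern
/// path plus a "Bearer {api_key}" header value. Illustrative only.
fn adapt_legacy_path(path: &str) -> Option<(String, String)> {
    let rest = path.strip_prefix("/api/")?;
    let (api_key, tail) = rest.split_once('/')?;
    // If the second segment is already "subgraphs" or "deployments", the path
    // uses the modern scheme and needs no rewriting.
    if api_key == "subgraphs" || api_key == "deployments" {
        return None;
    }
    Some((format!("/api/{tail}"), format!("Bearer {api_key}")))
}

fn main() {
    let rewritten = adapt_legacy_path("/api/abc123/subgraphs/id/Qm1");
    println!("{rewritten:?}");
}
```

Doing this rewrite *before* `RequireAuthorizationLayer` runs means the authorization layer only ever has to look in one place (the header), regardless of which URL scheme the client used.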
+ +## Module Overview + +| File | Purpose | +| -------------------- | ---------------------------------------- | +| `request_tracing.rs` | Adds request ID and tracing span | +| `require_auth.rs` | Enforces API key authentication | +| `legacy_auth.rs` | Adapts legacy `/{api_key}/...` URL paths | + +## Middleware Stack + +Applied top-to-bottom for each request: + +``` +Request + | + v ++---------------------------+ +| CorsLayer | Allow cross-origin requests ++---------------------------+ + | + v ++---------------------------+ +| RequestTracingLayer | Generate request_id, create span ++---------------------------+ + | + v ++---------------------------+ +| legacy_auth_adapter | Move API key from path to header ++---------------------------+ + | + v ++---------------------------+ +| RequireAuthorizationLayer | Validate key, add AuthSettings ++---------------------------+ + | + v +Handler +``` + +## Request Tracing + +Each request gets a unique `request_id` (UUID v4) added to the tracing span. +The span includes: + +- Request method and path +- Response status +- Duration + +## Legacy Auth Adapter + +Converts legacy URL format to modern header-based auth: + +``` +/api/{api_key}/subgraphs/id/{id} + | + v +/api/subgraphs/id/{id} ++ Authorization: Bearer {api_key} +``` + +## Authorization Layer + +Validates API keys and adds `AuthSettings` to request extensions: + +```rust +// In handler +let auth: Extension = ...; +if !auth.is_subgraph_authorized(&subgraph_id) { + return Err(Error::Auth(...)); +} +``` diff --git a/src/network.rs b/src/network.rs index ac9114eb..6109987f 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,3 +1,23 @@ +//! Network Topology Management +//! +//! This module provides network topology services for resolving subgraphs and +//! deployments to their available indexers. +//! +//! # Module Structure +//! +//! | Module | Purpose | +//! |--------|---------| +//! | [`service`] | Main [`NetworkService`] interface for resolution | +//! 
| [`subgraph_client`] | Fetches data from network subgraph via trusted indexers | +//! | [`cost_model`] | Resolves indexer query fees | +//! | [`indexing_progress`] | Resolves indexer block progress | +//! | [`host_filter`] | IP-based indexer filtering | +//! | [`version_filter`] | Version-based indexer filtering | +//! | [`poi_filter`] | Proof-of-indexing filtering | +//! | [`indexer_blocklist`] | Manual indexer blocklist | +//! +//! See [`service`] module for architecture details. + pub use service::{NetworkService, ResolvedSubgraphInfo}; pub use snapshot::{DeploymentError, Indexing, IndexingId, SubgraphError}; use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariables}; diff --git a/src/network/README.md b/src/network/README.md new file mode 100644 index 00000000..167ecd6c --- /dev/null +++ b/src/network/README.md @@ -0,0 +1,68 @@ +# Network Module + +Network topology management for subgraph/deployment resolution. + +## Module Overview + +| File | Visibility | Purpose | +| ------------------------ | ---------- | ------------------------------------------------------- | +| `service.rs` | pub | `NetworkService` - main resolution interface | +| `snapshot.rs` | internal | In-memory topology state (`NetworkTopologySnapshot`) | +| `subgraph_client.rs` | pub | Fetches data from network subgraph via trusted indexers | +| `indexer_processing.rs` | internal | Processes indexer info using type-state pattern | +| `pre_processing.rs` | internal | Validates and converts raw subgraph data | +| `subgraph_processing.rs` | internal | Processes subgraph/deployment info | +| `cost_model.rs` | pub | Resolves indexer query fees | +| `indexing_progress.rs` | pub | Resolves indexer block progress | +| `host_filter.rs` | pub | IP-based indexer filtering | +| `version_filter.rs` | pub | Version-based indexer filtering | +| `poi_filter.rs` | pub | Proof-of-indexing filtering | +| `indexer_blocklist.rs` | pub | Manual indexer blocklist management | + +## Data Flow + 
+``` +SubgraphClient.fetch() + | + v +pre_processing::into_internal_*() + | + v +subgraph_processing::process_*() + | + v +indexer_processing::process_info() + | + v +snapshot::new_from() + | + v +NetworkTopologySnapshot (published via watch channel) +``` + +## Key Types + +- `NetworkService` - Query resolution interface (cloneable handle) +- `ResolvedSubgraphInfo` - Result of subgraph/deployment resolution +- `Indexing` - Single indexer+deployment combination with resolved info +- `IndexingId` - Unique key (indexer, deployment) for indexings +- `IndexingInfo` - Type-state pattern for indexing processing stages + +## Update Cycle + +Every 30 seconds: + +1. Fetch subgraph info from network subgraph +2. Validate and pre-process data +3. Resolve indexer info (version, POI, progress, fees) +4. Build new `NetworkTopologySnapshot` +5. Publish via watch channel + +## Filters + +Indexers can be filtered out by: + +- **HostFilter**: IP address blocklist +- **VersionFilter**: Minimum indexer-service and graph-node versions +- **PoiFilter**: Bad proof-of-indexing blocklist +- **IndexerBlocklist**: Manual per-indexer-deployment blocklist diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs index a2a13b7c..d7baa333 100644 --- a/src/network/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -1,3 +1,51 @@ +//! Indexer Information Processing +//! +//! Processes raw indexer data through multiple resolution stages using a type-state +//! pattern to ensure compile-time correctness. +//! +//! # Type-State Pattern +//! +//! The [`IndexingInfo`] type uses generic parameters to track processing state: +//! +//! ```text +//! IndexingInfo<(), ()> -- Raw allocation data only +//! │ +//! │ with_indexing_progress() +//! ▼ +//! IndexingInfo -- Has progress info (latest/min block) +//! │ +//! │ with_fee() +//! ▼ +//! IndexingInfo -- Fully resolved (= ResolvedIndexingInfo) +//! ``` +//! +//! 
This pattern provides compile-time guarantees that fields aren't accessed before +//! they're populated. For example, you cannot access `.progress` on `IndexingInfo<(), ()>`. +//! +//! # Processing Pipeline +//! +//! For each indexer, [`process_info`] performs these checks in order: +//! +//! 1. **Host Filter**: Check IP blocklist (rejects blocked IPs) +//! 2. **Version Filter**: Check minimum indexer-service and graph-node versions +//! 3. **Deployment Blocklist**: Check manually blocklisted indexer+deployment pairs +//! 4. **POI Filter**: Check proof-of-indexing blocklist (bad data detection) +//! 5. **Indexing Progress**: Resolve latest/min block numbers via status endpoint +//! 6. **Cost Models**: Resolve query fees via cost model endpoint +//! +//! If any check fails, the indexer is marked with an [`UnavailableReason`] and +//! excluded from query routing. +//! +//! # Key Types +//! +//! - [`IndexerRawInfo`]: Pre-processed indexer info from network subgraph +//! - [`IndexerInfo`]: Processed indexer with resolved indexings +//! - [`IndexingInfo`]: Type-state indexing info (allocation → progress → fee) +//! - [`ResolvedIndexingInfo`]: Fully resolved indexing (alias for `IndexingInfo`) +//! - [`IndexingProgress`]: Block range an indexer has indexed (latest/min block) +//! +//! [`UnavailableReason`]: crate::errors::UnavailableReason + use std::collections::{HashMap, HashSet}; use custom_debug::CustomDebug; diff --git a/src/network/service.rs b/src/network/service.rs index 06d70f07..530e578f 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1,6 +1,64 @@ -//! The [`NetworkService`] is a `graph-gateway` specific abstraction layer providing a -//! simplified interface for resolving the subgraph-specific information required by the -//! query processing pipeline +//! Network Topology Service +//! +//! The [`NetworkService`] provides a simplified interface for resolving subgraph-specific +//! information required by the query processing pipeline. +//! 
+//! # Architecture +//! +//! ```text +//! ┌─────────────────┐ +//! │ SubgraphClient │ Fetches from network subgraph via trusted indexers +//! │ (every 30s) │ +//! └────────┬────────┘ +//! │ +//! ▼ +//! ┌─────────────────┐ +//! │ Pre-processing │ Validates and converts raw subgraph data +//! │ │ - Filters invalid entries +//! │ │ - Converts to internal types +//! └────────┬────────┘ +//! │ +//! ▼ +//! ┌─────────────────┐ +//! │ Indexer │ Resolves per-indexer information: +//! │ Processing │ - Host filter (IP blocklist) +//! │ │ - Version filter (min indexer-service/graph-node) +//! │ │ - POI filter (proof-of-indexing blocklist) +//! │ │ - Indexing progress (latest/min block) +//! │ │ - Cost models (query fees) +//! └────────┬────────┘ +//! │ +//! ▼ +//! ┌─────────────────┐ +//! │ NetworkTopology │ Published via watch channel +//! │ Snapshot │ - Subgraphs: SubgraphId → versions, indexings +//! │ │ - Deployments: DeploymentId → indexings +//! └─────────────────┘ +//! ``` +//! +//! # Update Cycle +//! +//! The service spawns a background task ([`spawn_updater_task`]) that: +//! +//! 1. Fetches subgraph info from the network subgraph every 30 seconds +//! 2. Validates and pre-processes the data (filters invalid entries) +//! 3. Resolves indexer information (version, POI, progress, cost models) +//! 4. Constructs a new [`NetworkTopologySnapshot`] +//! 5. Publishes the snapshot via a watch channel for consumers +//! +//! # Resolution Methods +//! +//! - [`NetworkService::resolve_with_subgraph_id`]: Resolves a subgraph ID to its +//! deployments (versions) and available indexers +//! - [`NetworkService::resolve_with_deployment_id`]: Resolves a deployment ID +//! directly to available indexers +//! +//! # Error Handling +//! +//! Resolution can fail with: +//! - [`SubgraphError::NoAllocations`]: No indexers allocated to this subgraph +//! - [`SubgraphError::NoValidVersions`]: All deployment versions are invalid +//! 
- [`DeploymentError::NoAllocations`]: No indexers allocated to this deployment use std::{ collections::{HashMap, HashSet}, @@ -71,7 +129,7 @@ impl ResolvedSubgraphInfo { /// The [`NetworkService`] is responsible for extracting and providing information about /// the network topology and subgraphs associated with a given query selector, e.g., a subgraph ID. /// -/// To create a new [`NetworkService`] instance, use the [`NetworkServiceBuilder`]. +/// To create a new [`NetworkService`] instance, use the [`spawn`] function. #[derive(Clone)] pub struct NetworkService { network: watch::Receiver<NetworkTopologySnapshot>, diff --git a/src/network/subgraph_client.rs b/src/network/subgraph_client.rs index 7117f7c5..e024425d 100644 --- a/src/network/subgraph_client.rs +++ b/src/network/subgraph_client.rs @@ -29,7 +29,7 @@ use crate::{ /// Please, DO NOT mix or merge them. /// /// -/// See: https://github.com/graphprotocol/graph-network-subgraph/blob/master/schema.graphql +/// See: <https://github.com/graphprotocol/graph-network-subgraph/blob/master/schema.graphql> pub mod types { use serde::Deserialize; use serde_with::serde_as; diff --git a/src/receipts.rs b/src/receipts.rs index 480afd10..67417763 100644 --- a/src/receipts.rs +++ b/src/receipts.rs @@ -1,3 +1,47 @@ +//! TAP Receipt Signing +//! +//! Creates and signs Timeline Aggregation Protocol (TAP) receipts for indexer payments. +//! Receipts are cryptographic commitments to pay indexers for query execution. +//! +//! # Receipt Versions +//! +//! | Version | Contract | Allocation Type | Serialization | +//! |---------|----------|-----------------|---------------| +//! | V1 | `TAP` verifier | Legacy allocations | JSON | +//! | V2 | `GraphTallyCollector` | New allocations (Horizon) | Protobuf + Base64 | +//! +//! The version is determined by the `is_legacy` flag on the allocation. Legacy +//! allocations were created before the Horizon upgrade and use the original TAP +//! verifier contract. +//! +//! # EIP-712 Domains +//! +//! Each receipt version uses a different EIP-712 domain for signing: +//! +//! 
- **V1**: `name="TAP", version="1", chainId=<chain id>, verifyingContract=<TAP verifier address>` +//! - **V2**: `name="GraphTallyCollector", version="1", chainId=<chain id>, verifyingContract=<GraphTallyCollector address>` +//! +//! # Receipt Fields +//! +//! Common fields in both versions: +//! - `allocation_id` / `collection_id`: Identifies the allocation being paid +//! - `value`: Payment amount in GRT wei +//! - `timestamp_ns`: Creation time (nanoseconds since epoch) +//! - `nonce`: Random value to prevent replay attacks +//! +//! V2 adds: +//! - `payer`: Gateway's payment address +//! - `data_service`: Subgraph service contract address +//! - `service_provider`: Indexer's address +//! +//! # Usage +//! +//! ```ignore +//! let signer = ReceiptSigner::new(payer, key, chain_id, verifier, legacy_verifier, data_service); +//! let receipt = signer.create_receipt(allocation_id, indexer, fee, is_legacy)?; +//! let serialized = receipt.serialize(); // Send this to indexer +//! ``` + use std::time::SystemTime; use base64::{Engine as _, prelude::BASE64_STANDARD}; diff --git a/src/reports.rs b/src/reports.rs index e65620e6..7baae529 100644 --- a/src/reports.rs +++ b/src/reports.rs @@ -1,3 +1,34 @@ +//! Kafka Reporting +//! +//! Reports query telemetry and attestations to Kafka topics for analytics and auditing. +//! +//! # Topics +//! +//! | Topic | Content | Format | +//! |-------|---------|--------| +//! | `gateway_queries` | Client request data | Protobuf ([`ClientQueryProtobuf`]) | +//! | `gateway_attestations` | Indexer attestations | Protobuf ([`AttestationProtobuf`]) | +//! +//! # Client Query Data +//! +//! Each query report includes: +//! - Request metadata (ID, API key, user, subgraph) +//! - Response metadata (time, bytes, result) +//! - Per-indexer request details (fees, latency, result) +//! +//! # Attestation Sampling +//! +//! To avoid overwhelming the attestations topic, attestations are sampled: +//! - One attestation per (deployment, indexer) pair every 10 seconds +//! - Payloads over 100 KB are truncated +//! +//! 
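The per-(deployment, indexer) sampling described above can be sketched with a simple last-sent map. This is a hypothetical helper, not the gateway's actual implementation; the 10-second window comes from the module docs.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical sampler: allow at most one attestation per
// (deployment, indexer) key within the sampling window.
struct AttestationSampler {
    window: Duration,
    last_sent: HashMap<(String, String), Instant>,
}

impl AttestationSampler {
    fn new(window: Duration) -> Self {
        Self { window, last_sent: HashMap::new() }
    }

    /// Returns true if an attestation for this key should be reported now.
    fn should_send(&mut self, deployment: &str, indexer: &str, now: Instant) -> bool {
        let key = (deployment.to_string(), indexer.to_string());
        match self.last_sent.get(&key) {
            // Seen recently: drop this attestation.
            Some(&prev) if now.duration_since(prev) < self.window => false,
            // First sighting, or the window has elapsed: record and send.
            _ => {
                self.last_sent.insert(key, now);
                true
            }
        }
    }
}

fn main() {
    let mut sampler = AttestationSampler::new(Duration::from_secs(10));
    let t0 = Instant::now();
    assert!(sampler.should_send("deployment-a", "indexer-1", t0));
    assert!(!sampler.should_send("deployment-a", "indexer-1", t0 + Duration::from_secs(5)));
    assert!(sampler.should_send("deployment-a", "indexer-1", t0 + Duration::from_secs(11)));
    assert!(sampler.should_send("deployment-a", "indexer-2", t0)); // separate bucket
}
```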
# Usage +//! +//! ```ignore +//! let reporter = Reporter::create(signer, env_id, topics, kafka_config)?; +//! reporter.send(ClientRequest { ... }); +//! ``` + use std::{collections::HashSet, time::Duration}; use anyhow::{Context, anyhow}; diff --git a/src/time.rs b/src/time.rs index 3e38b628..0ec9100e 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,3 +1,7 @@ +//! Time Utilities +//! +//! Simple timestamp helpers for consistent time handling. + use std::time::SystemTime; /// Return milliseconds since Unix epoch diff --git a/src/unattestable_errors.rs b/src/unattestable_errors.rs index 3d67285c..b478f730 100644 --- a/src/unattestable_errors.rs +++ b/src/unattestable_errors.rs @@ -1,3 +1,28 @@ +//! Unattestable Error Detection +//! +//! Identifies indexer errors that should not be attested (signed) because they +//! indicate internal issues rather than valid query results. +//! +//! # Background +//! +//! Indexers sign attestations for their query responses. However, some errors +//! from graph-node are "unattestable" - they indicate bugs or temporary issues, +//! not legitimate query results. The gateway must detect these to avoid: +//! +//! 1. Returning broken responses to clients +//! 2. Triggering disputes for non-malicious indexer behavior +//! +//! # Error Categories +//! +//! The [`UNATTESTABLE_ERROR_MESSAGE_FRAGMENTS`] list includes: +//! - Store errors (database issues) +//! - Timeout errors +//! - Query complexity limits +//! - Chain reorganization +//! - Internal panics +//! +//! See the graph-node source for the authoritative list. + // This list should not be necessary, but it is a temporary measure to avoid unattestable errors // from getting to users. // Derived from https://github.com/graphprotocol/graph-node/blob/master/graph/src/data/query/error.rs
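The fragment matching described above amounts to a substring scan over the error message. A minimal sketch, using hypothetical fragments (the real `UNATTESTABLE_ERROR_MESSAGE_FRAGMENTS` list is derived from graph-node's error messages):

```rust
// Hypothetical subset of message fragments for illustration only.
const FRAGMENTS: &[&str] = &["store error", "query timed out", "panicked at"];

/// Returns true if the indexer's error message matches any unattestable
/// fragment, meaning the response should not be attested.
fn is_unattestable(message: &str) -> bool {
    FRAGMENTS.iter().any(|fragment| message.contains(fragment))
}

fn main() {
    // An internal store failure is unattestable: it reflects indexer state,
    // not a legitimate query result.
    assert!(is_unattestable("store error: database unavailable"));
    // A normal GraphQL-level error is still a valid, attestable result.
    assert!(!is_unattestable("Null value resolved for non-null field"));
}
```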