From 3ab696cef3ce0325a4dd80eadbc641c3e6b901a7 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Tue, 13 Jan 2026 23:08:39 +0100 Subject: [PATCH 1/7] chore(NET-112): Add http api, streams and query related metrics to HotblockDB --- Cargo.lock | 19 ++ crates/hotblocks/Cargo.toml | 4 +- crates/hotblocks/src/api.rs | 388 ++++++++++++++++--------- crates/hotblocks/src/cli.rs | 4 +- crates/hotblocks/src/dataset_config.rs | 3 + crates/hotblocks/src/metrics.rs | 256 ++++++++++------ crates/hotblocks/src/query/executor.rs | 49 ++-- crates/hotblocks/src/query/response.rs | 146 ++++++++-- crates/hotblocks/src/query/running.rs | 68 ++++- crates/hotblocks/src/query/service.rs | 7 +- 10 files changed, 669 insertions(+), 275 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8298f09..d18b4b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4895,6 +4895,7 @@ dependencies = [ "clap", "flate2", "futures", + "lazy_static", "ouroboros", "prometheus-client 0.24.0", "serde", @@ -4911,6 +4912,7 @@ dependencies = [ "sqd-storage", "tikv-jemallocator", "tokio", + "tower-http", "tracing", "tracing-subscriber", "url", @@ -5467,6 +5469,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags 2.8.0", + "bytes", + "http 1.2.0", + "http-body 1.0.1", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", + "uuid", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/crates/hotblocks/Cargo.toml b/crates/hotblocks/Cargo.toml index a311847..41fad8e 100644 --- a/crates/hotblocks/Cargo.toml +++ b/crates/hotblocks/Cargo.toml @@ -11,6 +11,7 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["std"] } clap = { workspace = true, features = ["derive"] } flate2 = { workspace = true } +lazy_static = "1.4.0" futures = { workspace = true } ouroboros = { workspace = true } prometheus-client = { workspace = true } @@ -28,6 +29,7 @@ sqd-query = { path = "../query", features = ["storage"] } sqd-storage = { path = "../storage" } tikv-jemallocator = "0.6.0" tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6.1", features = ["request-id", "trace"] } tracing = { workspace = true, features = ["valuable"] } tracing-subscriber = { workspace = true, features = ["env-filter", "json", "valuable"] } -url = { workspace = true, features = ["serde"] } \ No newline at end of file +url = { workspace = true, features = ["serde"] } diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 754138d..168c70f 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -1,12 +1,17 @@ use crate::cli::App; -use crate::errors::{BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, UnknownDataset}; +use crate::dataset_controller::DatasetController; +use crate::errors::{ + BlockItemIsNotAvailable, BlockRangeMissing, Busy, QueryIsAboveTheHead, QueryKindMismatch, + UnknownDataset, +}; use crate::query::QueryResponse; use crate::types::RetentionStrategy; use anyhow::bail; use async_stream::try_stream; use axum::body::{Body, Bytes}; -use axum::extract::Path; +use axum::extract::{Path, Request}; use axum::http::StatusCode; +use axum::http::Uri; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{BoxError, Extension, Json, Router}; @@ -16,8 
+21,9 @@ use sqd_primitives::BlockRef; use sqd_query::{Query, UnexpectedBaseBlock}; use sqd_storage::db::DatasetId; use std::sync::Arc; -use tracing::error; - +use std::time::Instant; +use tower_http::request_id::{MakeRequestUuid, RequestId, SetRequestIdLayer}; +use tracing::{Instrument, error}; macro_rules! json_ok { ($json:expr) => { @@ -25,75 +31,170 @@ macro_rules! json_ok { }; } - macro_rules! text { ($status:expr, $($arg:tt)+) => { ($status, format!($($arg)*)).into_response() }; } - macro_rules! get_dataset { ($app:expr, $dataset_id:expr) => { match $app.data_service.get_dataset($dataset_id) { Ok(ds) => ds, - Err(err) => return text!(StatusCode::NOT_FOUND, "{}", err) + Err(err) => return text!(StatusCode::NOT_FOUND, "{}", err), } }; } - type AppRef = Arc; - pub fn build_api(app: App) -> Router { Router::new() - .route("/", get(|| async { "Welcome to SQD hot block data service!" })) + .route( + "/", + get(|| async { "Welcome to SQD hot block data service!" }), + ) .route("/datasets/{id}/stream", post(stream)) .route("/datasets/{id}/finalized-stream", post(finalized_stream)) .route("/datasets/{id}/head", get(get_head)) .route("/datasets/{id}/finalized-head", get(get_finalized_head)) - .route("/datasets/{id}/retention", get(get_retention).post(set_retention)) + .route( + "/datasets/{id}/retention", + get(get_retention).post(set_retention), + ) .route("/datasets/{id}/status", get(get_status)) .route("/datasets/{id}/metadata", get(get_metadata)) .route("/metrics", get(get_metrics)) .route("/rocksdb/stats", get(get_rocks_stats)) .route("/rocksdb/prop/{cf}/{name}", get(get_rocks_prop)) + .fallback(handle_404) + .layer(axum::middleware::from_fn(middleware)) + .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid::default())) .layer(Extension(Arc::new(app))) } +pub async fn middleware(req: Request, next: axum::middleware::Next) -> impl IntoResponse { + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let version = req.version(); + let start = Instant::now(); + let request_id = req + .extensions() + .get::() + .expect("RequestId should be set by SetRequestIdLayer") + .header_value() + .to_str() + .expect("Request ID should be a valid string"); + + let span = tracing::span!(tracing::Level::INFO, "http_request", request_id); + let mut response = next.run(req).instrument(span.clone()).await; + let latency = start.elapsed(); + + let mut labels = response + .extensions_mut() + .remove::() + .map(|labels| labels.0) + .unwrap_or(Vec::new()); + labels.push(("status".to_string(), response.status().as_str().to_owned())); + + span.in_scope(|| { + tracing::info!( + target: "http_request", + method, + path, + ?version, + status = %response.status(), + ?latency, + "HTTP request processed" + ); + }); + + crate::metrics::report_http_response(&labels, latency); + + response +} + +#[derive(Clone)] +pub struct Labels(Vec<(String, String)>); + +pub struct ResponseWithMetadata { + pub labels: Labels, + pub response: Option, +} + +impl ResponseWithMetadata { + fn new() -> Self { + Self { + labels: Labels(vec![]), + response: None, + } + } + + pub fn with_dataset_id(mut self, id: DatasetId) -> Self { + self.labels + .0 + .push(("dataset_id".to_string(), id.as_str().to_owned())); + self + } + + pub fn with_endpoint(mut self, endpoint: &str) -> Self { + self.labels + .0 + .push(("endpoint".to_string(), endpoint.to_string())); + self + } + + pub fn with_response(mut self, clause: F) -> Self + where + F: FnOnce() -> Response, + { + self.response = Some(clause()); + self + } +} + +impl 
IntoResponse for ResponseWithMetadata { + fn into_response(self) -> Response { + let mut response = self.response.expect("response is mandatory method"); + response.extensions_mut().insert(self.labels); + response + } +} async fn stream( Extension(app): Extension, Path(dataset_id): Path, - Json(query): Json -) -> Response -{ - stream_internal(app, dataset_id, query, false).await + Json(query): Json, +) -> impl IntoResponse { + let response = stream_internal(app, dataset_id, query, false).await; + ResponseWithMetadata::new() + .with_dataset_id(dataset_id) + .with_endpoint("/stream") + .with_response(|| response) } - async fn finalized_stream( Extension(app): Extension, Path(dataset_id): Path, - Json(query): Json -) -> Response -{ - stream_internal(app, dataset_id, query, true).await + Json(query): Json, +) -> impl IntoResponse { + let response = stream_internal(app, dataset_id, query, true).await; + ResponseWithMetadata::new() + .with_dataset_id(dataset_id) + .with_endpoint("/finalized_stream") + .with_response(|| response) } - async fn stream_internal( app: AppRef, dataset_id: DatasetId, query: Query, - finalized: bool -) -> Response -{ + finalized: bool, +) -> Response { let dataset = get_dataset!(app, dataset_id); if let Err(err) = query.validate() { - return text!(StatusCode::BAD_REQUEST, "{}", err) + return text!(StatusCode::BAD_REQUEST, "{}", err); } let query_result = if finalized { @@ -114,7 +215,9 @@ async fn stream_internal( // For finalized stream, use the finalized head as the head res = res.header("x-sqd-head-number", finalized_head.number); } else { - let head_block = finalized_head.number.max(dataset.get_head_block_number().unwrap_or(0)); + let head_block = finalized_head + .number + .max(dataset.get_head_block_number().unwrap_or(0)); res = res.header("x-sqd-head-number", head_block); } res = res.header("x-sqd-finalized-head-number", finalized_head.number); @@ -123,18 +226,17 @@ async fn stream_internal( res = res.header("x-sqd-head-number", head_block); } - let body = Body::from_stream( - stream_query_response(stream) - ); + let body = Body::from_stream(stream_query_response(stream)); res.body(body).unwrap() - }, - Err(err) => error_to_response(err) + } + Err(err) => error_to_response(err), } } - -fn stream_query_response(mut stream: QueryResponse) -> impl TryStream { +fn stream_query_response( + mut stream: QueryResponse, +) -> impl TryStream { try_stream! 
{ while let Some(pack_result) = stream.next_data_pack().await.transpose() { match pack_result { @@ -155,7 +257,6 @@ fn stream_query_response(mut stream: QueryResponse) -> impl TryStream Response { if let Some(above_the_head) = err.downcast_ref::() { let mut res = Response::builder().status(204); @@ -163,16 +264,17 @@ fn error_to_response(err: anyhow::Error) -> Response { res = res.header("x-sqd-finalized-head-number", head.number); res = res.header("x-sqd-finalized-head-hash", head.hash.as_str()); } - return res.body(Body::empty()).unwrap() + return res.body(Body::empty()).unwrap(); } if let Some(fork) = err.downcast_ref::() { return ( StatusCode::CONFLICT, Json(BaseBlockConflict { - previous_blocks: &fork.prev_blocks - }) - ).into_response() + previous_blocks: &fork.prev_blocks, + }), + ) + .into_response(); } let status_code = if err.is::() { @@ -198,75 +300,82 @@ fn error_to_response(err: anyhow::Error) -> Response { (status_code, message).into_response() } - #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct BaseBlockConflict<'a> { - previous_blocks: &'a [BlockRef] + previous_blocks: &'a [BlockRef], } - async fn get_finalized_head( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_finalized_head() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/finalized_head") + .with_response(|| { + json_ok! { + get_dataset!(app, dataset_id).get_finalized_head() + } + }) } - async fn get_head( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_head() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/head") + .with_response(|| { + json_ok! { + get_dataset!(app, dataset_id).get_head() + } + }) } - async fn get_retention( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - json_ok! { - get_dataset!(app, dataset_id).get_retention() - } + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/head") + .with_response(|| { + json_ok! { + get_dataset!(app, dataset_id).get_retention() + } + }) } - async fn set_retention( Extension(app): Extension, Path(dataset_id): Path, - Json(strategy): Json -) -> Response -{ - let ds = get_dataset!(app, dataset_id); - if app.api_controlled_datasets.contains(&dataset_id) { - ds.retain(strategy); - text!(StatusCode::OK, "OK") - } else { - text!( - StatusCode::FORBIDDEN, - "dataset '{}' can't be managed via API", - dataset_id - ) - } + Json(strategy): Json, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/head") + .with_response(|| { + let ds = get_dataset!(app, dataset_id); + if app.api_controlled_datasets.contains(&dataset_id) { + ds.retain(strategy); + text!(StatusCode::OK, "OK") + } else { + text!( + StatusCode::FORBIDDEN, + "dataset '{}' can't be managed via API", + dataset_id + ) + } + }) } - async fn get_status( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - let ctl = get_dataset!(app, dataset_id); - - let read_status = || -> anyhow::Result<_> { + Path(dataset_id): Path, +) -> impl IntoResponse { + let read_status = |ctl: Arc| -> anyhow::Result<_> { let db = app.db.snapshot(); let Some(label) = db.get_label(dataset_id)? 
else { @@ -278,7 +387,7 @@ async fn get_status( "kind": label.kind(), "retentionStrategy": ctl.get_retention(), "data": null - }}) + }}); }; let Some(last_chunk) = db.get_last_chunk(dataset_id)? else { @@ -298,68 +407,85 @@ async fn get_status( }}) }; - match read_status() { - Ok(status) => json_ok!(status), - Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err) - } + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/status") + .with_response(|| { + let ctl = get_dataset!(app, dataset_id); + match read_status(ctl) { + Ok(status) => json_ok!(status), + Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err), + } + }) } async fn get_metadata( Extension(app): Extension, - Path(dataset_id): Path -) -> Response -{ - get_dataset!(app, dataset_id); - - let db = app.db.snapshot(); - - let first_chunk = match db.get_first_chunk(dataset_id) { - Ok(chunk) => chunk, - Err(err) => return text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err) - }; - - json_ok!(serde_json::json! {{ - "dataset": dataset_id, - "aliases": [], - "real_time": true, - "start_block": first_chunk.map(|chunk| chunk.first_block()), - }}) + Path(dataset_id): Path, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_dataset_id(dataset_id.clone()) + .with_endpoint("/metadata") + .with_response(|| { + get_dataset!(app, dataset_id); + + let db = app.db.snapshot(); + + let first_chunk = match db.get_first_chunk(dataset_id) { + Ok(chunk) => chunk, + Err(err) => return text!(StatusCode::INTERNAL_SERVER_ERROR, "{:?}", err), + }; + + json_ok!(serde_json::json! {{ + "dataset": dataset_id, + "aliases": [], + "real_time": true, + "start_block": first_chunk.map(|chunk| chunk.first_block()), + }}) + }) } - -async fn get_metrics( - Extension(app): Extension -) -> Response -{ +async fn get_metrics(Extension(app): Extension) -> impl IntoResponse { let mut metrics = String::new(); prometheus_client::encoding::text::encode(&mut metrics, &app.metrics_registry) .expect("String IO is infallible"); - metrics.into_response() + ResponseWithMetadata::new() + .with_endpoint("/metrics") + .with_response(|| metrics.into_response()) } - -async fn get_rocks_stats( - Extension(app): Extension -) -> Response -{ - if let Some(stats) = app.db.get_statistics() { - stats.into_response() - } else { - text!(StatusCode::INTERNAL_SERVER_ERROR, "rocksdb stats are not enabled") - } +async fn get_rocks_stats(Extension(app): Extension) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_endpoint("/rocks_stats") + .with_response(|| { + if let Some(stats) = app.db.get_statistics() { + stats.into_response() + } else { + text!( + StatusCode::INTERNAL_SERVER_ERROR, + "rocksdb stats are not enabled" + ) + } + }) } - async fn get_rocks_prop( Extension(app): Extension, - Path((cf, name)): Path<(String, String)> -) -> Response -{ - match app.db.get_property(&cf, &name) { - Ok(Some(s)) => s.into_response(), - Ok(None) => text!(StatusCode::NOT_FOUND, "property not found"), - Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{}", err) - } + Path((cf, name)): Path<(String, String)>, +) -> impl IntoResponse { + ResponseWithMetadata::new() + .with_endpoint("/rocks_prop") + .with_response(|| match app.db.get_property(&cf, &name) { + Ok(Some(s)) => s.into_response(), + Ok(None) => text!(StatusCode::NOT_FOUND, "property not found"), + Err(err) => text!(StatusCode::INTERNAL_SERVER_ERROR, "{}", err), + }) +} + +async fn handle_404(uri: Uri) -> impl IntoResponse { + ResponseWithMetadata::new() + 
.with_endpoint("404_fallback") + .with_response(|| text!(StatusCode::NOT_FOUND, "Not found: {}", uri.path())) } diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index e378a83..bb1e5d3 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -7,6 +7,7 @@ use clap::Parser; use sqd_storage::db::{DatabaseSettings, DatasetId}; use std::collections::BTreeSet; use std::sync::Arc; +use std::time::Duration; #[derive(Parser, Debug)] @@ -100,6 +101,7 @@ impl CLI { Arc::new(builder.build()) }; + query_service.spawn_metrics_reporter(Duration::from_secs(5)); Ok(App { db, @@ -109,4 +111,4 @@ impl CLI { metrics_registry }) } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/dataset_config.rs b/crates/hotblocks/src/dataset_config.rs index 95df6c8..204e73b 100644 --- a/crates/hotblocks/src/dataset_config.rs +++ b/crates/hotblocks/src/dataset_config.rs @@ -8,11 +8,14 @@ use url::Url; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RetentionConfig { + // Fixed, starting from the block number FromBlock { number: BlockNumber, parent_hash: Option, }, + // Moving window that keeps up to N blocks Head(u64), + // Retention is set dynamically from the portal Api, None, } diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index d9f7bd0..96270d4 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -1,70 +1,104 @@ use crate::types::DBRef; use anyhow::bail; use prometheus_client::collector::Collector; -use prometheus_client::encoding::{DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder}; -use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::MetricType; +use prometheus_client::encoding::{ + DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder, +}; +use prometheus_client::metrics::{ + MetricType, + counter::Counter, + family::Family, + gauge::Gauge, + histogram::{Histogram, exponential_buckets}, +}; use prometheus_client::registry::Registry; use sqd_storage::db::{DatasetId, ReadSnapshot}; use std::fmt::Write; -use std::sync::LazyLock; +use std::time::Duration; use tracing::error; - #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)] struct DatasetLabel { - dataset: DatasetValue + dataset: DatasetValue, } - #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq)] struct DatasetValue(DatasetId); - impl EncodeLabelValue for DatasetValue { fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> { encoder.write_str(self.0.as_str()) } } - macro_rules! dataset_label { ($dataset_id:expr) => { DatasetLabel { - dataset: DatasetValue($dataset_id) + dataset: DatasetValue($dataset_id), } }; } +type Labels = Vec<(String, String)>; -macro_rules! metric { - ($name:ident, $t:ty) => { - static $name: LazyLock<$t> = LazyLock::new(Default::default); - }; +fn buckets(start: f64, count: usize) -> impl Iterator { + std::iter::successors(Some(start), |x| Some(x * 10.)) + .flat_map(|x| [x, x * 1.5, x * 2.5, x * 5.0]) + .take(count) } - -metric!(QUERY_ERROR_TOO_MANY_TASKS, Counter); -metric!(QUERY_ERROR_TOO_MANY_DATA_WAITERS, Counter); - +lazy_static::lazy_static! 
{ + pub static ref HTTP_STATUS: Family = Default::default(); + pub static ref HTTP_TTFB: Family = + Family::new_with_constructor(|| Histogram::new(buckets(0.001, 20))); + + pub static ref QUERY_ERROR_TOO_MANY_TASKS: Counter = Default::default(); + pub static ref QUERY_ERROR_TOO_MANY_DATA_WAITERS: Counter = Default::default(); + + pub static ref ACTIVE_QUERIES: Gauge = Default::default(); + pub static ref COMPLETED_QUERIES: Counter = Default::default(); + + pub static ref STREAM_DURATIONS: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.01, 2.0, 20))); + pub static ref STREAM_BYTES: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20))); + pub static ref STREAM_BLOCKS: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))); + pub static ref STREAM_CHUNKS: Family = + Family::new_with_constructor(|| Histogram::new(buckets(1., 20))); + pub static ref STREAM_BYTES_PER_SECOND: Histogram = Histogram::new(exponential_buckets(100., 3.0, 20)); + pub static ref STREAM_BLOCKS_PER_SECOND: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 3.0, 20))); + + pub static ref QUERIED_BYTES: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20))); + pub static ref QUERIED_BLOCKS: Family = + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))); + pub static ref QUERIED_CHUNKS: Family = + Family::new_with_constructor(|| Histogram::new(buckets(1., 20))); +} pub fn report_query_too_many_tasks_error() { QUERY_ERROR_TOO_MANY_TASKS.inc(); } - pub fn report_query_too_many_data_waiters_error() { QUERY_ERROR_TOO_MANY_DATA_WAITERS.inc(); } +pub fn report_http_response(labels: &Vec<(String, String)>, to_first_byte: Duration) { + HTTP_STATUS.get_or_create(&labels).inc(); + HTTP_TTFB + .get_or_create(&labels) + .observe(to_first_byte.as_secs_f64()); +} #[derive(Debug)] struct DatasetMetricsCollector { db: DBRef, - datasets: Vec + datasets: Vec, } - impl Collector for DatasetMetricsCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { let db = self.db.snapshot(); @@ -82,7 +116,7 @@ impl Collector for DatasetMetricsCollector { dataset_id ); Ok(()) - } + }; } } @@ -90,92 +124,150 @@ impl Collector for DatasetMetricsCollector { } } - fn collect_dataset_metrics( encoder: &mut DescriptorEncoder, db: &ReadSnapshot, - dataset_id: DatasetId -) -> anyhow::Result<()> -{ + dataset_id: DatasetId, +) -> anyhow::Result<()> { let Some(label) = db.get_label(dataset_id)? else { - return Ok(()) + return Ok(()); }; let Some(first_chunk) = db.get_first_chunk(dataset_id)? else { - return Ok(()) + return Ok(()); }; let Some(last_chunk) = db.get_last_chunk(dataset_id)? 
else { bail!("first chunk exists, while last does not") }; - encoder.encode_descriptor( - "hotblocks_first_block", - "First block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &first_chunk.first_block() - )?; - - encoder.encode_descriptor( - "hotblocks_last_block", - "Last block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &last_chunk.last_block() - )?; - - encoder.encode_descriptor( - "hotblocks_last_block_timestamp_ms", - "Timestamp of the last block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &last_chunk.last_block_time().unwrap_or(0) - )?; - - encoder.encode_descriptor( - "hotblocks_last_finalized_block", - "Last finalized block", - None, - MetricType::Gauge - )?.encode_family( - &dataset_label!(dataset_id) - )?.encode_gauge( - &label.finalized_head().map_or(0, |h| h.number) - )?; + encoder + .encode_descriptor( + "hotblocks_first_block", + "First block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&first_chunk.first_block())?; + + encoder + .encode_descriptor( + "hotblocks_last_block", + "Last block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_chunk.last_block())?; + + encoder + .encode_descriptor( + "hotblocks_last_block_timestamp_ms", + "Timestamp of the last block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_chunk.last_block_time().unwrap_or(0))?; + + encoder + .encode_descriptor( + "hotblocks_last_finalized_block", + "Last finalized block", + None, + MetricType::Gauge, + )? + .encode_family(&dataset_label!(dataset_id))? 
+ .encode_gauge(&label.finalized_head().map_or(0, |h| h.number))?; Ok(()) } - pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { - let mut registry = Registry::default(); + let mut top_registry = Registry::default(); + let registry = top_registry.sub_registry_with_prefix("hotblocks"); registry.register( - "hotblocks_query_error_too_many_tasks", + "query_error_too_many_tasks", "Number of query tasks rejected due to task queue overflow", - QUERY_ERROR_TOO_MANY_TASKS.clone() + QUERY_ERROR_TOO_MANY_TASKS.clone(), ); registry.register( - "hotblocks_query_error_too_many_data_waiters", + "query_error_too_many_data_waiters", "Number of queries rejected, because data is not yet available and there are too many data waiters", QUERY_ERROR_TOO_MANY_DATA_WAITERS.clone() ); - registry.register_collector(Box::new(DatasetMetricsCollector { - db, - datasets - })); + registry.register( + "http_status", + "Number of sent HTTP responses", + HTTP_STATUS.clone(), + ); + registry.register( + "http_seconds_to_first_byte", + "Time to first byte of HTTP responses", + HTTP_TTFB.clone(), + ); + + registry.register( + "stream_bytes", + "Numbers of bytes per stream", + STREAM_BYTES.clone(), + ); + registry.register( + "stream_blocks", + "Numbers of blocks per stream", + STREAM_BLOCKS.clone(), + ); + registry.register( + "stream_chunks", + "Numbers of chunks per stream", + STREAM_CHUNKS.clone(), + ); + registry.register( + "stream_bytes_per_second", + "Completed streams bandwidth", + STREAM_BYTES_PER_SECOND.clone(), + ); + registry.register( + "stream_blocks_per_second", + "Completed streams speed in blocks", + STREAM_BLOCKS_PER_SECOND.clone(), + ); + registry.register( + "stream_duration_seconds", + "Durations of completed streams", + STREAM_DURATIONS.clone(), + ); + + registry.register( + "queried_bytes", + "Numbers of bytes queried per running query", + QUERIED_BYTES.clone(), + ); + registry.register( + "queried_blocks", + "Numbers of blocks per running query", + QUERIED_BLOCKS.clone(), + ); + registry.register( + "queried_chunks", + "Numbers of chunks per running query", + QUERIED_CHUNKS.clone(), + ); + registry.register( + "active_queries", + "Number of active queries", + ACTIVE_QUERIES.clone(), + ); + registry.register( + "completed_queries", + "Number of completed queries", + COMPLETED_QUERIES.clone(), + ); + top_registry.register_collector(Box::new(DatasetMetricsCollector { db, datasets })); - registry -} \ No newline at end of file + top_registry +} diff --git a/crates/hotblocks/src/query/executor.rs b/crates/hotblocks/src/query/executor.rs index f3f97d5..78d27e2 100644 --- a/crates/hotblocks/src/query/executor.rs +++ b/crates/hotblocks/src/query/executor.rs @@ -1,66 +1,81 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; +use crate::metrics::{ACTIVE_QUERIES, COMPLETED_QUERIES, report_query_too_many_tasks_error}; use std::sync::Arc; - +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::time::{self, Duration}; #[derive(Clone)] pub struct QueryExecutor { + // number of concurrent queries in_flight: Arc, + // limit for concurrent queries max_pending_tasks: usize, - urgency: usize + urgency: usize, } - impl QueryExecutor { pub fn new(max_pending_tasks: usize, urgency: usize) -> Self { Self { in_flight: Arc::new(AtomicUsize::new(0)), max_pending_tasks, - urgency + urgency, } } pub fn get_slot(&self) -> Option { - if self.in_flight.fetch_add(1, Ordering::SeqCst) < self.max_pending_tasks { + let active_queries = self.in_flight.fetch_add(1, Ordering::SeqCst); + if active_queries < 
self.max_pending_tasks { Some(QuerySlot { in_flight: self.in_flight.clone(), - urgency: self.urgency + urgency: self.urgency, }) } else { self.in_flight.fetch_sub(1, Ordering::SeqCst); - crate::metrics::report_query_too_many_tasks_error(); + report_query_too_many_tasks_error(); None } } -} + pub fn spawn_metrics_reporter(&self, interval: Duration) { + let active_count = self.in_flight.clone(); + + tokio::spawn(async move { + let mut ticker = time::interval(interval); + + loop { + ticker.tick().await; + let active_queries = active_count.load(Ordering::SeqCst); + ACTIVE_QUERIES.set(active_queries as i64); + } + }); + } +} pub struct QuerySlot { in_flight: Arc, - urgency: usize + urgency: usize, } - impl Drop for QuerySlot { fn drop(&mut self) { self.in_flight.fetch_sub(1, Ordering::SeqCst); + COMPLETED_QUERIES.inc(); } } - impl QuerySlot { pub fn time_limit(&self) -> usize { let in_flight = self.in_flight.load(Ordering::SeqCst); if in_flight == 0 { - return 100 + return 100; } let time = self.urgency * sqd_polars::POOL.current_num_threads() / in_flight; time.min(100) } - + pub async fn run(self, task: F) -> R where F: FnOnce(&Self) -> R + Send + 'static, - R: Send + 'static + R: Send + 'static, { let (tx, rx) = tokio::sync::oneshot::channel(); @@ -69,7 +84,7 @@ impl QuerySlot { let result = task(&slot); let _ = tx.send(result); }); - + rx.await.expect("task panicked") } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index a0568bb..7f09165 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -1,22 +1,71 @@ use super::executor::{QueryExecutor, QuerySlot}; -use super::running::RunningQuery; +use super::running::{RunningQuery, RunningQueryStats}; use crate::errors::Busy; +use crate::metrics::{ + STREAM_BLOCKS, STREAM_BLOCKS_PER_SECOND, STREAM_BYTES, STREAM_BYTES_PER_SECOND, STREAM_CHUNKS, + STREAM_DURATIONS, +}; use crate::types::DBRef; use anyhow::bail; use bytes::Bytes; use sqd_primitives::BlockRef; use sqd_query::Query; use sqd_storage::db::DatasetId; +use std::time::Duration; use std::time::Instant; +const DEFAULT_QUERY_LIMIT: Duration = Duration::from_secs(10); pub struct QueryResponse { executor: QueryExecutor, runner: Option>, - start: Instant, - finalized_head: Option + finalized_head: Option, + dataset_id: DatasetId, + stats: QueryStreamStats, + time_limit: Duration, } +pub struct QueryStreamStats { + response_chunks: u64, + response_blocks: u64, + response_bytes: u64, + start_time: Instant, +} + +impl QueryStreamStats { + pub fn new() -> Self { + Self { + response_chunks: 0, + response_blocks: 0, + response_bytes: 0, + start_time: Instant::now(), + } + } + + pub fn add_running_stats(&mut self, running_stats: &RunningQueryStats) { + self.response_chunks = self.response_chunks.saturating_add(running_stats.chunks_read); + self.response_blocks = self.response_blocks.saturating_add(running_stats.blocks_read); + self.response_bytes = self.response_bytes.saturating_add(running_stats.total_buffered_bytes); + } + + fn report_metrics(&self, dataset_id: &DatasetId) { + let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; + + let duration = self.start_time.elapsed().as_secs_f64(); + let bytes = self.response_bytes; + let blocks = self.response_blocks; + let chunks = self.response_chunks; + + STREAM_DURATIONS.get_or_create(&labels).observe(duration); + STREAM_BYTES.get_or_create(&labels).observe(bytes as f64); + 
STREAM_BLOCKS.get_or_create(&labels).observe(blocks as f64); + STREAM_CHUNKS.get_or_create(&labels).observe(chunks as f64); + STREAM_BYTES_PER_SECOND.observe(bytes as f64 / duration); + STREAM_BLOCKS_PER_SECOND + .get_or_create(&labels) + .observe(blocks as f64 / duration); + } +} impl QueryResponse { pub(super) async fn new( @@ -25,25 +74,30 @@ impl QueryResponse { dataset_id: DatasetId, query: Query, only_finalized: bool, - ) -> anyhow::Result - { + time_limit: Option, + ) -> anyhow::Result { let Some(slot) = executor.get_slot() else { bail!(Busy) }; - let start = Instant::now(); - - let mut runner = slot.run(move |slot| -> anyhow::Result<_> { - let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; - next_run(&mut runner, slot)?; - Ok(runner) - }).await?; - + let stats = QueryStreamStats::new(); + let mut runner = slot + .run(move |slot| -> anyhow::Result<_> { + let mut runner = + RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; + next_run(&mut runner, slot)?; + Ok(runner) + }) + .await?; + + let time_limit = time_limit.unwrap_or(DEFAULT_QUERY_LIMIT); let response = Self { executor, finalized_head: runner.take_finalized_head(), runner: Some(runner), - start + stats, + dataset_id, + time_limit, }; Ok(response) @@ -55,17 +109,28 @@ impl QueryResponse { pub async fn next_data_pack(&mut self) -> anyhow::Result> { let Some(mut runner) = self.runner.take() else { - return Ok(None) + return Ok(None); }; - if !runner.has_next_chunk() || self.start.elapsed().as_secs() > 10 { - return Ok(Some(runner.finish())) + if !runner.has_next_chunk() { + return Ok(self.finish_with_runner(runner)) + } + + if self.stats.start_time.elapsed() > self.time_limit { + // Client is expected to retry the query based on the data that they have received + tracing::warn!( + "terminate query that has been running for more than {} seconds", + self.time_limit.as_secs() + ); + return Ok(self.finish_with_runner(runner)); } if runner.buffered_bytes() > 0 { let bytes = runner.take_buffered_bytes(); + self.stats.response_bytes = + self.stats.response_bytes.saturating_add(bytes.len() as u64); self.runner = Some(runner); - return Ok(Some(bytes)) + return Ok(Some(bytes)); } let Some(slot) = self.executor.get_slot() else { @@ -73,31 +138,50 @@ impl QueryResponse { bail!(Busy); }; - let (mut runner, result) = slot.run(move |slot| { - let mut runner = runner; - let result = next_run(&mut runner, slot); - (runner, result) - }).await; + let (mut runner, result) = slot + .run(move |slot| { + let mut runner = runner; + let result = next_run(&mut runner, slot); + (runner, result) + }) + .await; if let Err(err) = result { self.runner = Some(runner); - return Err(err) + return Err(err); } - if !runner.has_next_chunk() || self.start.elapsed().as_secs() > 10 { - Ok(Some(runner.finish())) - } else { + if runner.has_next_chunk() { let bytes = runner.take_buffered_bytes(); + self.stats.response_bytes = + self.stats.response_bytes.saturating_add(bytes.len() as u64); self.runner = Some(runner); Ok(Some(bytes)) + } else { + return Ok(self.finish_with_runner(runner)); } } + fn finish_with_runner(&mut self, runner: Box) -> Option { + runner.stats().report_metrics(&self.dataset_id); + self.stats.add_running_stats(runner.stats()); + + Some(runner.finish()) + } + pub fn finish(&mut self) -> Bytes { - self.runner.take().map(|runner| runner.finish()).unwrap_or_default() + self.runner + .take() + .map(|runner| self.finish_with_runner(runner).unwrap()) + .unwrap_or_default() } } +impl Drop for 
QueryResponse { + fn drop(&mut self) { + self.stats.report_metrics(&self.dataset_id) + } +} fn next_run(runner: &mut RunningQuery, slot: &QuerySlot) -> anyhow::Result<()> { let start = Instant::now(); @@ -109,7 +193,7 @@ fn next_run(runner: &mut RunningQuery, slot: &QuerySlot) -> anyhow::Result<()> { runner.write_next_chunk()?; if !runner.has_next_chunk() || runner.buffered_bytes() > 512 * 1024 { - return Ok(()) + return Ok(()); } elapsed = start.elapsed().as_millis(); @@ -119,7 +203,7 @@ fn next_run(runner: &mut RunningQuery, slot: &QuerySlot) -> anyhow::Result<()> { let next_chunk_eta = next_chunk_eta.min(chunk_time * 5).max(chunk_time / 5); let eta = elapsed + next_chunk_eta; if eta > slot.time_limit() as u128 { - return Ok(()) + return Ok(()); } } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index 87cd0f4..3db15ea 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -1,6 +1,7 @@ use crate::errors::{BlockItemIsNotAvailable, QueryKindMismatch}; use crate::errors::{BlockRangeMissing, QueryIsAboveTheHead}; use crate::query::static_snapshot::{StaticChunkIterator, StaticChunkReader, StaticSnapshot}; +use crate::metrics::{ QUERIED_BYTES, QUERIED_BLOCKS, QUERIED_CHUNKS }; use crate::types::{DBRef, DatasetKind}; use anyhow::{anyhow, bail, ensure}; use bytes::{BufMut, Bytes, BytesMut}; @@ -11,12 +12,35 @@ use sqd_query::{JsonLinesWriter, Plan, Query}; use sqd_storage::db::{Chunk as StorageChunk, DatasetId}; use std::io::Write; - struct LeftOver { chunk: StaticChunkReader, next_block: BlockNumber } +pub struct RunningQueryStats { + pub chunks_read: u64, + pub blocks_read: u64, + pub total_buffered_bytes: u64, +} + +impl RunningQueryStats { + pub fn new() -> Self { + Self { + chunks_read: 0, + blocks_read: 0, + total_buffered_bytes: 0, + } + } + + pub fn report_metrics(&self, dataset_id: &DatasetId) { + let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; + + QUERIED_BLOCKS.get_or_create(&labels).observe(self.blocks_read as f64); + QUERIED_CHUNKS.get_or_create(&labels).observe(self.chunks_read as f64); + QUERIED_BYTES.get_or_create(&labels).observe(self.total_buffered_bytes as f64); + } + +} pub struct RunningQuery { plan: Plan, @@ -25,7 +49,8 @@ pub struct RunningQuery { next_chunk: Option>, chunk_iterator: StaticChunkIterator, finalized_head: Option, - buf: GzEncoder> + buf: GzEncoder>, + stats: RunningQueryStats, } @@ -74,6 +99,9 @@ impl RunningQuery { last_block: first_chunk.first_block() - 1 } ); + + let mut stats = RunningQueryStats::new(); + stats.chunks_read += 1; let plan = if query.first_block() == first_chunk.first_block() { if let Some(parent_hash) = query.parent_block_hash() { @@ -121,7 +149,8 @@ impl RunningQuery { buf: GzEncoder::new( BytesMut::new().writer(), Compression::fast() - ) + ), + stats, }) } @@ -129,6 +158,10 @@ impl RunningQuery { self.finalized_head.take() } + pub fn stats(&self) -> &RunningQueryStats { + &self.stats + } + pub fn buffered_bytes(&self) -> usize { self.buf.get_ref().get_ref().len() } @@ -153,12 +186,17 @@ impl RunningQuery { /// /// Everything written to the buffer is always well-formed. 
pub fn write_next_chunk(&mut self) -> anyhow::Result<()> { - let chunk = if let Some(left_over) = self.left_over.take() { - self.plan.set_first_block(left_over.next_block); - left_over.chunk + let (chunk, first_block_queried) = if let Some(left_over) = self.left_over.take() { + let first_block = left_over.next_block; + self.plan.set_first_block(first_block); + (left_over.chunk, first_block) } else { - let chunk = self.next_chunk()?; - self.chunk_iterator.snapshot().create_chunk_reader(chunk) + let storage_chunk = self.next_chunk()?; + // Increment chunks_downloaded when we fetch a new chunk + self.stats.chunks_read += 1; + let chunk = self.chunk_iterator.snapshot().create_chunk_reader(storage_chunk); + let first_block = chunk.first_block(); + (chunk, first_block) }; if self.last_block.map_or(false, |end| end < chunk.last_block()) { @@ -184,11 +222,14 @@ impl RunningQuery { // no matter what, we are moving to the next chunk self.plan.set_first_block(None); self.plan.set_parent_block_hash(None); - + let Some(mut block_writer) = query_result? else { return Ok(()) }; + let blocks_written = block_writer.last_block() - first_block_queried + 1; + self.stats.blocks_read += blocks_written; + if chunk.last_block() > block_writer.last_block() && self.last_block.map_or(true, |end| end > block_writer.last_block()) { @@ -198,6 +239,8 @@ impl RunningQuery { }) } + let bytes_before = self.buf.get_ref().get_ref().len(); + let mut json_lines_writer = JsonLinesWriter::new(&mut self.buf); json_lines_writer @@ -209,7 +252,10 @@ impl RunningQuery { .expect("IO errors are not possible"); self.buf.flush().expect("IO errors are not possible"); - + + let bytes_after = self.buf.get_ref().get_ref().len(); + self.stats.total_buffered_bytes += (bytes_after - bytes_before) as u64; + Ok(()) } @@ -250,4 +296,4 @@ impl RunningQuery { }) .unwrap_or(0) as usize } -} \ No newline at end of file +} diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index c6bf9ae..401ce50 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -153,8 +153,13 @@ impl QueryService { dataset.dataset_id(), query, finalized, + None ).await } + + pub fn spawn_metrics_reporter(&self, interval: Duration) { + self.executor.spawn_metrics_reporter(interval) + } } @@ -189,4 +194,4 @@ impl<'a> Drop for WaitingSlot<'a> { fn drop(&mut self) { self.waiters.fetch_sub(1, Ordering::SeqCst); } -} \ No newline at end of file +} From b33d38e784366916130e16eebdeb49225e4f0cbf Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:26:15 +0100 Subject: [PATCH 2/7] Rename dataset_id to dataset_name --- crates/hotblocks/src/api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 168c70f..2d9bc91 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -133,7 +133,7 @@ impl ResponseWithMetadata { pub fn with_dataset_id(mut self, id: DatasetId) -> Self { self.labels .0 - .push(("dataset_id".to_string(), id.as_str().to_owned())); + .push(("dataset_name".to_string(), id.as_str().to_owned())); self } From e98ba2d9df15f8033ce00e09aa9685b381bf4429 Mon Sep 17 00:00:00 2001 From: Vasilii Demidenok <879658+define-null@users.noreply.github.com> Date: Wed, 14 Jan 2026 22:12:13 +0100 Subject: [PATCH 3/7] Update crates/hotblocks/src/api.rs --- crates/hotblocks/src/api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 2d9bc91..5c7e4ec 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -340,7 +340,7 @@ async fn get_retention( ) -> impl IntoResponse { ResponseWithMetadata::new() .with_dataset_id(dataset_id.clone()) - .with_endpoint("/head") + .with_endpoint("/retention") .with_response(|| { json_ok! { get_dataset!(app, dataset_id).get_retention() From 2ac1c623a447cd0d5067c283e5cc03b03ec8274c Mon Sep 17 00:00:00 2001 From: Vasilii Demidenok <879658+define-null@users.noreply.github.com> Date: Wed, 14 Jan 2026 22:12:59 +0100 Subject: [PATCH 4/7] Update crates/hotblocks/src/api.rs --- crates/hotblocks/src/api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index 5c7e4ec..b9ab86f 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -355,7 +355,7 @@ async fn set_retention( ) -> impl IntoResponse { ResponseWithMetadata::new() .with_dataset_id(dataset_id.clone()) - .with_endpoint("/head") + .with_endpoint("/retention") .with_response(|| { let ds = get_dataset!(app, dataset_id); if app.api_controlled_datasets.contains(&dataset_id) { From 84de309789cc84609f7543e94529df263b66fc0f Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Wed, 14 Jan 2026 22:56:16 +0100 Subject: [PATCH 5/7] Dropped query_bytes --- crates/hotblocks/src/metrics.rs | 18 ++---- crates/hotblocks/src/query/response.rs | 38 +++++++----- crates/hotblocks/src/query/running.rs | 83 ++++++++++++-------------- 3 files changed, 66 insertions(+), 73 deletions(-) diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index 96270d4..caf61b3 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -70,8 +70,6 @@ lazy_static::lazy_static! 
{ pub static ref STREAM_BLOCKS_PER_SECOND: Family = Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 3.0, 20))); - pub static ref QUERIED_BYTES: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20))); pub static ref QUERIED_BLOCKS: Family = Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))); pub static ref QUERIED_CHUNKS: Family = @@ -213,17 +211,17 @@ pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { registry.register( "stream_bytes", - "Numbers of bytes per stream", + "Number of bytes per stream", STREAM_BYTES.clone(), ); registry.register( "stream_blocks", - "Numbers of blocks per stream", + "Number of blocks per stream", STREAM_BLOCKS.clone(), ); registry.register( "stream_chunks", - "Numbers of chunks per stream", + "Number of chunks per stream", STREAM_CHUNKS.clone(), ); registry.register( @@ -241,20 +239,14 @@ pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { "Durations of completed streams", STREAM_DURATIONS.clone(), ); - - registry.register( - "queried_bytes", - "Numbers of bytes queried per running query", - QUERIED_BYTES.clone(), - ); registry.register( "queried_blocks", - "Numbers of blocks per running query", + "Number of blocks per running query", QUERIED_BLOCKS.clone(), ); registry.register( "queried_chunks", - "Numbers of chunks per running query", + "Number of chunks per running query", QUERIED_CHUNKS.clone(), ); registry.register( diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index 7f09165..c5d0943 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -43,27 +43,32 @@ impl QueryStreamStats { } pub fn add_running_stats(&mut self, running_stats: &RunningQueryStats) { - self.response_chunks = self.response_chunks.saturating_add(running_stats.chunks_read); - self.response_blocks = self.response_blocks.saturating_add(running_stats.blocks_read); - self.response_bytes = self.response_bytes.saturating_add(running_stats.total_buffered_bytes); + self.response_chunks = self + .response_chunks + .saturating_add(running_stats.chunks_read); + self.response_blocks = self + .response_blocks + .saturating_add(running_stats.blocks_read); } fn report_metrics(&self, dataset_id: &DatasetId) { let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; let duration = self.start_time.elapsed().as_secs_f64(); - let bytes = self.response_bytes; - let blocks = self.response_blocks; - let chunks = self.response_chunks; + let bytes = self.response_bytes as f64; + let blocks = self.response_blocks as f64; + let chunks = self.response_chunks as f64; STREAM_DURATIONS.get_or_create(&labels).observe(duration); - STREAM_BYTES.get_or_create(&labels).observe(bytes as f64); - STREAM_BLOCKS.get_or_create(&labels).observe(blocks as f64); - STREAM_CHUNKS.get_or_create(&labels).observe(chunks as f64); - STREAM_BYTES_PER_SECOND.observe(bytes as f64 / duration); - STREAM_BLOCKS_PER_SECOND - .get_or_create(&labels) - .observe(blocks as f64 / duration); + STREAM_BYTES.get_or_create(&labels).observe(bytes); + STREAM_BLOCKS.get_or_create(&labels).observe(blocks); + STREAM_CHUNKS.get_or_create(&labels).observe(chunks); + if duration > 0.0 { + STREAM_BYTES_PER_SECOND.observe(bytes / duration); + STREAM_BLOCKS_PER_SECOND + .get_or_create(&labels) + .observe(blocks / duration); + } } } @@ -113,7 +118,7 @@ impl QueryResponse { }; if !runner.has_next_chunk() { - return 
Ok(self.finish_with_runner(runner)) + return Ok(self.finish_with_runner(runner)); } if self.stats.start_time.elapsed() > self.time_limit { @@ -165,8 +170,9 @@ impl QueryResponse { fn finish_with_runner(&mut self, runner: Box) -> Option { runner.stats().report_metrics(&self.dataset_id); self.stats.add_running_stats(runner.stats()); - - Some(runner.finish()) + let bytes = runner.finish(); + self.stats.response_bytes = self.stats.response_bytes.saturating_add(bytes.len() as u64); + Some(bytes) } pub fn finish(&mut self) -> Bytes { diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index 3db15ea..31835b5 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -1,12 +1,12 @@ use crate::errors::{BlockItemIsNotAvailable, QueryKindMismatch}; use crate::errors::{BlockRangeMissing, QueryIsAboveTheHead}; +use crate::metrics::{QUERIED_BLOCKS, QUERIED_CHUNKS}; use crate::query::static_snapshot::{StaticChunkIterator, StaticChunkReader, StaticSnapshot}; -use crate::metrics::{ QUERIED_BYTES, QUERIED_BLOCKS, QUERIED_CHUNKS }; use crate::types::{DBRef, DatasetKind}; use anyhow::{anyhow, bail, ensure}; use bytes::{BufMut, Bytes, BytesMut}; -use flate2::write::GzEncoder; use flate2::Compression; +use flate2::write::GzEncoder; use sqd_primitives::{BlockNumber, BlockRef}; use sqd_query::{JsonLinesWriter, Plan, Query}; use sqd_storage::db::{Chunk as StorageChunk, DatasetId}; @@ -14,13 +14,12 @@ use std::io::Write; struct LeftOver { chunk: StaticChunkReader, - next_block: BlockNumber + next_block: BlockNumber, } pub struct RunningQueryStats { pub chunks_read: u64, pub blocks_read: u64, - pub total_buffered_bytes: u64, } impl RunningQueryStats { @@ -28,18 +27,19 @@ impl RunningQueryStats { Self { chunks_read: 0, blocks_read: 0, - total_buffered_bytes: 0, } } pub fn report_metrics(&self, dataset_id: &DatasetId) { let labels = vec![("dataset_id".to_owned(), dataset_id.as_str().to_owned())]; - QUERIED_BLOCKS.get_or_create(&labels).observe(self.blocks_read as f64); - QUERIED_CHUNKS.get_or_create(&labels).observe(self.chunks_read as f64); - QUERIED_BYTES.get_or_create(&labels).observe(self.total_buffered_bytes as f64); + QUERIED_BLOCKS + .get_or_create(&labels) + .observe(self.blocks_read as f64); + QUERIED_CHUNKS + .get_or_create(&labels) + .observe(self.chunks_read as f64); } - } pub struct RunningQuery { @@ -53,15 +53,13 @@ pub struct RunningQuery { stats: RunningQueryStats, } - impl RunningQuery { pub fn new( db: DBRef, dataset_id: DatasetId, query: &Query, only_finalized: bool, - ) -> anyhow::Result - { + ) -> anyhow::Result { let snapshot = StaticSnapshot::new(db); let finalized_head = match snapshot.get_label(dataset_id)? { @@ -79,13 +77,9 @@ impl RunningQuery { } }; - let mut chunk_iterator = StaticChunkIterator::new( - snapshot, - dataset_id, - query.first_block(), - None - ); - + let mut chunk_iterator = + StaticChunkIterator::new(snapshot, dataset_id, query.first_block(), None); + let Some(first_chunk) = chunk_iterator.next().transpose()? 
else { bail!(QueryIsAboveTheHead { finalized_head: None @@ -102,7 +96,7 @@ impl RunningQuery { let mut stats = RunningQueryStats::new(); stats.chunks_read += 1; - + let plan = if query.first_block() == first_chunk.first_block() { if let Some(parent_hash) = query.parent_block_hash() { ensure!( @@ -146,10 +140,7 @@ impl RunningQuery { next_chunk: Some(Ok(first_chunk)), chunk_iterator, finalized_head, - buf: GzEncoder::new( - BytesMut::new().writer(), - Compression::fast() - ), + buf: GzEncoder::new(BytesMut::new().writer(), Compression::fast()), stats, }) } @@ -177,7 +168,7 @@ impl RunningQuery { .into_inner() .freeze() } - + pub fn has_next_chunk(&self) -> bool { self.next_chunk.is_some() || self.left_over.is_some() } @@ -194,12 +185,18 @@ impl RunningQuery { let storage_chunk = self.next_chunk()?; // Increment chunks_downloaded when we fetch a new chunk self.stats.chunks_read += 1; - let chunk = self.chunk_iterator.snapshot().create_chunk_reader(storage_chunk); + let chunk = self + .chunk_iterator + .snapshot() + .create_chunk_reader(storage_chunk); let first_block = chunk.first_block(); (chunk, first_block) }; - if self.last_block.map_or(false, |end| end < chunk.last_block()) { + if self + .last_block + .map_or(false, |end| end < chunk.last_block()) + { let last_block = self.last_block; self.plan.set_last_block(last_block); } else { @@ -214,7 +211,7 @@ impl RunningQuery { item_name: err.table_name, first_block: chunk.first_block(), last_block: chunk.last_block() - }) + }); } err }); @@ -224,23 +221,23 @@ impl RunningQuery { self.plan.set_parent_block_hash(None); let Some(mut block_writer) = query_result? else { - return Ok(()) + return Ok(()); }; let blocks_written = block_writer.last_block() - first_block_queried + 1; self.stats.blocks_read += blocks_written; if chunk.last_block() > block_writer.last_block() - && self.last_block.map_or(true, |end| end > block_writer.last_block()) + && self + .last_block + .map_or(true, |end| end > block_writer.last_block()) { self.left_over = Some(LeftOver { chunk, - next_block: block_writer.last_block() + 1 + next_block: block_writer.last_block() + 1, }) } - let bytes_before = self.buf.get_ref().get_ref().len(); - let mut json_lines_writer = JsonLinesWriter::new(&mut self.buf); json_lines_writer @@ -253,9 +250,6 @@ impl RunningQuery { self.buf.flush().expect("IO errors are not possible"); - let bytes_after = self.buf.get_ref().get_ref().len(); - self.stats.total_buffered_bytes += (bytes_after - bytes_before) as u64; - Ok(()) } @@ -264,14 +258,16 @@ impl RunningQuery { bail!("no more chunks left") }; - self.next_chunk = self.chunk_iterator.next() + self.next_chunk = self + .chunk_iterator + .next() .transpose() .map(|maybe_next_chunk| { let next_chunk = maybe_next_chunk?; let is_continuous = chunk.last_block() + 1 == next_chunk.first_block(); - let is_requested = self.last_block.map_or(true, |end| { - next_chunk.first_block() <= end - }); + let is_requested = self + .last_block + .map_or(true, |end| next_chunk.first_block() <= end); if is_continuous && is_requested { Some(next_chunk) } else { @@ -285,10 +281,9 @@ impl RunningQuery { /// Size of the next chunk (in blocks) pub fn next_chunk_size(&self) -> usize { - self.left_over.as_ref() - .map(|lo| { - lo.chunk.last_block() - lo.chunk.first_block() + 1 - }) + self.left_over + .as_ref() + .map(|lo| lo.chunk.last_block() - lo.chunk.first_block() + 1) .or_else(|| { let chunk = self.next_chunk.as_ref()?.as_ref().ok()?; let size = chunk.last_block() - chunk.first_block() + 1; From 
db5a7897876ec19da1fbbedee7697f9de689de6d Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:51:27 +0100 Subject: [PATCH 6/7] Apply suggestions from review --- Cargo.lock | 1 - crates/hotblocks/Cargo.toml | 1 - crates/hotblocks/src/cli.rs | 32 +++++---- crates/hotblocks/src/metrics.rs | 91 +++++++++++++++----------- crates/hotblocks/src/query/executor.rs | 32 +++++---- crates/hotblocks/src/query/mod.rs | 4 +- crates/hotblocks/src/query/response.rs | 4 -- crates/hotblocks/src/query/service.rs | 7 +- 8 files changed, 92 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d18b4b6..becafa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4895,7 +4895,6 @@ dependencies = [ "clap", "flate2", "futures", - "lazy_static", "ouroboros", "prometheus-client 0.24.0", "serde", diff --git a/crates/hotblocks/Cargo.toml b/crates/hotblocks/Cargo.toml index 41fad8e..c001fea 100644 --- a/crates/hotblocks/Cargo.toml +++ b/crates/hotblocks/Cargo.toml @@ -11,7 +11,6 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["std"] } clap = { workspace = true, features = ["derive"] } flate2 = { workspace = true } -lazy_static = "1.4.0" futures = { workspace = true } ouroboros = { workspace = true } prometheus-client = { workspace = true } diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index bb1e5d3..861b9ec 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -1,5 +1,6 @@ use crate::data_service::{DataService, DataServiceRef}; use crate::dataset_config::{DatasetConfig, RetentionConfig}; +use crate::metrics::DatasetMetricsCollector; use crate::query::{QueryService, QueryServiceRef}; use crate::types::DBRef; use anyhow::Context; @@ -7,8 +8,6 @@ use clap::Parser; use sqd_storage::db::{DatabaseSettings, DatasetId}; use std::collections::BTreeSet; use std::sync::Arc; -use std::time::Duration; - #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -23,7 +22,7 @@ pub struct CLI { #[arg(long, value_name = "MB", default_value = "256")] pub data_cache_size: usize, - + /// Max number of threads to use for query tasks #[arg(long, value_name = "N")] pub query_threads: Option, @@ -49,16 +48,14 @@ pub struct CLI { pub rocksdb_disable_direct_io: bool, } - pub struct App { pub db: DBRef, pub data_service: DataServiceRef, pub query_service: QueryServiceRef, pub api_controlled_datasets: BTreeSet, - pub metrics_registry: prometheus_client::registry::Registry + pub metrics_registry: prometheus_client::registry::Registry, } - impl CLI { pub async fn build_app(&self) -> anyhow::Result { let datasets = DatasetConfig::read_config_file(&self.datasets) @@ -72,15 +69,15 @@ impl CLI { .map(Arc::new) .context("failed to open rocksdb database")?; - let metrics_registry = crate::metrics::build_metrics_registry( - db.clone(), - datasets.keys().copied().collect() - ); + let mut metrics_registry = crate::metrics::build_metrics_registry(); + metrics_registry.register_collector(Box::new(DatasetMetricsCollector { + db: db.clone(), + datasets: datasets.keys().copied().collect(), + })); - let api_controlled_datasets = datasets.iter() - .filter_map(|(id, cfg)| { - (cfg.retention_strategy == RetentionConfig::Api).then_some(*id) - }) + let api_controlled_datasets = datasets + .iter() + .filter_map(|(id, cfg)| (cfg.retention_strategy == RetentionConfig::Api).then_some(*id)) .collect(); let data_service = DataService::start(db.clone(), datasets) @@ -98,17 +95,18 @@ impl CLI { if let Some(ms) = 
self.query_urgency { builder.set_urgency(ms); } + let service = builder.build(); + metrics_registry.register_collector(Box::new(service.metrics_collector())); - Arc::new(builder.build()) + Arc::new(service) }; - query_service.spawn_metrics_reporter(Duration::from_secs(5)); Ok(App { db, data_service, query_service, api_controlled_datasets, - metrics_registry + metrics_registry, }) } } diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index caf61b3..c664951 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::query::QueryExecutorCollector; use crate::types::DBRef; use anyhow::bail; use prometheus_client::collector::Collector; @@ -14,6 +15,7 @@ use prometheus_client::metrics::{ use prometheus_client::registry::Registry; use sqd_storage::db::{DatasetId, ReadSnapshot}; use std::fmt::Write; +use std::sync::LazyLock; use std::time::Duration; use tracing::error; @@ -47,34 +49,37 @@ fn buckets(start: f64, count: usize) -> impl Iterator { .take(count) } -lazy_static::lazy_static! { - pub static ref HTTP_STATUS: Family = Default::default(); - pub static ref HTTP_TTFB: Family = - Family::new_with_constructor(|| Histogram::new(buckets(0.001, 20))); - - pub static ref QUERY_ERROR_TOO_MANY_TASKS: Counter = Default::default(); - pub static ref QUERY_ERROR_TOO_MANY_DATA_WAITERS: Counter = Default::default(); - - pub static ref ACTIVE_QUERIES: Gauge = Default::default(); - pub static ref COMPLETED_QUERIES: Counter = Default::default(); - - pub static ref STREAM_DURATIONS: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.01, 2.0, 20))); - pub static ref STREAM_BYTES: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20))); - pub static ref STREAM_BLOCKS: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))); - pub static ref STREAM_CHUNKS: Family = - Family::new_with_constructor(|| Histogram::new(buckets(1., 20))); - pub static ref STREAM_BYTES_PER_SECOND: Histogram = Histogram::new(exponential_buckets(100., 3.0, 20)); - pub static ref STREAM_BLOCKS_PER_SECOND: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 3.0, 20))); - - pub static ref QUERIED_BLOCKS: Family = - Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))); - pub static ref QUERIED_CHUNKS: Family = - Family::new_with_constructor(|| Histogram::new(buckets(1., 20))); -} +pub static HTTP_STATUS: LazyLock> = LazyLock::new(Default::default); +pub static HTTP_TTFB: LazyLock> = + LazyLock::new(|| Family::new_with_constructor(|| Histogram::new(buckets(0.001, 20)))); + +pub static QUERY_ERROR_TOO_MANY_TASKS: LazyLock = LazyLock::new(Default::default); +pub static QUERY_ERROR_TOO_MANY_DATA_WAITERS: LazyLock = LazyLock::new(Default::default); + +pub static COMPLETED_QUERIES: LazyLock = LazyLock::new(Default::default); + +pub static STREAM_DURATIONS: LazyLock> = LazyLock::new(|| { + Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.01, 2.0, 20))) +}); +pub static STREAM_BYTES: LazyLock> = LazyLock::new(|| { + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1000., 2.0, 20))) +}); +pub static STREAM_BLOCKS: LazyLock> = LazyLock::new(|| { + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))) +}); +pub static STREAM_CHUNKS: LazyLock> = + LazyLock::new(|| Family::new_with_constructor(|| Histogram::new(buckets(1., 20)))); +pub static 
STREAM_BYTES_PER_SECOND: LazyLock = + LazyLock::new(|| Histogram::new(exponential_buckets(100., 3.0, 20))); +pub static STREAM_BLOCKS_PER_SECOND: LazyLock> = LazyLock::new(|| { + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 3.0, 20))) +}); + +pub static QUERIED_BLOCKS: LazyLock> = LazyLock::new(|| { + Family::new_with_constructor(|| Histogram::new(exponential_buckets(1., 2.0, 30))) +}); +pub static QUERIED_CHUNKS: LazyLock> = + LazyLock::new(|| Family::new_with_constructor(|| Histogram::new(buckets(1., 20)))); pub fn report_query_too_many_tasks_error() { QUERY_ERROR_TOO_MANY_TASKS.inc(); @@ -92,9 +97,9 @@ pub fn report_http_response(labels: &Vec<(String, String)>, to_first_byte: Durat } #[derive(Debug)] -struct DatasetMetricsCollector { - db: DBRef, - datasets: Vec, +pub struct DatasetMetricsCollector { + pub db: DBRef, + pub datasets: Vec, } impl Collector for DatasetMetricsCollector { @@ -182,7 +187,7 @@ fn collect_dataset_metrics( Ok(()) } -pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { +pub fn build_metrics_registry() -> Registry { let mut top_registry = Registry::default(); let registry = top_registry.sub_registry_with_prefix("hotblocks"); @@ -249,17 +254,27 @@ pub fn build_metrics_registry(db: DBRef, datasets: Vec) -> Registry { "Number of chunks per running query", QUERIED_CHUNKS.clone(), ); - registry.register( - "active_queries", - "Number of active queries", - ACTIVE_QUERIES.clone(), - ); registry.register( "completed_queries", "Number of completed queries", COMPLETED_QUERIES.clone(), ); - top_registry.register_collector(Box::new(DatasetMetricsCollector { db, datasets })); top_registry } + +impl Collector for QueryExecutorCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let active_queries = self.get_active_queries(); + + encoder + .encode_descriptor( + "hotblocks_active_queries", + "Number of currently active queries", + None, + MetricType::Gauge, + )? 
+ .encode_gauge(&active_queries)?; + Ok(()) + } +} diff --git a/crates/hotblocks/src/query/executor.rs b/crates/hotblocks/src/query/executor.rs index 78d27e2..d4d534d 100644 --- a/crates/hotblocks/src/query/executor.rs +++ b/crates/hotblocks/src/query/executor.rs @@ -1,7 +1,6 @@ -use crate::metrics::{ACTIVE_QUERIES, COMPLETED_QUERIES, report_query_too_many_tasks_error}; +use crate::metrics::{COMPLETED_QUERIES, report_query_too_many_tasks_error}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio::time::{self, Duration}; #[derive(Clone)] pub struct QueryExecutor { @@ -35,18 +34,8 @@ impl QueryExecutor { } } - pub fn spawn_metrics_reporter(&self, interval: Duration) { - let active_count = self.in_flight.clone(); - - tokio::spawn(async move { - let mut ticker = time::interval(interval); - - loop { - ticker.tick().await; - let active_queries = active_count.load(Ordering::SeqCst); - ACTIVE_QUERIES.set(active_queries as i64); - } - }); + pub fn metrics_collector(&self) -> QueryExecutorCollector { + QueryExecutorCollector::new(self.in_flight.clone()) } } @@ -88,3 +77,18 @@ impl QuerySlot { rx.await.expect("task panicked") } } + +#[derive(Debug)] +pub struct QueryExecutorCollector { + in_flight: Arc, +} + +impl QueryExecutorCollector { + pub fn new(in_flight: Arc) -> Self { + Self { in_flight } + } + + pub fn get_active_queries(&self) -> u64 { + self.in_flight.load(Ordering::SeqCst) as u64 + } +} diff --git a/crates/hotblocks/src/query/mod.rs b/crates/hotblocks/src/query/mod.rs index 32efc64..028f2ff 100644 --- a/crates/hotblocks/src/query/mod.rs +++ b/crates/hotblocks/src/query/mod.rs @@ -4,6 +4,6 @@ mod running; mod service; mod static_snapshot; - +pub use executor::QueryExecutorCollector; pub use response::*; -pub use service::*; \ No newline at end of file +pub use service::*; diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index c5d0943..857120a 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -123,10 +123,6 @@ impl QueryResponse { if self.stats.start_time.elapsed() > self.time_limit { // Client is expected to retry the query based on the data that they have received - tracing::warn!( - "terminate query that has been running for more than {} seconds", - self.time_limit.as_secs() - ); return Ok(self.finish_with_runner(runner)); } diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index 401ce50..9c1b3c2 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -3,6 +3,7 @@ use super::response::QueryResponse; use crate::dataset_controller::DatasetController; use crate::errors::{Busy, QueryIsAboveTheHead, QueryKindMismatch}; use crate::types::{DBRef, DatasetKind}; +use crate::query::QueryExecutorCollector; use anyhow::{bail, ensure}; use sqd_query::Query; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -157,9 +158,9 @@ impl QueryService { ).await } - pub fn spawn_metrics_reporter(&self, interval: Duration) { - self.executor.spawn_metrics_reporter(interval) - } + pub fn metrics_collector(&self) -> QueryExecutorCollector { + self.executor.metrics_collector() + } } From 072d22b0e0d89174449994956024e7b0a7659ff5 Mon Sep 17 00:00:00 2001 From: Defnull <879658+define-null@users.noreply.github.com> Date: Thu, 15 Jan 2026 13:44:57 +0100 Subject: [PATCH 7/7] Remove unused import --- crates/hotblocks/src/metrics.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/hotblocks/src/metrics.rs 
b/crates/hotblocks/src/metrics.rs index c664951..b713435 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -9,7 +9,6 @@ use prometheus_client::metrics::{ MetricType, counter::Counter, family::Family, - gauge::Gauge, histogram::{Histogram, exponential_buckets}, }; use prometheus_client::registry::Registry;
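
For reference, below is a minimal standalone sketch of the pattern that patches 6 and 7 converge on: instead of a background task polling the in-flight counter into a registered Gauge (which is why the gauge import becomes unused and is dropped in PATCH 7/7), a custom prometheus-client Collector reads the shared AtomicUsize at scrape time. Only the encode_descriptor/encode_gauge chaining and register_collector wiring mirror the patch; the InFlightCollector name, the main() harness, and the cast to i64 are illustrative assumptions, not code from this series.

use prometheus_client::collector::Collector;
use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::DescriptorEncoder;
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug)]
struct InFlightCollector {
    // Shared with the executor that increments/decrements it per query.
    in_flight: Arc<AtomicUsize>,
}

impl Collector for InFlightCollector {
    fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
        // The value is read at scrape time, so no reporter task or interval is needed
        // and the exported number can never lag behind the counter.
        let value = self.in_flight.load(Ordering::SeqCst) as i64; // i64 is a safe gauge value type
        encoder
            .encode_descriptor(
                "hotblocks_active_queries",
                "Number of currently active queries",
                None,
                MetricType::Gauge,
            )?
            .encode_gauge(&value)?;
        Ok(())
    }
}

fn main() -> Result<(), std::fmt::Error> {
    let in_flight = Arc::new(AtomicUsize::new(3));

    let mut registry = Registry::default();
    registry.register_collector(Box::new(InFlightCollector {
        in_flight: in_flight.clone(),
    }));

    // Render the registry in the OpenMetrics text format, as a /metrics handler would.
    let mut out = String::new();
    encode(&mut out, &registry)?;
    println!("{out}");
    Ok(())
}

The design choice this illustrates: a Collector owns only a cheap handle (the Arc'd counter), so registering it costs nothing between scrapes, whereas the removed spawn_metrics_reporter approach kept a tokio task alive and could report a value up to one polling interval stale.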