diff --git a/Cargo.lock b/Cargo.lock index db95f252d..17d5b0436 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1776,6 +1776,17 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.11.0", + "parking_lot", + "rustix 0.38.44", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -2090,6 +2101,7 @@ dependencies = [ "futures", "hashi", "hashi-screener", + "hashi-telemetry", "hashi-types", "nix", "prometheus", @@ -2842,6 +2854,7 @@ dependencies = [ "fastcrypto-tbls", "fjall", "futures", + "hashi-telemetry", "hashi-types", "hex", "jiff", @@ -2896,6 +2909,7 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-mocks", "bitcoin", + "hashi-telemetry", "hashi-types", "hpke", "k256", @@ -2919,6 +2933,7 @@ dependencies = [ "corepc-client", "e2e-tests", "hashi-guardian", + "hashi-telemetry", "hashi-types", "hex", "hpke", @@ -2942,6 +2957,7 @@ dependencies = [ "axum", "backon", "bitcoin", + "hashi-telemetry", "hashi-types", "lru", "prometheus", @@ -2960,6 +2976,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "hashi-telemetry" +version = "0.1.0" +dependencies = [ + "crossterm", + "tracing", + "tracing-subscriber", +] + [[package]] name = "hashi-types" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index cf8e7eff1..33c8668ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,8 @@ tabled = "0.16" anyhow = "1.0.98" toml = "0.9.2" futures = "0.3.31" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } +crossterm = { version = "0.28", default-features = false } age = { version = "0.11", features = ["cli-common", "plugin"] } 
rust-embed = { version = "8", features = ["deterministic-timestamps"] } rustls = { version = "0.23.29", default-features = false, features = ["ring"] } diff --git a/crates/e2e-tests/Cargo.toml b/crates/e2e-tests/Cargo.toml index 8dd15cd7d..8acaf5466 100644 --- a/crates/e2e-tests/Cargo.toml +++ b/crates/e2e-tests/Cargo.toml @@ -33,6 +33,7 @@ colored.workspace = true rand.workspace = true nix = { version = "0.26.4", features = ["signal"] } tracing-subscriber.workspace = true +hashi-telemetry = { path = "../hashi-telemetry" } [[bin]] name = "hashi-localnet" diff --git a/crates/e2e-tests/src/main.rs b/crates/e2e-tests/src/main.rs index 8f81443a4..ec44e3c13 100644 --- a/crates/e2e-tests/src/main.rs +++ b/crates/e2e-tests/src/main.rs @@ -279,13 +279,10 @@ async fn cmd_start( tracing::level_filters::LevelFilter::OFF }; - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(default_level.into()) - .from_env_lossy(), - ) + let _guard = hashi_telemetry::TelemetryConfig::new() + .with_default_level(default_level) .with_target(false) + .with_env() .init(); use std::io::Write; diff --git a/crates/hashi-guardian/Cargo.toml b/crates/hashi-guardian/Cargo.toml index a3a119969..59ad86667 100644 --- a/crates/hashi-guardian/Cargo.toml +++ b/crates/hashi-guardian/Cargo.toml @@ -11,6 +11,7 @@ serde.workspace = true serde_json.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +hashi-telemetry = { path = "../hashi-telemetry" } tonic.workspace = true hashi-types = { path = "../hashi-types" } diff --git a/crates/hashi-guardian/src/main.rs b/crates/hashi-guardian/src/main.rs index 32512a924..4bfb54b97 100644 --- a/crates/hashi-guardian/src/main.rs +++ b/crates/hashi-guardian/src/main.rs @@ -102,7 +102,11 @@ pub struct EphemeralKeyPairs { /// SETUP_MODE=false: all endpoints except setup_new_key are enabled. 
#[tokio::main] async fn main() -> Result<()> { - init_tracing_subscriber(true); + let _guard = hashi_telemetry::TelemetryConfig::new() + .with_file_line(true) + .with_service_name("hashi-guardian") + .with_env() + .init(); // Check if SETUP_MODE is enabled (defaults to false) let setup_mode = std::env::var("SETUP_MODE") @@ -537,27 +541,6 @@ impl Enclave { } } -// --------------------------------- -// Tracing utilities -// --------------------------------- - -/// Initialize tracing subscriber with optional file/line number logging -pub fn init_tracing_subscriber(with_file_line: bool) { - let mut builder = tracing_subscriber::FmtSubscriber::builder().with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(), - ); - - if with_file_line { - builder = builder.with_file(true).with_line_number(true); - } - - let subscriber = builder.finish(); - tracing::subscriber::set_global_default(subscriber) - .expect("unable to initialize tracing subscriber"); -} - // --------------------------------- // Tests and related utilities // --------------------------------- diff --git a/crates/hashi-monitor/Cargo.toml b/crates/hashi-monitor/Cargo.toml index 95f7c4ef5..2c2edec8f 100644 --- a/crates/hashi-monitor/Cargo.toml +++ b/crates/hashi-monitor/Cargo.toml @@ -22,6 +22,7 @@ tokio.workspace = true tonic.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +hashi-telemetry = { path = "../hashi-telemetry" } [dev-dependencies] e2e-tests = { path = "../e2e-tests" } diff --git a/crates/hashi-monitor/src/main.rs b/crates/hashi-monitor/src/main.rs index 3e900d9a9..f9c6e099d 100644 --- a/crates/hashi-monitor/src/main.rs +++ b/crates/hashi-monitor/src/main.rs @@ -51,7 +51,11 @@ enum Command { #[tokio::main] async fn main() -> anyhow::Result<()> { - init_tracing_subscriber(false); + let _guard = hashi_telemetry::TelemetryConfig::new() + .with_service_name("hashi-monitor") + 
.with_target(false) + .with_env() + .init(); let cli = Cli::parse(); @@ -78,22 +82,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } - -pub fn init_tracing_subscriber(with_file_line: bool) { - let mut builder = tracing_subscriber::FmtSubscriber::builder().with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(), - ); - - if with_file_line { - builder = builder - .with_file(true) - .with_line_number(true) - .with_target(false); - } - - let subscriber = builder.finish(); - tracing::subscriber::set_global_default(subscriber) - .expect("unable to initialize tracing subscriber"); -} diff --git a/crates/hashi-screener/Cargo.toml b/crates/hashi-screener/Cargo.toml index 230dffb50..720a63059 100644 --- a/crates/hashi-screener/Cargo.toml +++ b/crates/hashi-screener/Cargo.toml @@ -11,6 +11,7 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +hashi-telemetry = { path = "../hashi-telemetry" } prometheus.workspace = true axum.workspace = true sui-http.workspace = true diff --git a/crates/hashi-screener/src/main.rs b/crates/hashi-screener/src/main.rs index fb16c3ab0..1102d2084 100644 --- a/crates/hashi-screener/src/main.rs +++ b/crates/hashi-screener/src/main.rs @@ -226,20 +226,6 @@ impl ScreenerService for ScreenerServiceImpl { } } -fn init_tracing_subscriber() { - let subscriber = tracing_subscriber::FmtSubscriber::builder() - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .with_file(true) - .with_line_number(true) - .finish(); - tracing::subscriber::set_global_default(subscriber) - .expect("unable to initialize tracing subscriber"); -} - fn start_metrics_server(registry: prometheus::Registry) -> sui_http::ServerHandle { let addr: SocketAddr = "0.0.0.0:9184".parse().unwrap(); info!("Prometheus 
metrics server listening on {}", addr); @@ -266,7 +252,11 @@ async fn metrics_handler( #[tokio::main] async fn main() -> Result<()> { - init_tracing_subscriber(); + let _guard = hashi_telemetry::TelemetryConfig::new() + .with_file_line(true) + .with_service_name("hashi-screener") + .with_env() + .init(); let api_key = env::var("MERKLE_SCIENCE_API_KEY") .map_err(|_| anyhow::anyhow!("MERKLE_SCIENCE_API_KEY environment variable is not set"))?; diff --git a/crates/hashi-telemetry/Cargo.toml b/crates/hashi-telemetry/Cargo.toml new file mode 100644 index 000000000..b3e475c56 --- /dev/null +++ b/crates/hashi-telemetry/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "hashi-telemetry" +version = "0.1.0" +edition = "2024" +publish = false + +[dependencies] +tracing.workspace = true +tracing-subscriber.workspace = true +crossterm.workspace = true diff --git a/crates/hashi-telemetry/src/lib.rs b/crates/hashi-telemetry/src/lib.rs new file mode 100644 index 000000000..be90eb31a --- /dev/null +++ b/crates/hashi-telemetry/src/lib.rs @@ -0,0 +1,159 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared telemetry configuration for all hashi binaries. +//! +//! Provides a [`TelemetryConfig`] builder that sets up a `tracing` subscriber with: +//! - **JSON output** for production (Kubernetes / Grafana / Loki) +//! - **Human-readable TTY output** for local development +//! - Automatic format detection based on whether stderr is a TTY +//! - `RUST_LOG` environment variable support via [`tracing_subscriber::EnvFilter`] + +use std::io::stderr; + +use crossterm::tty::IsTty; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +/// Configuration for the tracing subscriber. 
+/// +/// # Examples +/// +/// ```no_run +/// use hashi_telemetry::TelemetryConfig; +/// +/// // Production server: auto-detect JSON vs TTY, INFO default +/// let _guard = TelemetryConfig::new() +/// .with_file_line(true) +/// .with_service_name("hashi") +/// .with_env() +/// .init(); +/// +/// // CLI tool: WARN default, no target, verbose override +/// let _guard = TelemetryConfig::new() +/// .with_default_level(tracing::level_filters::LevelFilter::WARN) +/// .with_target(false) +/// .with_env() +/// .init(); +/// ``` +pub struct TelemetryConfig { + /// Base log level when `RUST_LOG` is not set. Default: `INFO`. + default_level: LevelFilter, + /// Force JSON (`Some(true)`) or TTY (`Some(false)`) output. + /// `None` means auto-detect: JSON if stderr is not a TTY, TTY otherwise. + json: Option, + /// Show `file:line` in log output. Default: `false`. + file_line: bool, + /// Show module target path in log output. Default: `true`. + target: bool, + /// Service name added as a field in JSON output. + service_name: Option, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self::new() + } +} + +impl TelemetryConfig { + /// Create a new config with sensible defaults (INFO level, auto-detect format). + pub fn new() -> Self { + Self { + default_level: LevelFilter::INFO, + json: None, + file_line: false, + target: true, + service_name: None, + } + } + + /// Set the default log level (used when `RUST_LOG` is not set). + pub fn with_default_level(mut self, level: LevelFilter) -> Self { + self.default_level = level; + self + } + + /// Force JSON (`true`) or TTY (`false`) output, overriding auto-detection. + pub fn with_json(mut self, json: bool) -> Self { + self.json = Some(json); + self + } + + /// Show `file:line` in log output. + pub fn with_file_line(mut self, enabled: bool) -> Self { + self.file_line = enabled; + self + } + + /// Show module target path in log output. 
+ pub fn with_target(mut self, enabled: bool) -> Self { + self.target = enabled; + self + } + + /// Set a service name (included as a field in JSON output). + pub fn with_service_name(mut self, name: impl Into) -> Self { + self.service_name = Some(name.into()); + self + } + + /// Read configuration overrides from environment variables: + /// - `RUST_LOG_JSON`: if set, forces JSON output + pub fn with_env(mut self) -> Self { + if std::env::var("RUST_LOG_JSON").is_ok() { + self.json = Some(true); + } + self + } + + /// Build and install the tracing subscriber. + /// + /// Returns a [`TelemetryGuard`] that must be held alive for the duration of the program. + pub fn init(self) -> TelemetryGuard { + let use_json = match self.json { + Some(true) => true, + Some(false) => false, + // Auto-detect: use JSON when stderr is not a TTY (e.g. Kubernetes pods). + None => !stderr().is_tty(), + }; + + let env_filter = EnvFilter::builder() + .with_default_directive(self.default_level.into()) + .from_env_lossy(); + + if use_json { + let fmt_layer = tracing_subscriber::fmt::layer() + .with_file(true) + .with_line_number(true) + .with_target(self.target) + .json() + .with_filter(env_filter); + + tracing_subscriber::registry().with(fmt_layer).init(); + } else { + let fmt_layer = tracing_subscriber::fmt::layer() + .with_file(self.file_line) + .with_line_number(self.file_line) + .with_target(self.target) + .with_ansi(stderr().is_tty()) + .with_filter(env_filter); + + tracing_subscriber::registry().with(fmt_layer).init(); + } + + TelemetryGuard { _private: () } + } +} + +/// Guard that must be held alive for the duration of the program. +/// +/// Future additions (non-blocking writer flush, OpenTelemetry shutdown) +/// will be handled in its `Drop` implementation. 
+#[must_use = "dropping the guard immediately will lose buffered log output"] +pub struct TelemetryGuard { + _private: (), +} diff --git a/crates/hashi/Cargo.toml b/crates/hashi/Cargo.toml index 5a8cb3e61..edc2c1b97 100644 --- a/crates/hashi/Cargo.toml +++ b/crates/hashi/Cargo.toml @@ -35,6 +35,7 @@ tower.workspace = true tokio.workspace = true prometheus.workspace = true tracing.workspace = true +hashi-telemetry = { path = "../hashi-telemetry" } bin-version.workspace = true prometheus-closure-metric.workspace = true diff --git a/crates/hashi/src/btc_monitor/monitor.rs b/crates/hashi/src/btc_monitor/monitor.rs index 0a81e3fc4..044fdea1a 100644 --- a/crates/hashi/src/btc_monitor/monitor.rs +++ b/crates/hashi/src/btc_monitor/monitor.rs @@ -217,6 +217,7 @@ impl Monitor { } /// Run the main event loop, returning the reason it exited. + #[tracing::instrument(name = "btc_monitor", skip_all)] async fn run_event_loop( &mut self, kyoto_client: &mut kyoto::Client, diff --git a/crates/hashi/src/cli/mod.rs b/crates/hashi/src/cli/mod.rs index 877b2a6a7..8f895563a 100644 --- a/crates/hashi/src/cli/mod.rs +++ b/crates/hashi/src/cli/mod.rs @@ -659,19 +659,17 @@ fn parse_metadata(args: Vec) -> Vec<(String, String)> { } fn init_tracing(verbose: bool) { - let filter = if verbose { - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::DEBUG.into()) - .from_env_lossy() + let level = if verbose { + tracing::level_filters::LevelFilter::DEBUG } else { - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::WARN.into()) - .from_env_lossy() + tracing::level_filters::LevelFilter::WARN }; - tracing_subscriber::fmt() - .with_env_filter(filter) + // Guard is intentionally leaked — the CLI runs to completion in main(). 
+ let _guard = hashi_telemetry::TelemetryConfig::new() + .with_default_level(level) .with_target(false) + .with_env() .init(); } diff --git a/crates/hashi/src/deposits.rs b/crates/hashi/src/deposits.rs index b4f158ab9..24ac92bb7 100644 --- a/crates/hashi/src/deposits.rs +++ b/crates/hashi/src/deposits.rs @@ -47,6 +47,7 @@ pub fn derive_deposit_address( } impl Hashi { + #[tracing::instrument(level = "info", skip_all, fields(deposit_id = %deposit_request.id))] pub async fn validate_and_sign_deposit_confirmation( &self, deposit_request: &DepositRequest, @@ -55,6 +56,7 @@ impl Hashi { self.sign_deposit_confirmation(deposit_request) } + #[tracing::instrument(level = "debug", skip_all, fields(deposit_id = %deposit_request.id))] pub async fn validate_deposit_request( &self, deposit_request: &DepositRequest, @@ -68,6 +70,7 @@ impl Hashi { /// Run AML/Sanctions checks for the deposit request. /// If no screener client is configured, checks are skipped. + #[tracing::instrument(level = "debug", skip_all, fields(deposit_id = %deposit_request.id))] async fn screen_deposit( &self, deposit_request: &DepositRequest, @@ -105,6 +108,7 @@ impl Hashi { } /// Validate that the deposit request exists on Sui + #[tracing::instrument(level = "debug", skip_all, fields(deposit_id = %deposit_request.id))] fn validate_deposit_request_on_sui( &self, deposit_request: &DepositRequest, @@ -144,6 +148,15 @@ impl Hashi { } /// Validate that there is a txout on Bitcoin that matches the deposit request + #[tracing::instrument( + level = "debug", + skip_all, + fields( + deposit_id = %deposit_request.id, + bitcoin_txid = %deposit_request.utxo.id.txid, + vout = deposit_request.utxo.id.vout, + ), + )] async fn validate_deposit_request_on_bitcoin( &self, deposit_request: &DepositRequest, diff --git a/crates/hashi/src/grpc/bridge_service.rs b/crates/hashi/src/grpc/bridge_service.rs index 66a84f3ab..1772682a8 100644 --- a/crates/hashi/src/grpc/bridge_service.rs +++ b/crates/hashi/src/grpc/bridge_service.rs 
@@ -45,20 +45,26 @@ impl BridgeService for HttpService { } /// Validate and sign a confirmation of a bitcoin deposit request. + #[tracing::instrument( + level = "info", + skip_all, + fields(deposit_id = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_deposit_confirmation( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let deposit_request = parse_deposit_request(request.get_ref()) .map_err(|e| Status::invalid_argument(e.to_string()))?; + tracing::Span::current().record("deposit_id", tracing::field::display(&deposit_request.id)); let member_signature = self .inner .validate_and_sign_deposit_confirmation(&deposit_request) .await .map_err(|e| Status::failed_precondition(e.to_string()))?; tracing::info!( - deposit_request_id = %deposit_request.id, utxo_txid = %deposit_request.utxo.id.txid, utxo_vout = deposit_request.utxo.id.vout, amount = deposit_request.utxo.amount, @@ -70,42 +76,53 @@ impl BridgeService for HttpService { } /// Step 1: Validate and sign approval for a batch of unapproved withdrawal requests. 
+ #[tracing::instrument( + level = "info", + skip_all, + fields(request_id = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_withdrawal_request_approval( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let approval = parse_withdrawal_request_approval(request.get_ref()) .map_err(|e| Status::invalid_argument(e.to_string()))?; + tracing::Span::current() + .record("request_id", tracing::field::display(&approval.request_id)); let member_signature = self .inner .validate_and_sign_withdrawal_request_approval(&approval) .await .map_err(|e| Status::failed_precondition(e.to_string()))?; - tracing::info!( - request_id = %approval.request_id, - "Signed withdrawal request approval", - ); + tracing::info!("Signed withdrawal request approval"); Ok(Response::new(SignWithdrawalRequestApprovalResponse { member_signature: Some(member_signature), })) } /// Step 2: Validate and sign a proposed withdrawal transaction construction. 
+ #[tracing::instrument( + level = "info", + skip_all, + fields(bitcoin_txid = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_withdrawal_tx_construction( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let approval = parse_withdrawal_tx_commitment(request.get_ref()) .map_err(|e| Status::invalid_argument(e.to_string()))?; + tracing::Span::current().record("bitcoin_txid", tracing::field::display(&approval.txid)); let member_signature = self .inner .validate_and_sign_withdrawal_tx_commitment(&approval) .await .map_err(|e| Status::failed_precondition(e.to_string()))?; tracing::info!( - txid = %approval.txid, requests = approval.request_ids.len(), "Signed withdrawal tx construction", ); @@ -114,23 +131,33 @@ impl BridgeService for HttpService { })) } + #[tracing::instrument( + level = "info", + skip_all, + fields(withdrawal_txn_id = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_withdrawal_transaction( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let withdrawal_txn_id = Address::from_bytes(&request.get_ref().withdrawal_txn_id) .map_err(|e| Status::invalid_argument(format!("invalid withdrawal_txn_id: {e}")))?; - tracing::info!(withdrawal_txn_id = %withdrawal_txn_id, "sign_withdrawal_transaction called"); + tracing::Span::current().record( + "withdrawal_txn_id", + tracing::field::display(&withdrawal_txn_id), + ); + tracing::info!("sign_withdrawal_transaction called"); let signatures = self .inner .validate_and_sign_withdrawal_tx(&withdrawal_txn_id) .await .map_err(|e| { - tracing::error!(withdrawal_txn_id = %withdrawal_txn_id, "sign_withdrawal_transaction failed: {e}"); + 
tracing::error!("sign_withdrawal_transaction failed: {e}"); Status::failed_precondition(e.to_string()) })?; - tracing::info!(withdrawal_txn_id = %withdrawal_txn_id, "sign_withdrawal_transaction succeeded"); + tracing::info!("sign_withdrawal_transaction succeeded"); Ok(Response::new(SignWithdrawalTransactionResponse { signatures_by_input: signatures .iter() @@ -140,41 +167,55 @@ impl BridgeService for HttpService { } /// Step 3: Validate and sign the BLS certificate over witness signatures. + #[tracing::instrument( + level = "info", + skip_all, + fields(withdrawal_id = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_withdrawal_tx_signing( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let message = parse_withdrawal_tx_signing(request.get_ref()) .map_err(|e| Status::invalid_argument(e.to_string()))?; + tracing::Span::current().record( + "withdrawal_id", + tracing::field::display(&message.withdrawal_id), + ); let member_signature = self .inner .validate_and_sign_withdrawal_tx_signing(&message) .map_err(|e| Status::failed_precondition(e.to_string()))?; - tracing::info!( - withdrawal_id = %message.withdrawal_id, - "Signed withdrawal tx signing", - ); + tracing::info!("Signed withdrawal tx signing"); Ok(Response::new(SignWithdrawalTxSigningResponse { member_signature: Some(member_signature), })) } + #[tracing::instrument( + level = "info", + skip_all, + fields(withdrawal_txn_id = tracing::field::Empty, caller = tracing::field::Empty), + )] async fn sign_withdrawal_confirmation( &self, request: Request, ) -> Result, Status> { - authenticate_caller(&request)?; + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); let withdrawal_txn_id = Address::from_bytes(&request.get_ref().withdrawal_txn_id) .map_err(|e| 
Status::invalid_argument(format!("invalid withdrawal_txn_id: {e}")))?; + tracing::Span::current().record( + "withdrawal_txn_id", + tracing::field::display(&withdrawal_txn_id), + ); let member_signature = self .inner .sign_withdrawal_confirmation(&withdrawal_txn_id) .map_err(|e| Status::failed_precondition(e.to_string()))?; - tracing::info!( - withdrawal_txn_id = %withdrawal_txn_id, - "Signed withdrawal confirmation", - ); + tracing::info!("Signed withdrawal confirmation"); Ok(Response::new(SignWithdrawalConfirmationResponse { member_signature: Some(member_signature), })) diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index c8688c5c4..7a2475910 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -101,6 +101,7 @@ impl LeaderService { }) } + #[tracing::instrument(name = "leader", skip_all)] async fn run(mut self) { info!("Starting leader service"); @@ -359,19 +360,20 @@ impl LeaderService { self.check_delete_expired_deposit_requests(&deposit_requests, checkpoint_timestamp_ms); } + #[tracing::instrument(level = "info", skip_all, fields(deposit_id = %deposit_request.id))] async fn process_deposit_request( inner: Arc, deposit_retry_tracker: RetryTracker, deposit_request: DepositRequest, checkpoint_timestamp_ms: u64, ) -> anyhow::Result<()> { - info!(deposit_request_id = %deposit_request.id, "Processing deposit request"); + info!("Processing deposit request"); // Validate deposit_request before asking for signatures match inner.validate_deposit_request(&deposit_request).await { Ok(()) => {} Err(e) => { - debug!(request_id = ?deposit_request.id, "Deposit validation failed: {e}"); + debug!("Deposit validation failed: {e}"); let kind = e.kind(); inner .metrics @@ -387,7 +389,7 @@ impl LeaderService { } } - info!(deposit_request_id = %deposit_request.id, "Deposit request validated successfully"); + info!("Deposit request validated successfully"); let proto_request = deposit_request_to_proto(&deposit_request); let 
members = inner @@ -421,7 +423,7 @@ impl LeaderService { while let Some(result) = sig_tasks.join_next().await { let Ok(Some(sig)) = result else { continue }; if let Err(e) = aggregator.add_signature(sig) { - error!(deposit_request_id = %deposit_request.id, "Failed to add deposit signature: {e}"); + error!("Failed to add deposit signature: {e}"); } if aggregator.weight() >= required_weight { break; @@ -447,10 +449,10 @@ impl LeaderService { .with_label_values(&["confirm_deposit", "success"]) .inc(); inner.metrics.deposits_confirmed_total.inc(); - info!(deposit_request_id = %deposit_request.id, "Successfully submitted deposit confirmation"); + info!("Successfully submitted deposit confirmation"); }) .inspect_err(|e| { - error!(deposit_request_id = %deposit_request.id, "Failed to submit deposit confirmation: {e}"); + error!("Failed to submit deposit confirmation: {e}"); inner .metrics .sui_tx_submissions_total @@ -461,16 +463,14 @@ impl LeaderService { Ok(()) } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_deposit_confirmation_signature( inner: &Arc, proto_request: SignDepositConfirmationRequest, member: &CommitteeMember, ) -> Option { let validator_address = member.validator_address(); - trace!( - "Requesting deposit confirmation signature from {}", - validator_address - ); + trace!("Requesting deposit confirmation signature"); let mut rpc_client = inner .onchain_state() @@ -574,6 +574,7 @@ impl LeaderService { })); } + #[tracing::instrument(level = "info", skip_all, fields(batch_size = to_process.len()))] async fn process_unapproved_withdrawal_requests_task( inner: Arc, retry_tracker: RetryTracker, @@ -661,10 +662,7 @@ impl LeaderService { retry_tracker.record_failure(kind, request_id, checkpoint_timestamp_ms); } if let Err(err) = &result { - error!( - withdrawal_request_id = %request_id, - "Withdrawal approval failed: {err:#}" - ); + error!(request_id = %request_id, "Withdrawal approval failed: 
{err:#}"); } (request_id, result) @@ -690,6 +688,7 @@ impl LeaderService { Ok(()) } + #[tracing::instrument(level = "info", skip_all, fields(request_id = %request.id))] async fn process_unapproved_withdrawal_request( inner: Arc, retry_tracker: RetryTracker, @@ -729,7 +728,7 @@ impl LeaderService { let mut aggregator = BlsSignatureAggregator::new(committee, approval); if let Err(e) = aggregator.add_signature(local_sig) { - error!(withdrawal_request_id = %request.id, "Failed to add local approval signature: {e}"); + error!("Failed to add local approval signature: {e}"); } // Fan out signature requests to remote members in parallel. @@ -750,7 +749,7 @@ impl LeaderService { while let Some(result) = sig_tasks.join_next().await { let Ok(Some(sig)) = result else { continue }; if let Err(e) = aggregator.add_signature(sig) { - error!(withdrawal_request_id = %request.id, "Failed to add approval signature: {e}"); + error!("Failed to add approval signature: {e}"); } if aggregator.weight() >= required_weight { break; @@ -769,14 +768,14 @@ impl LeaderService { request.id, checkpoint_timestamp_ms, ); - error!(withdrawal_request_id = %request.id, "Insufficient approval signatures: weight {weight} < {required_weight}"); + error!("Insufficient approval signatures: weight {weight} < {required_weight}"); return Ok(None); } match aggregator.finish() { Ok(signed) => Ok(Some((request.id, signed.committee_signature().clone()))), Err(e) => { - error!(withdrawal_request_id = %request.id, "Failed to build approval certificate: {e}"); + error!("Failed to build approval certificate: {e}"); Ok(None) } } @@ -937,6 +936,7 @@ impl LeaderService { })); } + #[tracing::instrument(level = "info", skip_all, fields(batch_size = requests.len()))] async fn process_approved_withdrawal_request_batch( inner: Arc, retry_tracker: GlobalRetryTracker, @@ -1117,6 +1117,7 @@ impl LeaderService { } } + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id))] async fn 
process_unsigned_withdrawal_txn( inner: Arc, txn: WithdrawalTransaction, @@ -1127,7 +1128,6 @@ impl LeaderService { let current_epoch = inner.onchain_state().epoch(); if txn.epoch != current_epoch { info!( - withdrawal_txn_id = %txn.id, "Withdrawal transaction from epoch {} (current {}), reassigning presig indices", txn.epoch, current_epoch, ); @@ -1135,14 +1135,11 @@ impl LeaderService { executor .execute_allocate_presigs_for_withdrawal_txn(txn.id) .await?; - info!( - withdrawal_txn_id = %txn.id, - "Presig indices reassigned, will sign on next checkpoint" - ); + info!("Presig indices reassigned, will sign on next checkpoint"); // Return and let the next checkpoint iteration pick up the updated state. return Ok(()); } - info!(withdrawal_txn_id = %txn.id, "MPC signing withdrawal transaction"); + info!("MPC signing withdrawal transaction"); let members = inner .onchain_state() @@ -1282,25 +1279,33 @@ impl LeaderService { /// Check BTC tx status, broadcast/re-broadcast if needed, confirm when /// enough BTC confirmations are reached. 
+ #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id, bitcoin_txid))] async fn handle_signed_withdrawal( inner: Arc, txn: WithdrawalTransaction, ) -> anyhow::Result<()> { let confirmation_threshold = inner.onchain_state().bitcoin_confirmation_threshold(); let txid: bitcoin::Txid = txn.txid.into(); + tracing::Span::current().record("bitcoin_txid", tracing::field::display(&txid)); match inner.btc_monitor().get_transaction_status(txid).await { Ok(TxStatus::Confirmed { confirmations }) if confirmations >= confirmation_threshold => { - info!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, confirmations, "Withdrawal tx confirmed, proceeding to on-chain confirmation"); + info!( + confirmations, + "Withdrawal tx confirmed, proceeding to on-chain confirmation" + ); Self::confirm_withdrawal_on_sui(&inner, &txn).await?; } Ok(TxStatus::Confirmed { confirmations }) => { - debug!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, confirmations, confirmation_threshold, "Withdrawal tx waiting for more confirmations"); + debug!( + confirmations, + confirmation_threshold, "Withdrawal tx waiting for more confirmations" + ); } Ok(TxStatus::InMempool) => { - debug!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, "Withdrawal tx in mempool, waiting for confirmations"); + debug!("Withdrawal tx in mempool, waiting for confirmations"); } Ok(TxStatus::NotFound) => { Self::rebuild_and_broadcast_withdrawal_btc_tx(&inner, &txn, txid).await; @@ -1317,27 +1322,28 @@ impl LeaderService { /// Rebuild a fully signed Bitcoin transaction from on-chain WithdrawalTransaction /// data (stored witness signatures) and broadcast it to the Bitcoin network. 
+ #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid))] async fn rebuild_and_broadcast_withdrawal_btc_tx( inner: &Arc, txn: &WithdrawalTransaction, txid: bitcoin::Txid, ) { - warn!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, "Withdrawal tx not found, re-broadcasting from on-chain signatures"); + warn!("Withdrawal tx not found, re-broadcasting from on-chain signatures"); let tx = match Self::rebuild_signed_tx_from_onchain(inner, txn) { Ok(tx) => tx, Err(e) => { - error!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, "Failed to rebuild signed withdrawal tx: {e}"); + error!("Failed to rebuild signed withdrawal tx: {e}"); return; } }; match inner.btc_monitor().broadcast_transaction(tx).await { Ok(()) => { - info!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, "Re-broadcast withdrawal tx"); + info!("Re-broadcast withdrawal tx"); } Err(e) => { - error!(withdrawal_txn_id = %txn.id, bitcoin_txid = %txid, "Failed to re-broadcast withdrawal tx: {e}"); + error!("Failed to re-broadcast withdrawal tx: {e}"); } } } @@ -1384,6 +1390,7 @@ impl LeaderService { Ok(tx) } + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id))] async fn confirm_withdrawal_on_sui( inner: &Arc, txn: &WithdrawalTransaction, @@ -1417,6 +1424,7 @@ impl LeaderService { Ok(()) } + #[tracing::instrument(level = "debug", skip_all, fields(withdrawal_txn_id = %withdrawal_txn_id))] async fn collect_withdrawal_confirmation_signature( inner: &Arc, withdrawal_txn_id: Address, @@ -1446,7 +1454,7 @@ impl LeaderService { while let Some(result) = sig_tasks.join_next().await { let Ok(Some(sig)) = result else { continue }; if let Err(e) = aggregator.add_signature(sig) { - error!(withdrawal_txn_id = %withdrawal_txn_id, "Failed to add withdrawal confirmation signature: {e}"); + error!("Failed to add withdrawal confirmation signature: {e}"); } if aggregator.weight() >= required_weight { break; @@ -1464,16 +1472,14 @@ impl 
LeaderService { Ok(aggregator.finish()?.into_parts().0) } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_withdrawal_tx_commitment_signature( inner: &Arc, proto_request: SignWithdrawalTxConstructionRequest, member: &CommitteeMember, ) -> Option { let validator_address = member.validator_address(); - trace!( - "Requesting withdrawal approval signature from {}", - validator_address - ); + trace!("Requesting withdrawal tx commitment signature"); let mut rpc_client = inner .onchain_state() @@ -1516,16 +1522,14 @@ impl LeaderService { .ok() } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_withdrawal_approval_signature( inner: &Arc, proto_request: SignWithdrawalRequestApprovalRequest, member: &CommitteeMember, ) -> Option { let validator_address = member.validator_address(); - trace!( - "Requesting withdrawal request approval signature from {}", - validator_address - ); + trace!("Requesting withdrawal request approval signature"); let mut rpc_client = inner .onchain_state() @@ -1568,16 +1572,14 @@ impl LeaderService { .ok() } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_withdrawal_tx_signing_signature( inner: &Arc, proto_request: SignWithdrawalTxSigningRequest, member: &CommitteeMember, ) -> Option { let validator_address = member.validator_address(); - trace!( - "Requesting withdrawal tx signing signature from {}", - validator_address - ); + trace!("Requesting withdrawal tx signing signature"); let mut rpc_client = inner .onchain_state() @@ -1620,16 +1622,14 @@ impl LeaderService { .ok() } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_withdrawal_tx_signature( inner: &Arc, withdrawal_txn_id: &Address, member: &CommitteeMember, ) -> anyhow::Result> { let validator_address = 
member.validator_address(); - trace!( - "Requesting withdrawal tx signature from {}", - validator_address - ); + trace!("Requesting withdrawal tx signature"); let mut rpc_client = inner .onchain_state() @@ -1703,16 +1703,14 @@ impl LeaderService { } } + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] async fn request_withdrawal_confirmation_signature( inner: &Arc, withdrawal_txn_id: Address, member: &CommitteeMember, ) -> Option { let validator_address = member.validator_address(); - trace!( - "Requesting withdrawal confirmation signature from {}", - validator_address - ); + trace!("Requesting withdrawal confirmation signature"); let mut rpc_client = inner .onchain_state() diff --git a/crates/hashi/src/main.rs b/crates/hashi/src/main.rs index a7badceb9..a1f68ad61 100644 --- a/crates/hashi/src/main.rs +++ b/crates/hashi/src/main.rs @@ -158,7 +158,11 @@ async fn main() -> anyhow::Result<()> { } async fn run_server(config_path: Option) -> anyhow::Result<()> { - init_tracing_subscriber(); + let _guard = hashi_telemetry::TelemetryConfig::new() + .with_file_line(true) + .with_service_name("hashi") + .with_env() + .init(); tracing::info!("welcome to hashi"); @@ -193,17 +197,3 @@ async fn run_server(config_path: Option) -> anyhow::Result<( tracing::info!("hashi shutting down; goodbye"); Ok(()) } - -fn init_tracing_subscriber() { - let subscriber = ::tracing_subscriber::FmtSubscriber::builder() - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .with_file(true) - .with_line_number(true) - .finish(); - ::tracing::subscriber::set_global_default(subscriber) - .expect("unable to initialize tracing subscriber"); -} diff --git a/crates/hashi/src/mpc/service.rs b/crates/hashi/src/mpc/service.rs index 5965393bd..b4c08bd98 100644 --- a/crates/hashi/src/mpc/service.rs +++ b/crates/hashi/src/mpc/service.rs @@ -102,6 
+102,7 @@ impl MpcService { }) } + #[tracing::instrument(name = "mpc_service", skip_all)] async fn run(mut self) { if let Some(epoch) = self.get_pending_epoch_change() { self.handle_reconfig(epoch).await; @@ -275,6 +276,7 @@ impl MpcService { Ok(output) } + #[tracing::instrument(level = "info", skip_all, fields(target_epoch = target_epoch))] async fn run_dkg(&self, target_epoch: u64) -> anyhow::Result { let onchain_state = self.inner.onchain_state().clone(); let mpc_manager = self @@ -642,6 +644,7 @@ impl MpcService { Ok(()) } + #[tracing::instrument(level = "info", skip_all, fields(target_epoch = target_epoch))] async fn run_key_rotation(&self, target_epoch: u64) -> anyhow::Result { let onchain_state = self.inner.onchain_state().clone(); let mpc_manager = self diff --git a/crates/hashi/src/onchain/watcher.rs b/crates/hashi/src/onchain/watcher.rs index d41d34ef9..71a57fe4e 100644 --- a/crates/hashi/src/onchain/watcher.rs +++ b/crates/hashi/src/onchain/watcher.rs @@ -27,6 +27,7 @@ use crate::onchain::types::ProposalType; use crate::onchain::types::WithdrawalRequest; use hashi_types::move_types::HashiEvent; +#[tracing::instrument(name = "watcher", skip_all)] pub async fn watcher(mut client: Client, state: OnchainState, metrics: Option>) { let subscription_read_mask = FieldMask::from_paths([ Checkpoint::path_builder().sequence_number(), diff --git a/crates/hashi/src/withdrawals.rs b/crates/hashi/src/withdrawals.rs index 65eca3f31..3ef640a5c 100644 --- a/crates/hashi/src/withdrawals.rs +++ b/crates/hashi/src/withdrawals.rs @@ -104,6 +104,7 @@ pub struct WithdrawalConfirmation { impl Hashi { // --- Step 1: Request approval (lightweight) --- + #[tracing::instrument(level = "info", skip_all, fields(request_id = %approval.request_id))] pub async fn validate_and_sign_withdrawal_request_approval( &self, approval: &WithdrawalRequestApproval, @@ -132,6 +133,7 @@ impl Hashi { // --- Step 2: Construction approval (with UTXO selection) --- + #[tracing::instrument(level = "info", skip_all, fields(bitcoin_txid 
= %approval.txid))] pub async fn validate_and_sign_withdrawal_tx_commitment( &self, approval: &WithdrawalTxCommitment, @@ -140,6 +142,7 @@ impl Hashi { self.sign_withdrawal_tx_commitment(approval) } + #[tracing::instrument(level = "debug", skip_all, fields(bitcoin_txid = %approval.txid))] pub async fn validate_withdrawal_tx_commitment( &self, approval: &WithdrawalTxCommitment, @@ -396,6 +399,7 @@ impl Hashi { // --- Step 3: Sign withdrawal (store witness signatures on-chain) --- + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_id = %message.withdrawal_id))] pub fn validate_and_sign_withdrawal_tx_signing( &self, message: &WithdrawalTxSigning, @@ -496,6 +500,7 @@ impl Hashi { // --- MPC BTC tx signing --- + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %withdrawal_txn_id))] pub async fn validate_and_sign_withdrawal_tx( &self, withdrawal_txn_id: &Address, @@ -532,6 +537,11 @@ impl Hashi { } /// Produce MPC Schnorr signatures for an unsigned withdrawal transaction. + #[tracing::instrument( + level = "debug", + skip_all, + fields(withdrawal_txn_id = %txn.id, input_count = txn.inputs.len()), + )] async fn mpc_sign_withdrawal_tx( &self, txn: &crate::onchain::types::WithdrawalTransaction, @@ -706,6 +716,7 @@ impl Hashi { /// UTXOs using the batching-aware coin selection algorithm, build the /// unsigned BTC tx, and return a `WithdrawalTxCommitment` covering the /// selected requests. + #[tracing::instrument(level = "debug", skip_all, fields(request_count = requests.len()))] pub async fn build_withdrawal_tx_commitment( &self, requests: &[WithdrawalRequest], @@ -835,6 +846,7 @@ impl Hashi { /// Run AML/Sanctions checks for a withdrawal request. /// If no screener client is configured, checks are skipped. + #[tracing::instrument(level = "debug", skip_all, fields(request_id = %request.id))] pub(crate) async fn screen_withdrawal( &self, request: &WithdrawalRequest,