Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion crates/e2e-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,15 @@ mod tests {
.map(|node| {
let signing_manager = node.hashi().signing_manager();
let onchain_state = node.hashi().onchain_state().clone();
let p2p_channel = hashi::mpc::rpc::RpcP2PChannel::new(onchain_state, epoch);
let p2p_channel = hashi::mpc::rpc::RpcP2PChannel::new(
onchain_state,
epoch,
hashi::metrics::MPC_LABEL_SIGNING,
node.hashi().metrics.clone(),
);
let beacon = beacon_value;
let message = message.to_vec();
let metrics = node.hashi().metrics.clone();
async move {
hashi::mpc::SigningManager::sign(
&signing_manager,
Expand All @@ -668,6 +674,7 @@ mod tests {
&beacon,
None,
SIGNING_TIMEOUT,
&metrics,
)
.await
}
Expand Down
34 changes: 22 additions & 12 deletions crates/hashi/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::time::Duration;

use axum::http;
use prost::Message as _;
use tonic::Response;
use tonic_rustls::Channel;
use tonic_rustls::Endpoint;
Expand Down Expand Up @@ -105,34 +106,41 @@ impl Client {
&self,
epoch: u64,
request: &SendMessagesRequest,
) -> Result<SendMessagesResponse> {
) -> Result<(SendMessagesResponse, usize)> {
let proto_request = request.to_proto(epoch);
let request_size = proto_request.encoded_len();
let response = self
.mpc_service_client()
.send_messages(proto_request)
.await?;
SendMessagesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))
let resp = SendMessagesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok((resp, request_size))
}

pub async fn retrieve_messages(
&self,
request: &RetrieveMessagesRequest,
) -> Result<RetrieveMessagesResponse> {
) -> Result<(RetrieveMessagesResponse, usize, usize)> {
let proto_request = request.to_proto();
let request_size = proto_request.encoded_len();
let response = self
.mpc_service_client()
.retrieve_messages(proto_request)
.await?;
RetrieveMessagesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))
let response_size = response.get_ref().encoded_len();
let resp = RetrieveMessagesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok((resp, request_size, response_size))
Comment on lines +131 to +134
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to add metrics to the clients/servers then we should leverage middleware as it will be much cleaner and keep the business logic free from this instrumentation.

You can take a look at MystenLabs/sui-rust-sdk#237 for how middleware can be added to grpc clients and https://github.com/bmwill/sui/blob/bf4f4c2e66964ba3668baa2accbf0521c9aa0e81/crates/sui-rpc-api/src/lib.rs#L229-L243 for how middleware can be added to the server side (I think we do already do some server side metrics using this paradigm.)

}

pub async fn complain(&self, request: &ComplainRequest) -> Result<ComplaintResponses> {
pub async fn complain(&self, request: &ComplainRequest) -> Result<(ComplaintResponses, usize)> {
let proto_request = request.to_proto();
let request_size = proto_request.encoded_len();
let response = self.mpc_service_client().complain(proto_request).await?;
ComplaintResponses::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))
let resp = ComplaintResponses::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok((resp, request_size))
}

pub async fn get_public_mpc_output(
Expand All @@ -152,14 +160,16 @@ impl Client {
&self,
epoch: u64,
request: &GetPartialSignaturesRequest,
) -> Result<GetPartialSignaturesResponse> {
) -> Result<(GetPartialSignaturesResponse, usize)> {
let proto_request = request.to_proto(epoch);
let request_size = proto_request.encoded_len();
let response = self
.mpc_service_client()
.get_partial_signatures(proto_request)
.await?;
GetPartialSignaturesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))
let resp = GetPartialSignaturesResponse::try_from(response.get_ref())
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok((resp, request_size))
}

pub async fn get_reconfig_completion_signature(&self, epoch: u64) -> Result<Option<Vec<u8>>> {
Expand Down
4 changes: 4 additions & 0 deletions crates/hashi/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl HttpService {
Self { inner: hashi }
}

pub(crate) fn metrics(&self) -> &crate::metrics::Metrics {
&self.inner.metrics
}

pub async fn start(self) -> (std::net::SocketAddr, Service) {
let router = {
let max_decoding_message_size = self.inner.config.grpc_max_decoding_message_size();
Expand Down
221 changes: 221 additions & 0 deletions crates/hashi/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ pub struct Metrics {

pub mpc_sign_duration_seconds: HistogramVec,
pub mpc_sign_failures_total: IntCounterVec,

// MPC profiling metrics
pub mpc_reconfig_total_duration_seconds: HistogramVec,
pub mpc_end_reconfig_duration_seconds: HistogramVec,
pub mpc_prepare_signing_duration_seconds: HistogramVec,
pub mpc_total_duration_seconds: HistogramVec,
pub mpc_dealer_crypto_duration_seconds: HistogramVec,
pub mpc_p2p_broadcast_duration_seconds: HistogramVec,
pub mpc_cert_publish_duration_seconds: HistogramVec,
pub mpc_tob_poll_duration_seconds: HistogramVec,
pub mpc_cert_verify_duration_seconds: HistogramVec,
pub mpc_message_process_duration_seconds: HistogramVec,
pub mpc_message_retrieval_duration_seconds: HistogramVec,
pub mpc_complaint_recovery_duration_seconds: HistogramVec,
pub mpc_completion_duration_seconds: HistogramVec,
pub mpc_presig_conversion_duration_seconds: HistogramVec,
pub mpc_rotation_prepare_previous_duration_seconds: HistogramVec,
pub mpc_sign_partial_gen_duration_seconds: HistogramVec,
pub mpc_sign_collection_duration_seconds: HistogramVec,
pub mpc_sign_aggregation_duration_seconds: HistogramVec,
pub mpc_rpc_handler_process_duration_seconds: HistogramVec,
pub mpc_bytes_sent_total: IntCounterVec,
pub mpc_bytes_received_total: IntCounterVec,
pub mpc_p2p_message_size_bytes: HistogramVec,
}

const LATENCY_SEC_BUCKETS: &[f64] = &[
Expand All @@ -81,6 +105,19 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[

const MPC_SIGN_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 1.5, 2., 2.5, 3., 4., 5., 7.5, 10.];

pub const MPC_LABEL_DKG: &str = "dkg";
pub const MPC_LABEL_KEY_ROTATION: &str = "key_rotation";
pub const MPC_LABEL_NONCE_GEN: &str = "nonce_gen";
pub const MPC_LABEL_SIGNING: &str = "signing";

const MPC_PROTOCOL_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 2., 5., 10., 20., 30., 60.];

const MPC_PHASE_DURATION_BUCKETS: &[f64] =
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2., 5., 10.];

const MPC_MESSAGE_SIZE_BUCKETS: &[f64] =
&[1024., 4096., 16384., 65536., 262144., 1048576., 4194304.];

impl Metrics {
pub fn new_default() -> Self {
Self::new(prometheus::default_registry())
Expand Down Expand Up @@ -343,6 +380,190 @@ impl Metrics {
registry,
)
.unwrap(),

// MPC profiling: reconfig-level
mpc_reconfig_total_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_reconfig_total_duration_seconds",
"Duration of full handle_reconfig",
&["protocol"],
MPC_PROTOCOL_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_end_reconfig_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_end_reconfig_duration_seconds",
"Duration of submit_end_reconfig",
&["protocol"],
MPC_PROTOCOL_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_prepare_signing_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_prepare_signing_duration_seconds",
"Duration of prepare_signing",
&["protocol"],
MPC_PROTOCOL_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),

// MPC profiling: per-phase (labeled by protocol)
mpc_total_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_total_duration_seconds",
"End-to-end duration of MPC protocol",
&["protocol"],
MPC_PROTOCOL_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_dealer_crypto_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_dealer_crypto_duration_seconds",
"Duration of dealer crypto",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_p2p_broadcast_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_p2p_broadcast_duration_seconds",
"Duration of send_to_many",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_cert_publish_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_cert_publish_duration_seconds",
"Duration of tob_channel.publish",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_tob_poll_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_tob_poll_duration_seconds",
"Duration of tob_channel.receive",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_cert_verify_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_cert_verify_duration_seconds",
"Duration of BLS certificate signature verification",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_message_process_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_message_process_duration_seconds",
"Duration of AVSS message processing",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_message_retrieval_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_message_retrieval_duration_seconds",
"Duration of retrieve_dealer_message",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_complaint_recovery_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_complaint_recovery_duration_seconds",
"Duration of complaint recovery",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_completion_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_completion_duration_seconds",
"Duration of final aggregation",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_presig_conversion_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_presig_conversion_duration_seconds",
"Duration of Presignatures::new",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_rotation_prepare_previous_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_rotation_prepare_previous_duration_seconds",
"Duration of prepare_previous_output",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),

// MPC profiling: signing phase breakdown
mpc_sign_partial_gen_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_sign_partial_gen_duration_seconds",
"Duration of generate_partial_signatures",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_sign_collection_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_sign_collection_duration_seconds",
"Duration of P2P partial signature collection from peers",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),
mpc_sign_aggregation_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_sign_aggregation_duration_seconds",
"Duration of aggregate_signatures / RS recovery",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),

// MPC profiling: RPC handler
mpc_rpc_handler_process_duration_seconds: register_histogram_vec_with_registry!(
"hashi_mpc_rpc_handler_process_duration_seconds",
"Duration of process_message in RPC handler",
&["protocol"],
MPC_PHASE_DURATION_BUCKETS.to_vec(),
registry,
)
.unwrap(),

// MPC profiling: communication volume
mpc_bytes_sent_total: register_int_counter_vec_with_registry!(
"hashi_mpc_bytes_sent_total",
"Total bytes sent in MPC P2P messages",
&["protocol"],
registry,
)
.unwrap(),
mpc_bytes_received_total: register_int_counter_vec_with_registry!(
"hashi_mpc_bytes_received_total",
"Total bytes received in MPC P2P messages",
&["protocol"],
registry,
)
.unwrap(),
mpc_p2p_message_size_bytes: register_histogram_vec_with_registry!(
"hashi_mpc_p2p_message_size_bytes",
"Size of each MPC P2P message sent (bytes)",
&["protocol"],
MPC_MESSAGE_SIZE_BUCKETS.to_vec(),
registry,
)
.unwrap(),
}
}

Expand Down
Loading
Loading