diff --git a/src/api/handlers.rs b/src/api/handlers.rs index a0b047e..7887c4a 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -533,13 +533,21 @@ async fn handle_unban( let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params)); let rule = FlowSpecRule::new(nlri, action); + let start = std::time::Instant::now(); if let Err(e) = state.announcer.withdraw(&rule).await { tracing::error!(error = %e, "BGP withdrawal failed"); // Continue anyway - mark as withdrawn in DB + } else { + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["withdrawn"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY + .observe(start.elapsed().as_secs_f64()); } } // Update mitigation status + let action_type_str = mitigation.action_type.to_string(); mitigation.withdraw(Some(format!("Detector unban: {}", source))); state .repo @@ -547,6 +555,14 @@ async fn handle_unban( .await .map_err(AppError)?; + crate::observability::metrics::MITIGATIONS_WITHDRAWN + .with_label_values(&[ + action_type_str.as_str(), + mitigation.pop.as_str(), + "detector_unban", + ]) + .inc(); + // Broadcast withdrawal via WebSocket let _ = state .ws_broadcast @@ -589,6 +605,9 @@ async fn handle_ban( .find_ban_event_by_external_id(&input.source, ext_id) .await { + crate::observability::metrics::EVENTS_REJECTED + .with_label_values(&[input.source.as_str(), "duplicate"]) + .inc(); return Err(AppError(PrefixdError::DuplicateEvent { detector_source: input.source.clone(), external_id: ext_id.clone(), @@ -602,6 +621,10 @@ async fn handle_ban( // Store event state.repo.insert_event(&event).await.map_err(AppError)?; + crate::observability::metrics::EVENTS_INGESTED + .with_label_values(&[&event.source, &event.attack_vector().to_string()]) + .inc(); + // Check if shutting down if state.is_shutting_down() { return Err(AppError(PrefixdError::ShuttingDown)); @@ -873,6 +896,20 @@ async fn handle_ban( .validate(&intent, state.repo.as_ref(), is_safelisted) .await { + crate::observability::metrics::EVENTS_REJECTED + .with_label_values(&[event.source.as_str(), "guardrail"]) + .inc(); + let reason = match &e { + PrefixdError::GuardrailViolation(g) => format!("{:?}", g) + .split_whitespace() + .next() + .unwrap_or("unknown") + .to_string(), + _ => "unknown".to_string(), + }; + crate::observability::metrics::GUARDRAIL_REJECTIONS + .with_label_values(&[&reason]) + .inc(); tracing::warn!(error = %e, "guardrail rejected mitigation"); return Err(AppError(e)); } @@ -888,6 +925,7 @@ async fn handle_ban( let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params)); let rule = FlowSpecRule::new(nlri, action); + let start = std::time::Instant::now(); if let Err(e) = state.announcer.announce(&rule).await { tracing::error!(error = %e, "BGP announcement failed"); mitigation.reject(e.to_string()); @@ -898,6 +936,10 @@ async fn handle_ban( .map_err(AppError)?; return Err(AppError(e)); } + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["announced"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64()); } mitigation.activate(); @@ -907,6 +949,10 @@ async fn handle_ban( .await .map_err(AppError)?; + crate::observability::metrics::MITIGATIONS_CREATED + .with_label_values(&[&mitigation.action_type.to_string(), &state.settings.pop]) + .inc(); + // Resolve signal group to 'resolved' now that mitigation is confirmed if let Some(group_id) = signal_group_id { if let Ok(Some(mut group)) = state.repo.get_signal_group(group_id).await { @@ -1370,6 +1416,17 @@ pub async fn create_mitigation( .validate(&intent, state.repo.as_ref(), is_safelisted) .await { + let reason = match &e { + PrefixdError::GuardrailViolation(g) => format!("{:?}", g) + .split_whitespace() + .next() + .unwrap_or("unknown") + .to_string(), + _ => "unknown".to_string(), + }; + crate::observability::metrics::GUARDRAIL_REJECTIONS + .with_label_values(&[&reason]) + .inc(); return Ok(AppError(e).into_response()); } @@ -1381,9 +1438,14 @@ pub async fn create_mitigation( let nlri = FlowSpecNlri::from(&mitigation.match_criteria); let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params)); let rule = FlowSpecRule::new(nlri, action); + let start = std::time::Instant::now(); if let Err(e) = state.announcer.announce(&rule).await { return Ok(AppError(e).into_response()); } + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["announced"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64()); } mitigation.activate(); @@ -1391,6 +1453,10 @@ pub async fn create_mitigation( return Ok(AppError(e).into_response()); } + crate::observability::metrics::MITIGATIONS_CREATED + .with_label_values(&[&mitigation.action_type.to_string(), &state.settings.pop]) + .inc(); + Ok(( StatusCode::CREATED, Json(MitigationResponse::from(&mitigation)), @@ -1434,13 +1500,19 @@ pub async fn withdraw_mitigation( let nlri = FlowSpecNlri::from(&mitigation.match_criteria); let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params)); let rule = FlowSpecRule::new(nlri, action); + let start = std::time::Instant::now(); state .announcer .withdraw(&rule) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["withdrawn"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64()); } + let action_type_str = mitigation.action_type.to_string(); mitigation.withdraw(Some(format!("{}: {}", req.operator_id, req.reason))); state .repo @@ -1448,6 +1520,14 @@ pub async fn withdraw_mitigation( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + crate::observability::metrics::MITIGATIONS_WITHDRAWN + .with_label_values(&[ + action_type_str.as_str(), + mitigation.pop.as_str(), + "operator", + ]) + .inc(); + // Broadcast withdrawal via WebSocket let _ = state .ws_broadcast @@ -1565,8 +1645,15 @@ pub async fn bulk_withdraw_mitigations( let nlri = FlowSpecNlri::from(&mitigation.match_criteria); let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params)); let rule = FlowSpecRule::new(nlri, action); + let start = std::time::Instant::now(); if let Err(e) = state.announcer.withdraw(&rule).await { tracing::error!(error = %e, mitigation_id = %id, "BGP withdrawal failed in bulk withdraw"); + } else { + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["withdrawn"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY + .observe(start.elapsed().as_secs_f64()); } } diff --git a/src/observability/metrics.rs b/src/observability/metrics.rs index 36b6daf..d86e388 100644 --- a/src/observability/metrics.rs +++ b/src/observability/metrics.rs @@ -1,7 +1,7 @@ use once_cell::sync::Lazy; use prometheus::{ - CounterVec, Encoder, GaugeVec, HistogramVec, TextEncoder, register_counter_vec, - register_gauge_vec, register_histogram_vec, + CounterVec, Encoder, GaugeVec, Histogram, HistogramVec, TextEncoder, register_counter_vec, + register_gauge_vec, register_histogram, register_histogram_vec, }; // Event metrics @@ -65,16 +65,15 @@ pub static ANNOUNCEMENTS_TOTAL: Lazy = Lazy::new(|| { register_counter_vec!( "prefixd_announcements_total", "Total number of BGP announcements", - &["peer", "status"] + &["status"] ) .unwrap() }); -pub static ANNOUNCEMENTS_LATENCY: Lazy = Lazy::new(|| { - register_histogram_vec!( +pub static ANNOUNCEMENTS_LATENCY: Lazy = Lazy::new(|| { + register_histogram!( "prefixd_announcements_latency_seconds", "BGP announcement latency in seconds", - &["peer"], vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] ) .unwrap() diff --git a/src/scheduler/reconcile.rs b/src/scheduler/reconcile.rs index 07f3d42..c45f043 100644 --- a/src/scheduler/reconcile.rs +++ b/src/scheduler/reconcile.rs @@ -55,8 +55,18 @@ impl ReconciliationLoop { ); // Initial reconciliation - if let Err(e) = self.reconcile().await { - tracing::error!(error = %e, "initial reconciliation failed"); + match self.reconcile().await { + Ok(()) => { + crate::observability::metrics::RECONCILIATION_RUNS + .with_label_values(&["success"]) + .inc(); + } + Err(e) => { + crate::observability::metrics::RECONCILIATION_RUNS + .with_label_values(&["error"]) + .inc(); + tracing::error!(error = %e, "initial reconciliation failed"); + } } let mut interval = tokio::time::interval(self.interval); @@ -65,8 +75,18 @@ impl ReconciliationLoop { loop { tokio::select! { _ = interval.tick() => { - if let Err(e) = self.reconcile().await { - tracing::error!(error = %e, "reconciliation failed"); + match self.reconcile().await { + Ok(()) => { + crate::observability::metrics::RECONCILIATION_RUNS + .with_label_values(&["success"]) + .inc(); + } + Err(e) => { + crate::observability::metrics::RECONCILIATION_RUNS + .with_label_values(&["error"]) + .inc(); + tracing::error!(error = %e, "reconciliation failed"); + } } } _ = shutdown.recv() => { @@ -88,6 +108,9 @@ impl ReconciliationLoop { // 3. Sync desired vs actual state self.sync_announcements().await?; + // 4. Update BGP session metrics + self.update_bgp_session_metrics().await; + Ok(()) } @@ -104,19 +127,32 @@ impl ReconciliationLoop { // Withdraw BGP announcement if !self.dry_run { let rule = self.build_flowspec_rule(&mitigation); + let start = std::time::Instant::now(); if let Err(e) = self.announcer.withdraw(&rule).await { tracing::warn!( mitigation_id = %mitigation.mitigation_id, error = %e, "failed to withdraw expired mitigation" ); + } else { + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["withdrawn"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY + .observe(start.elapsed().as_secs_f64()); } } // Update status + let action_type_str = mitigation.action_type.to_string(); + let pop = mitigation.pop.clone(); mitigation.expire(); self.repo.update_mitigation(&mitigation).await?; + crate::observability::metrics::MITIGATIONS_EXPIRED + .with_label_values(&[&action_type_str, &pop]) + .inc(); + // Broadcast expiry via WebSocket if let Some(ref tx) = self.ws_broadcast { let _ = tx.send(WsMessage::MitigationExpired { @@ -197,6 +233,22 @@ impl ReconciliationLoop { .with_label_values(&["local"]) .set(active.len() as f64); + { + use std::collections::HashMap; + let mut counts: HashMap<(String, String), f64> = HashMap::new(); + for m in &active { + *counts + .entry((m.action_type.to_string(), m.pop.clone())) + .or_default() += 1.0; + } + crate::observability::metrics::MITIGATIONS_ACTIVE.reset(); + for ((action_type, pop), count) in &counts { + crate::observability::metrics::MITIGATIONS_ACTIVE + .with_label_values(&[action_type, pop]) + .set(*count); + } + } + // Get actual state from BGP let announced = self.announcer.list_active().await?; let announced_hashes: std::collections::HashSet<_> = @@ -215,12 +267,19 @@ impl ReconciliationLoop { ); if !self.dry_run { + let start = std::time::Instant::now(); if let Err(e) = self.announcer.announce(&rule).await { tracing::error!( mitigation_id = %mitigation.mitigation_id, error = %e, "failed to re-announce" ); + } else { + crate::observability::metrics::ANNOUNCEMENTS_TOTAL + .with_label_values(&["announced"]) + .inc(); + crate::observability::metrics::ANNOUNCEMENTS_LATENCY + .observe(start.elapsed().as_secs_f64()); } } } @@ -245,6 +304,26 @@ impl ReconciliationLoop { Ok(()) } + async fn update_bgp_session_metrics(&self) { + match self.announcer.session_status().await { + Ok(peers) => { + for peer in &peers { + let value = if peer.state.is_established() { + 1.0 + } else { + 0.0 + }; + crate::observability::metrics::BGP_SESSION_UP + .with_label_values(&[&peer.name]) + .set(value); + } + } + Err(e) => { + tracing::warn!(error = %e, "failed to fetch BGP session status for metrics"); + } + } + } + fn build_flowspec_rule(&self, m: &crate::domain::Mitigation) -> FlowSpecRule { let nlri = FlowSpecNlri::from(&m.match_criteria); let action = FlowSpecAction::from((m.action_type, &m.action_params));