Skip to content
Merged
87 changes: 87 additions & 0 deletions src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,20 +533,36 @@ async fn handle_unban(
let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params));
let rule = FlowSpecRule::new(nlri, action);

let start = std::time::Instant::now();
if let Err(e) = state.announcer.withdraw(&rule).await {
tracing::error!(error = %e, "BGP withdrawal failed");
// Continue anyway - mark as withdrawn in DB
} else {
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["withdrawn"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY
.observe(start.elapsed().as_secs_f64());
}
}

// Update mitigation status
let action_type_str = mitigation.action_type.to_string();
mitigation.withdraw(Some(format!("Detector unban: {}", source)));
state
.repo
.update_mitigation(&mitigation)
.await
.map_err(AppError)?;

crate::observability::metrics::MITIGATIONS_WITHDRAWN
.with_label_values(&[
action_type_str.as_str(),
mitigation.pop.as_str(),
"detector_unban",
])
.inc();

// Broadcast withdrawal via WebSocket
let _ = state
.ws_broadcast
Expand Down Expand Up @@ -589,6 +605,9 @@ async fn handle_ban(
.find_ban_event_by_external_id(&input.source, ext_id)
.await
{
crate::observability::metrics::EVENTS_REJECTED
.with_label_values(&[input.source.as_str(), "duplicate"])
.inc();
return Err(AppError(PrefixdError::DuplicateEvent {
detector_source: input.source.clone(),
external_id: ext_id.clone(),
Expand All @@ -602,6 +621,10 @@ async fn handle_ban(
// Store event
state.repo.insert_event(&event).await.map_err(AppError)?;

crate::observability::metrics::EVENTS_INGESTED
.with_label_values(&[&event.source, &event.attack_vector().to_string()])
.inc();

// Check if shutting down
if state.is_shutting_down() {
return Err(AppError(PrefixdError::ShuttingDown));
Expand Down Expand Up @@ -873,6 +896,20 @@ async fn handle_ban(
.validate(&intent, state.repo.as_ref(), is_safelisted)
.await
{
crate::observability::metrics::EVENTS_REJECTED
.with_label_values(&[event.source.as_str(), "guardrail"])
.inc();
let reason = match &e {
PrefixdError::GuardrailViolation(g) => format!("{:?}", g)
.split_whitespace()
.next()
.unwrap_or("unknown")
.to_string(),
Comment on lines +903 to +907
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little clunky but does the trick. If we wanted to take a dependency on strum, we could clean this up to be something like:

#[derive(strum::AsRefStr)]
enum GuardrailError {
    TtlRequired,           // .as_ref() => "TtlRequired"
    Safelisted { ip: String }, // .as_ref() => "Safelisted"
    QuotaExceeded { .. },  // .as_ref() => "QuotaExceeded"
}
let reason = match &e {
    PrefixdError::GuardrailViolation(g) => g.as_ref(),
    _ => "unknown",
};

_ => "unknown".to_string(),
};
crate::observability::metrics::GUARDRAIL_REJECTIONS
.with_label_values(&[&reason])
.inc();
tracing::warn!(error = %e, "guardrail rejected mitigation");
return Err(AppError(e));
}
Expand All @@ -888,6 +925,7 @@ async fn handle_ban(
let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params));
let rule = FlowSpecRule::new(nlri, action);

let start = std::time::Instant::now();
if let Err(e) = state.announcer.announce(&rule).await {
tracing::error!(error = %e, "BGP announcement failed");
mitigation.reject(e.to_string());
Expand All @@ -898,6 +936,10 @@ async fn handle_ban(
.map_err(AppError)?;
return Err(AppError(e));
}
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["announced"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64());
}

mitigation.activate();
Expand All @@ -907,6 +949,10 @@ async fn handle_ban(
.await
.map_err(AppError)?;

crate::observability::metrics::MITIGATIONS_CREATED
.with_label_values(&[&mitigation.action_type.to_string(), &state.settings.pop])
.inc();

// Resolve signal group to 'resolved' now that mitigation is confirmed
if let Some(group_id) = signal_group_id {
if let Ok(Some(mut group)) = state.repo.get_signal_group(group_id).await {
Expand Down Expand Up @@ -1370,6 +1416,17 @@ pub async fn create_mitigation(
.validate(&intent, state.repo.as_ref(), is_safelisted)
.await
{
let reason = match &e {
PrefixdError::GuardrailViolation(g) => format!("{:?}", g)
.split_whitespace()
.next()
.unwrap_or("unknown")
.to_string(),
_ => "unknown".to_string(),
};
crate::observability::metrics::GUARDRAIL_REJECTIONS
.with_label_values(&[&reason])
.inc();
return Ok(AppError(e).into_response());
}

Expand All @@ -1381,16 +1438,25 @@ pub async fn create_mitigation(
let nlri = FlowSpecNlri::from(&mitigation.match_criteria);
let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params));
let rule = FlowSpecRule::new(nlri, action);
let start = std::time::Instant::now();
if let Err(e) = state.announcer.announce(&rule).await {
return Ok(AppError(e).into_response());
}
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["announced"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64());
}

mitigation.activate();
if let Err(e) = state.repo.insert_mitigation(&mitigation).await {
return Ok(AppError(e).into_response());
}

crate::observability::metrics::MITIGATIONS_CREATED
.with_label_values(&[&mitigation.action_type.to_string(), &state.settings.pop])
.inc();

Ok((
StatusCode::CREATED,
Json(MitigationResponse::from(&mitigation)),
Expand Down Expand Up @@ -1434,20 +1500,34 @@ pub async fn withdraw_mitigation(
let nlri = FlowSpecNlri::from(&mitigation.match_criteria);
let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params));
let rule = FlowSpecRule::new(nlri, action);
let start = std::time::Instant::now();
state
.announcer
.withdraw(&rule)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["withdrawn"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY.observe(start.elapsed().as_secs_f64());
}

let action_type_str = mitigation.action_type.to_string();
mitigation.withdraw(Some(format!("{}: {}", req.operator_id, req.reason)));
state
.repo
.update_mitigation(&mitigation)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

crate::observability::metrics::MITIGATIONS_WITHDRAWN
.with_label_values(&[
action_type_str.as_str(),
mitigation.pop.as_str(),
"operator",
])
.inc();

// Broadcast withdrawal via WebSocket
let _ = state
.ws_broadcast
Expand Down Expand Up @@ -1565,8 +1645,15 @@ pub async fn bulk_withdraw_mitigations(
let nlri = FlowSpecNlri::from(&mitigation.match_criteria);
let action = FlowSpecAction::from((mitigation.action_type, &mitigation.action_params));
let rule = FlowSpecRule::new(nlri, action);
let start = std::time::Instant::now();
if let Err(e) = state.announcer.withdraw(&rule).await {
tracing::error!(error = %e, mitigation_id = %id, "BGP withdrawal failed in bulk withdraw");
} else {
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["withdrawn"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY
.observe(start.elapsed().as_secs_f64());
}
}

Expand Down
11 changes: 5 additions & 6 deletions src/observability/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use once_cell::sync::Lazy;
use prometheus::{
CounterVec, Encoder, GaugeVec, HistogramVec, TextEncoder, register_counter_vec,
register_gauge_vec, register_histogram_vec,
CounterVec, Encoder, GaugeVec, Histogram, HistogramVec, TextEncoder, register_counter_vec,
register_gauge_vec, register_histogram, register_histogram_vec,
};

// Event metrics
Expand Down Expand Up @@ -65,16 +65,15 @@ pub static ANNOUNCEMENTS_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
"prefixd_announcements_total",
"Total number of BGP announcements",
&["peer", "status"]
Copy link
Copy Markdown
Contributor Author

@bswinnerton bswinnerton Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find an easy way to derive the peer, so I opted to remove this from the instrumentation schema, but happy to put it back and add a placeholder like "global".

I also moved over to a Histogram so we don't have to do something like this:

crate::observability::metrics::ANNOUNCEMENTS_LATENCY
  .with_label_values(&[] as &[&str])

&["status"]
)
.unwrap()
});

pub static ANNOUNCEMENTS_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
pub static ANNOUNCEMENTS_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"prefixd_announcements_latency_seconds",
"BGP announcement latency in seconds",
&["peer"],
vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
.unwrap()
Expand Down
87 changes: 83 additions & 4 deletions src/scheduler/reconcile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,18 @@ impl ReconciliationLoop {
);

// Initial reconciliation
if let Err(e) = self.reconcile().await {
tracing::error!(error = %e, "initial reconciliation failed");
match self.reconcile().await {
Ok(()) => {
crate::observability::metrics::RECONCILIATION_RUNS
.with_label_values(&["success"])
.inc();
}
Err(e) => {
crate::observability::metrics::RECONCILIATION_RUNS
.with_label_values(&["error"])
.inc();
tracing::error!(error = %e, "initial reconciliation failed");
}
}

let mut interval = tokio::time::interval(self.interval);
Expand All @@ -65,8 +75,18 @@ impl ReconciliationLoop {
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = self.reconcile().await {
tracing::error!(error = %e, "reconciliation failed");
match self.reconcile().await {
Ok(()) => {
crate::observability::metrics::RECONCILIATION_RUNS
.with_label_values(&["success"])
.inc();
}
Err(e) => {
crate::observability::metrics::RECONCILIATION_RUNS
.with_label_values(&["error"])
.inc();
tracing::error!(error = %e, "reconciliation failed");
}
}
}
_ = shutdown.recv() => {
Expand All @@ -88,6 +108,9 @@ impl ReconciliationLoop {
// 3. Sync desired vs actual state
self.sync_announcements().await?;

// 4. Update BGP session metrics
self.update_bgp_session_metrics().await;

Ok(())
}

Expand All @@ -104,19 +127,32 @@ impl ReconciliationLoop {
// Withdraw BGP announcement
if !self.dry_run {
let rule = self.build_flowspec_rule(&mitigation);
let start = std::time::Instant::now();
if let Err(e) = self.announcer.withdraw(&rule).await {
tracing::warn!(
mitigation_id = %mitigation.mitigation_id,
error = %e,
"failed to withdraw expired mitigation"
);
} else {
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["withdrawn"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY
.observe(start.elapsed().as_secs_f64());
}
}

// Update status
let action_type_str = mitigation.action_type.to_string();
let pop = mitigation.pop.clone();
mitigation.expire();
self.repo.update_mitigation(&mitigation).await?;

crate::observability::metrics::MITIGATIONS_EXPIRED
.with_label_values(&[&action_type_str, &pop])
.inc();

// Broadcast expiry via WebSocket
if let Some(ref tx) = self.ws_broadcast {
let _ = tx.send(WsMessage::MitigationExpired {
Expand Down Expand Up @@ -197,6 +233,22 @@ impl ReconciliationLoop {
.with_label_values(&["local"])
.set(active.len() as f64);

{
use std::collections::HashMap;
let mut counts: HashMap<(String, String), f64> = HashMap::new();
for m in &active {
*counts
.entry((m.action_type.to_string(), m.pop.clone()))
.or_default() += 1.0;
}
crate::observability::metrics::MITIGATIONS_ACTIVE.reset();
for ((action_type, pop), count) in &counts {
crate::observability::metrics::MITIGATIONS_ACTIVE
.with_label_values(&[action_type, pop])
.set(*count);
}
}

// Get actual state from BGP
let announced = self.announcer.list_active().await?;
let announced_hashes: std::collections::HashSet<_> =
Expand All @@ -215,12 +267,19 @@ impl ReconciliationLoop {
);

if !self.dry_run {
let start = std::time::Instant::now();
if let Err(e) = self.announcer.announce(&rule).await {
tracing::error!(
mitigation_id = %mitigation.mitigation_id,
error = %e,
"failed to re-announce"
);
} else {
crate::observability::metrics::ANNOUNCEMENTS_TOTAL
.with_label_values(&["announced"])
.inc();
crate::observability::metrics::ANNOUNCEMENTS_LATENCY
.observe(start.elapsed().as_secs_f64());
}
}
}
Expand All @@ -245,6 +304,26 @@ impl ReconciliationLoop {
Ok(())
}

/// Refreshes the per-peer `BGP_SESSION_UP` gauge from the announcer's session status.
///
/// For every peer returned by `session_status()`, the gauge labelled with the peer's
/// name is set to `1.0` when `peer.state.is_established()` is true and to `0.0`
/// otherwise. If the status cannot be fetched, the error is logged at warn level and
/// no gauge is modified; the failure is not propagated to the caller.
async fn update_bgp_session_metrics(&self) {
    // Guard clause: bail out early (after logging) when the status lookup fails.
    let peers = match self.announcer.session_status().await {
        Ok(peers) => peers,
        Err(e) => {
            tracing::warn!(error = %e, "failed to fetch BGP session status for metrics");
            return;
        }
    };

    for peer in &peers {
        // Gauge encodes session health as a 0/1 flag per peer.
        let up = if peer.state.is_established() { 1.0 } else { 0.0 };
        crate::observability::metrics::BGP_SESSION_UP
            .with_label_values(&[&peer.name])
            .set(up);
    }
}

fn build_flowspec_rule(&self, m: &crate::domain::Mitigation) -> FlowSpecRule {
let nlri = FlowSpecNlri::from(&m.match_criteria);
let action = FlowSpecAction::from((m.action_type, &m.action_params));
Expand Down
Loading