diff --git a/Cargo.lock b/Cargo.lock index 4c8ca20..4494934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3196,6 +3196,7 @@ name = "platz-status-updates" version = "0.1.0" dependencies = [ "anyhow", + "clap", "futures", "platz-chart-ext", "platz-db", diff --git a/status-updates/Cargo.toml b/status-updates/Cargo.toml index efd194e..62b13d2 100644 --- a/status-updates/Cargo.toml +++ b/status-updates/Cargo.toml @@ -5,6 +5,7 @@ version = "0.1.0" [dependencies] anyhow = "1.0.87" +clap = { version = "4.5.17", features = ["derive"] } futures = "0.3.30" platz-chart-ext = { workspace = true } reqwest = { version = "0.12.7", default-features = false, features = [ diff --git a/status-updates/src/events.rs b/status-updates/src/events.rs index c997136..3d28941 100644 --- a/status-updates/src/events.rs +++ b/status-updates/src/events.rs @@ -2,19 +2,24 @@ use crate::tracker::StatusTracker; use anyhow::Result; use futures::future::join_all; use platz_db::{db_events, DbEventOperation, DbTable, Deployment}; +use std::{io::Write, path::PathBuf}; use tokio::time; use tracing::debug; const DEPLOYMENT_CHUNK_SIZE: usize = 10; const DEPLOYMENT_SLEEP_BETWEEN_CHUNKS: time::Duration = time::Duration::from_secs(1); -pub async fn watch_deployments(tracker: StatusTracker) -> Result<()> { +pub async fn watch_deployments( + tracker: StatusTracker, + heartbeat_file_path: Option<PathBuf>, +) -> Result<()> { let mut db_rx = db_events(); for deploy_chunk in Deployment::all().await?.chunks(DEPLOYMENT_CHUNK_SIZE) { join_all( deploy_chunk .iter() + .filter(|dep| dep.enabled) .map(|deployment| tracker.add(deployment.clone())), ) .await; @@ -24,14 +29,37 @@ pub async fn watch_deployments(tracker: StatusTracker) -> Result<()> { loop { let event = db_rx.recv().await?; debug!("Got {:?}", event); - if event.table == DbTable::Deployments { - match event.operation { - DbEventOperation::Delete => tracker.remove(event.data.id).await, - _ => { - let deployment = Deployment::find(event.data.id).await?; - 
tracker.add(deployment).await - } + tokio::time::timeout( + std::time::Duration::from_secs(60), + handle_db_event(&tracker, event), + ) + .await + .map_err(|_| anyhow::anyhow!("Timed out processing DB event")) + .inspect_err(|e| tracing::error!("Error processing DB events: {e:?}")) + .and_then(|res| res)?; + + if let Some(path) = &heartbeat_file_path { + let mut f = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; + writeln!(&mut f, "{:?} -- {:?}", std::time::SystemTime::now(), event)?; + } + } +} + +async fn handle_db_event(tracker: &StatusTracker, event: platz_db::DbEvent) -> Result<()> { + if event.table == DbTable::Deployments { + match event.operation { + DbEventOperation::Delete => { + tracker.remove(event.data.id).await; + } + _ => { + let deployment = Deployment::find(event.data.id).await?; + tracker.add(deployment).await; } } } + Ok(()) } diff --git a/status-updates/src/main.rs b/status-updates/src/main.rs index e37529a..eb465cf 100644 --- a/status-updates/src/main.rs +++ b/status-updates/src/main.rs @@ -2,8 +2,11 @@ mod events; mod status_config; mod tracker; +use std::path::PathBuf; + use crate::tracker::StatusTracker; use anyhow::Result; +use clap::Parser; use platz_db::DbTable; use tokio::{ select, @@ -11,9 +14,17 @@ use tokio::{ }; use tracing::{info, warn}; +#[derive(Parser)] +struct Opts { + #[clap(long)] + heartbeat_file_path: Option<PathBuf>, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); + let opts = Opts::parse(); + info!("Starting status updates worker"); let mut sigterm = signal(SignalKind::terminate())?; let mut sigint = signal(SignalKind::interrupt())?; @@ -38,7 +49,7 @@ async fn main() -> Result<()> { result.map_err(Into::into) } - result = events::watch_deployments(StatusTracker::new(), opts.heartbeat_file_path) => { + result = events::watch_deployments(StatusTracker::new(), opts.heartbeat_file_path) => { result } } diff --git a/status-updates/src/tracker.rs b/status-updates/src/tracker.rs 
index 23d87eb..96849a2 100644 --- a/status-updates/src/tracker.rs +++ b/status-updates/src/tracker.rs @@ -7,7 +7,7 @@ use platz_db::{ }; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::{sync::RwLock, task}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use url::Url; use uuid::Uuid; @@ -28,15 +28,16 @@ impl StatusTracker { } pub async fn add(&self, deployment: Deployment) { + let deployment_id = deployment.id; if !deployment.enabled { warn!( "Deployment {} is disabled, stopping its status updates if there were any", - deployment.id + deployment_id ); self.remove(deployment.id).await; if deployment.reported_status.is_some() { UpdateDeploymentReportedStatus::new(None) - .save(deployment.id) + .save(deployment_id) .await .unwrap(); } @@ -79,16 +80,20 @@ impl StatusTracker { deployment.id, task::spawn(periodic_deployment_status_update(deployment, new_config)), ) { + debug!("Stopping previous status update task assigned for {deployment_id}"); handle.abort(); } + info!("Deployment {deployment_id} added to tracker"); } pub async fn remove(&self, id: Uuid) { info!("Removing deployment {}", id); self.inner.configs.write().await.remove(&id); if let Some(handle) = self.inner.tasks.write().await.remove(&id) { + debug!("Stopping previous status update task assigned for {id}"); handle.abort(); } + info!("Deployment {id} removed from tracker"); } }