Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions status-updates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.1.0"

[dependencies]
anyhow = "1.0.87"
clap = { version = "4.5.17", features = ["derive"] }
futures = "0.3.30"
platz-chart-ext = { workspace = true }
reqwest = { version = "0.12.7", default-features = false, features = [
Expand Down
44 changes: 36 additions & 8 deletions status-updates/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ use crate::tracker::StatusTracker;
use anyhow::Result;
use futures::future::join_all;
use platz_db::{db_events, DbEventOperation, DbTable, Deployment};
use std::{io::Write, path::PathBuf};
use tokio::time;
use tracing::debug;

const DEPLOYMENT_CHUNK_SIZE: usize = 10;
const DEPLOYMENT_SLEEP_BETWEEN_CHUNKS: time::Duration = time::Duration::from_secs(1);

pub async fn watch_deployments(tracker: StatusTracker) -> Result<()> {
pub async fn watch_deployments(
tracker: StatusTracker,
heartbeat_file_path: Option<PathBuf>,
) -> Result<()> {
let mut db_rx = db_events();

for deploy_chunk in Deployment::all().await?.chunks(DEPLOYMENT_CHUNK_SIZE) {
join_all(
deploy_chunk
.iter()
.filter(|dep| dep.enabled)
.map(|deployment| tracker.add(deployment.clone())),
)
.await;
Expand All @@ -24,14 +29,37 @@ pub async fn watch_deployments(tracker: StatusTracker) -> Result<()> {
loop {
let event = db_rx.recv().await?;
debug!("Got {:?}", event);
if event.table == DbTable::Deployments {
match event.operation {
DbEventOperation::Delete => tracker.remove(event.data.id).await,
_ => {
let deployment = Deployment::find(event.data.id).await?;
tracker.add(deployment).await
}
tokio::time::timeout(
std::time::Duration::from_secs(60),
handle_db_event(&tracker, event),
)
.await
.map_err(|_| anyhow::anyhow!("Timed out processing DB event"))
.inspect_err(|e| tracing::error!("Error processing DB events: {e:?}"))
.and_then(|res| res)?;

if let Some(path) = &heartbeat_file_path {
let mut f = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(path)?;
writeln!(&mut f, "{:?} -- {:?}", std::time::SystemTime::now(), event)?;
}
}
}

async fn handle_db_event(tracker: &StatusTracker, event: platz_db::DbEvent) -> Result<()> {
if event.table == DbTable::Deployments {
match event.operation {
DbEventOperation::Delete => {
tracker.remove(event.data.id).await;
}
_ => {
let deployment = Deployment::find(event.data.id).await?;
tracker.add(deployment).await;
}
}
}
Ok(())
}
13 changes: 12 additions & 1 deletion status-updates/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ mod events;
mod status_config;
mod tracker;

use std::path::PathBuf;

use crate::tracker::StatusTracker;
use anyhow::Result;
use clap::Parser;
use platz_db::DbTable;
use tokio::{
select,
signal::unix::{signal, SignalKind},
};
use tracing::{info, warn};

#[derive(Parser)]
struct Opts {
#[clap(long)]
heartbeat_file_path: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let opts = Opts::parse();

info!("Starting status updates worker");
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;
Expand All @@ -38,7 +49,7 @@ async fn main() -> Result<()> {
result.map_err(Into::into)
}

result = events::watch_deployments(StatusTracker::new()) => {
result = events::watch_deployments(StatusTracker::new(), opts.heartbeat_file_path) => {
result
}
}
Expand Down
11 changes: 8 additions & 3 deletions status-updates/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use platz_db::{
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::RwLock, task};
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use url::Url;
use uuid::Uuid;

Expand All @@ -28,15 +28,16 @@ impl StatusTracker {
}

pub async fn add(&self, deployment: Deployment) {
let deployment_id = deployment.id;
if !deployment.enabled {
warn!(
"Deployment {} is disabled, stopping its status updates if there were any",
deployment.id
deployment_id
);
self.remove(deployment.id).await;
if deployment.reported_status.is_some() {
UpdateDeploymentReportedStatus::new(None)
.save(deployment.id)
.save(deployment_id)
.await
.unwrap();
}
Expand Down Expand Up @@ -79,16 +80,20 @@ impl StatusTracker {
deployment.id,
task::spawn(periodic_deployment_status_update(deployment, new_config)),
) {
debug!("Stopping previous status update task assigned for {deployment_id}");
handle.abort();
}
info!("Deployment {deployment_id} added to tracker");
}

pub async fn remove(&self, id: Uuid) {
info!("Removing deployment {}", id);
self.inner.configs.write().await.remove(&id);
if let Some(handle) = self.inner.tasks.write().await.remove(&id) {
debug!("Stopping previous status update task assigned for {id}");
handle.abort();
}
info!("Deployment {id} removed from tracker");
}
}

Expand Down