diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 35d8531b78..3c5f4a8d37 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -24,7 +24,7 @@ impl BackgroundTask { pool: sqlx::Pool, redis_pool: RedisPool, search_config: search::SearchConfig, - clickhouse: clickhouse::Client, + clickhouse: Option, stripe_client: stripe::Client, ) { use BackgroundTask::*; @@ -33,7 +33,13 @@ impl BackgroundTask { IndexSearch => index_search(pool, redis_pool, search_config).await, ReleaseScheduled => release_scheduled(pool).await, UpdateVersions => update_versions(pool, redis_pool).await, - Payouts => payouts(pool, clickhouse).await, + Payouts => { + if let Some(client) = clickhouse { + payouts(pool, Some(client)).await + } else { + warn!("Cannot run payouts: ClickHouse client unavailable"); + } + } IndexBilling => { crate::routes::internal::billing::index_billing( stripe_client, @@ -121,14 +127,18 @@ pub async fn update_versions( pub async fn payouts( pool: sqlx::Pool, - clickhouse: clickhouse::Client, + clickhouse: Option, ) { - info!("Started running payouts"); - let result = process_payout(&pool, &clickhouse).await; - if let Err(e) = result { - warn!("Payouts run failed: {:?}", e); + if let Some(client) = clickhouse { + info!("Started running payouts"); + let result = process_payout(&pool, Some(&client)).await; + if let Err(e) = result { + warn!("Payouts run failed: {:?}", e); + } + info!("Done running payouts"); + } else { + warn!("Skipping payouts: ClickHouse client unavailable"); } - info!("Done running payouts"); } mod version_updater { diff --git a/apps/labrinth/src/clickhouse/fetch.rs b/apps/labrinth/src/clickhouse/fetch.rs index b0245075b8..fc5e6543fb 100644 --- a/apps/labrinth/src/clickhouse/fetch.rs +++ b/apps/labrinth/src/clickhouse/fetch.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::{models::ids::ProjectId, routes::ApiError}; use chrono::{DateTime, Utc}; use serde::{Deserialize, 
Serialize}; @@ -25,7 +23,7 @@ pub async fn fetch_playtimes( start_date: DateTime, end_date: DateTime, resolution_minute: u32, - client: Arc, + client: &clickhouse::Client, ) -> Result, ApiError> { let query = client .query( @@ -56,12 +54,12 @@ pub async fn fetch_views( start_date: DateTime, end_date: DateTime, resolution_minutes: u32, - client: Arc, + client: &clickhouse::Client, ) -> Result, ApiError> { let query = client .query( " - SELECT + SELECT toUnixTimestamp(toStartOfInterval(recorded, toIntervalMinute(?))) AS time, project_id AS id, count(1) AS total @@ -86,12 +84,12 @@ pub async fn fetch_downloads( start_date: DateTime, end_date: DateTime, resolution_minutes: u32, - client: Arc, + client: &clickhouse::Client, ) -> Result, ApiError> { let query = client .query( " - SELECT + SELECT toUnixTimestamp(toStartOfInterval(recorded, toIntervalMinute(?))) AS time, project_id as id, count(1) AS total @@ -113,7 +111,7 @@ pub async fn fetch_countries_downloads( projects: Vec, start_date: DateTime, end_date: DateTime, - client: Arc, + client: &clickhouse::Client, ) -> Result, ApiError> { let query = client .query( @@ -140,7 +138,7 @@ pub async fn fetch_countries_views( projects: Vec, start_date: DateTime, end_date: DateTime, - client: Arc, + client: &clickhouse::Client, ) -> Result, ApiError> { let query = client .query( diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 30b85d8d4b..73283eb1cf 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -44,7 +44,7 @@ pub struct Pepper { pub struct LabrinthConfig { pub pool: sqlx::Pool, pub redis_pool: RedisPool, - pub clickhouse: Client, + pub clickhouse: Option, pub file_host: Arc, pub maxmind: Arc, pub scheduler: Arc, @@ -64,7 +64,7 @@ pub fn app_setup( pool: sqlx::Pool, redis_pool: RedisPool, search_config: search::SearchConfig, - clickhouse: &mut Client, + clickhouse: Option, file_host: Arc, maxmind: Arc, stripe_client: stripe::Client, @@ -142,15 +142,24 @@ pub fn app_setup( } }); - 
let pool_ref = pool.clone(); - let client_ref = clickhouse.clone(); - scheduler.run(Duration::from_secs(60 * 60 * 6), move || { - let pool_ref = pool_ref.clone(); - let client_ref = client_ref.clone(); - async move { - background_task::payouts(pool_ref, client_ref).await; + match clickhouse.clone() { + Some(client) => { + let pool_ref = pool.clone(); + scheduler.run(Duration::from_secs(60 * 60 * 6), move || { + let pool_ref = pool_ref.clone(); + let client_ref = client.clone(); + async move { + background_task::payouts(pool_ref, Some(client_ref)) + .await; + } + }); } - }); + None => { + warn!( + "Skipping scheduling payouts task: ClickHouse client unavailable" + ); + } + } let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); @@ -224,27 +233,35 @@ pub fn app_setup( let analytics_queue = Arc::new(AnalyticsQueue::new()); { - let client_ref = clickhouse.clone(); - let analytics_queue_ref = analytics_queue.clone(); - let pool_ref = pool.clone(); - let redis_ref = redis_pool.clone(); - scheduler.run(Duration::from_secs(15), move || { - let client_ref = client_ref.clone(); - let analytics_queue_ref = analytics_queue_ref.clone(); - let pool_ref = pool_ref.clone(); - let redis_ref = redis_ref.clone(); - - async move { - info!("Indexing analytics queue"); - let result = analytics_queue_ref - .index(client_ref, &redis_ref, &pool_ref) - .await; - if let Err(e) = result { - warn!("Indexing analytics queue failed: {:?}", e); - } - info!("Done indexing analytics queue"); + match clickhouse.clone() { + Some(client) => { + let analytics_queue_ref = analytics_queue.clone(); + let pool_ref = pool.clone(); + let redis_ref = redis_pool.clone(); + scheduler.run(Duration::from_secs(15), move || { + let client_ref = client.clone(); + let analytics_queue_ref = analytics_queue_ref.clone(); + let pool_ref = pool_ref.clone(); + let redis_ref = redis_ref.clone(); + + async move { + info!("Indexing analytics queue"); + let result = analytics_queue_ref + .index(Some(client_ref), 
&redis_ref, &pool_ref) + .await; + if let Err(e) = result { + warn!("Indexing analytics queue failed: {:?}", e); + } + info!("Done indexing analytics queue"); + } + }); } - }); + None => { + warn!( + "Skipping scheduling analytics index: ClickHouse client unavailable" + ); + } + } } let ip_salt = Pepper { @@ -285,7 +302,7 @@ pub fn app_setup( LabrinthConfig { pool, redis_pool, - clickhouse: clickhouse.clone(), + clickhouse, file_host, maxmind, scheduler: Arc::new(scheduler), diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index 54bae0fc1b..a2f95d397c 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -11,7 +11,7 @@ use labrinth::util::ratelimit::rate_limit_middleware; use labrinth::{check_env_vars, clickhouse, database, file_hosting, queue}; use std::ffi::CStr; use std::sync::Arc; -use tracing::{error, info}; +use tracing::{error, info, warn}; use tracing_actix_web::TracingLogger; #[cfg(target_os = "linux")] @@ -126,8 +126,19 @@ async fn main() -> std::io::Result<()> { _ => panic!("Invalid storage backend specified. 
Aborting startup!"), }; - info!("Initializing clickhouse connection"); - let mut clickhouse = clickhouse::init_client().await.unwrap(); + info!("Initializing ClickHouse connection"); + let clickhouse = clickhouse::init_client() + .await + .inspect_err(|err| { + warn!("ClickHouse connection unavailable: {err}."); + + let hard_fail = dotenvy::var("CLICKHOUSE_HARD_FAIL").is_ok_and(|v| !v.is_empty()); + + if hard_fail { + panic!("Aborting because a ClickHouse connection couldn't be established, and CLICKHOUSE_HARD_FAIL is set.") + } + }) + .ok(); let search_config = search::SearchConfig::new(None); @@ -136,8 +147,14 @@ async fn main() -> std::io::Result<()> { if let Some(task) = args.run_background_task { info!("Running task {task:?} and exiting"); - task.run(pool, redis_pool, search_config, clickhouse, stripe_client) - .await; + task.run( + pool, + redis_pool, + search_config, + clickhouse.clone(), + stripe_client, + ) + .await; return Ok(()); } @@ -169,7 +186,7 @@ async fn main() -> std::io::Result<()> { pool.clone(), redis_pool.clone(), search_config.clone(), - &mut clickhouse, + clickhouse, file_host.clone(), maxmind_reader.clone(), stripe_client, diff --git a/apps/labrinth/src/queue/analytics.rs b/apps/labrinth/src/queue/analytics.rs index 4269edaeb4..ff66a28d63 100644 --- a/apps/labrinth/src/queue/analytics.rs +++ b/apps/labrinth/src/queue/analytics.rs @@ -52,7 +52,7 @@ impl AnalyticsQueue { pub async fn index( &self, - client: clickhouse::Client, + client: Option, redis: &RedisPool, pool: &PgPool, ) -> Result<(), ApiError> { @@ -65,6 +65,11 @@ impl AnalyticsQueue { let playtime_queue = self.playtime_queue.clone(); self.playtime_queue.clear(); + let Some(client) = client else { + // If ClickHouse isn't available, skip indexing silently + return Ok(()); + }; + if !playtime_queue.is_empty() { let mut playtimes = client.insert("playtime")?; diff --git a/apps/labrinth/src/queue/payouts.rs b/apps/labrinth/src/queue/payouts.rs index ccdeb678a0..6f1134d73a 100644 ---
a/apps/labrinth/src/queue/payouts.rs +++ b/apps/labrinth/src/queue/payouts.rs @@ -736,8 +736,12 @@ pub async fn make_aditude_request( pub async fn process_payout( pool: &PgPool, - client: &clickhouse::Client, + client: Option<&clickhouse::Client>, ) -> Result<(), ApiError> { + let Some(client) = client else { + return Ok(()); + }; + sqlx::query!( " UPDATE payouts diff --git a/apps/labrinth/src/routes/v3/analytics_get.rs b/apps/labrinth/src/routes/v3/analytics_get.rs index b2c615726c..cb19dfc17a 100644 --- a/apps/labrinth/src/routes/v3/analytics_get.rs +++ b/apps/labrinth/src/routes/v3/analytics_get.rs @@ -71,7 +71,7 @@ pub struct FetchedPlaytime { } pub async fn playtimes_get( req: HttpRequest, - clickhouse: web::Data, + clickhouse: web::Data>, data: web::Query, session_queue: web::Data, pool: web::Data, @@ -106,14 +106,18 @@ pub async fn playtimes_get( filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; // Get the views - let playtimes = crate::clickhouse::fetch_playtimes( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; + let playtimes = if let Some(client) = clickhouse.get_ref().as_ref() { + crate::clickhouse::fetch_playtimes( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + client, + ) + .await? + } else { + Vec::new() + }; let mut hm = HashMap::new(); for playtime in playtimes { @@ -140,7 +144,7 @@ pub async fn playtimes_get( /// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. 
pub async fn views_get( req: HttpRequest, - clickhouse: web::Data, + clickhouse: web::Data>, data: web::Query, session_queue: web::Data, pool: web::Data, @@ -175,14 +179,18 @@ pub async fn views_get( filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; // Get the views - let views = crate::clickhouse::fetch_views( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; + let views = if let Some(client) = clickhouse.get_ref().as_ref() { + crate::clickhouse::fetch_views( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + client, + ) + .await? + } else { + Vec::new() + }; let mut hm = HashMap::new(); for views in views { @@ -209,7 +217,7 @@ pub async fn views_get( /// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. pub async fn downloads_get( req: HttpRequest, - clickhouse: web::Data, + clickhouse: web::Data>, data: web::Query, session_queue: web::Data, pool: web::Data, @@ -245,14 +253,18 @@ pub async fn downloads_get( .await?; // Get the downloads - let downloads = crate::clickhouse::fetch_downloads( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; + let downloads = if let Some(client) = clickhouse.get_ref().as_ref() { + crate::clickhouse::fetch_downloads( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + client, + ) + .await? 
+ } else { + Vec::new() + }; let mut hm = HashMap::new(); for downloads in downloads { @@ -418,7 +430,7 @@ pub async fn revenue_get( /// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch pub async fn countries_downloads_get( req: HttpRequest, - clickhouse: web::Data, + clickhouse: web::Data>, data: web::Query, session_queue: web::Data, pool: web::Data, @@ -450,13 +462,17 @@ pub async fn countries_downloads_get( filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; // Get the countries - let countries = crate::clickhouse::fetch_countries_downloads( - project_ids.unwrap_or_default(), - start_date, - end_date, - clickhouse.into_inner(), - ) - .await?; + let countries = if let Some(client) = clickhouse.get_ref().as_ref() { + crate::clickhouse::fetch_countries_downloads( + project_ids.unwrap_or_default(), + start_date, + end_date, + client, + ) + .await? + } else { + Vec::new() + }; let mut hm = HashMap::new(); for views in countries { @@ -491,7 +507,7 @@ pub async fn countries_downloads_get( /// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch pub async fn countries_views_get( req: HttpRequest, - clickhouse: web::Data, + clickhouse: web::Data>, data: web::Query, session_queue: web::Data, pool: web::Data, @@ -523,13 +539,17 @@ pub async fn countries_views_get( filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; // Get the countries - let countries = crate::clickhouse::fetch_countries_views( - project_ids.unwrap_or_default(), - start_date, - end_date, - clickhouse.into_inner(), - ) - .await?; + let countries = if let Some(client) = clickhouse.get_ref().as_ref() { + crate::clickhouse::fetch_countries_views( + project_ids.unwrap_or_default(), + start_date, + end_date, + client, + ) + .await? 
+ } else { + Vec::new() + }; let mut hm = HashMap::new(); for views in countries { diff --git a/apps/labrinth/tests/common/mod.rs b/apps/labrinth/tests/common/mod.rs index 625c8498cf..0c89d64ec4 100644 --- a/apps/labrinth/tests/common/mod.rs +++ b/apps/labrinth/tests/common/mod.rs @@ -30,7 +30,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { let search_config = db.search_config.clone(); let file_host: Arc = Arc::new(file_hosting::MockHost::new()); - let mut clickhouse = clickhouse::init_client().await.unwrap(); + let clickhouse = Some(clickhouse::init_client().await.unwrap()); let maxmind_reader = Arc::new(queue::maxmind::MaxMindIndexer::new().await.unwrap()); @@ -42,7 +42,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { pool.clone(), redis_pool.clone(), search_config, - &mut clickhouse, + clickhouse, file_host.clone(), maxmind_reader, stripe_client,