From a0f83b34975f817543559d80c90d252cfe1b8112 Mon Sep 17 00:00:00 2001
From: boring_nick
Date: Sat, 8 Feb 2025 11:11:03 +0200
Subject: [PATCH 1/3] username history using materialized view

---
 src/db/migrations/mod.rs              |  4 ++
 src/db/migrations/username_history.rs | 77 +++++++++++++++++++++++++++
 src/db/mod.rs                         | 48 ++++++-----------
 src/web/mod.rs                        | 12 ++---
 4 files changed, 104 insertions(+), 37 deletions(-)
 create mode 100644 src/db/migrations/username_history.rs

diff --git a/src/db/migrations/mod.rs b/src/db/migrations/mod.rs
index 503efc5..c4a9232 100644
--- a/src/db/migrations/mod.rs
+++ b/src/db/migrations/mod.rs
@@ -1,10 +1,12 @@
 mod migratable;
 mod structured;
+mod username_history;
 
 use crate::Result;
 use clickhouse::Client;
 use structured::StructuredMigration;
 use tracing::{debug, info};
+use username_history::UsernameHistoryMigration;
 
 use self::migratable::Migratable;
 
@@ -71,6 +73,8 @@ String CODEC(ZSTD(10))
 
     run_migration(db, "6_structured_message", StructuredMigration { db_name }).await?;
 
+    run_migration(db, "7_username_history", UsernameHistoryMigration).await?;
+
     Ok(())
 }
 
diff --git a/src/db/migrations/username_history.rs b/src/db/migrations/username_history.rs
new file mode 100644
index 0000000..6f90fd5
--- /dev/null
+++ b/src/db/migrations/username_history.rs
@@ -0,0 +1,77 @@
+use super::migratable::Migratable;
+use anyhow::Context;
+use tracing::info;
+
+pub struct UsernameHistoryMigration;
+
+impl<'a> Migratable<'a> for UsernameHistoryMigration {
+    async fn run(&self, db: &'a clickhouse::Client) -> anyhow::Result<()> {
+        let partitions = db
+            .query("SELECT DISTINCT toYYYYMM(timestamp) as partition FROM message_structured ORDER BY partition ASC")
+            .fetch_all::<u32>()
+            .await
+            .context("Could not fetch partition list")?;
+
+        db.query(
+            "
+            CREATE TABLE username_history
+            (
+                user_id String CODEC(ZSTD(8)),
+                user_login String CODEC(ZSTD(8)),
+                first_timestamp SimpleAggregateFunction(min, DateTime64(3)) CODEC(ZSTD(5)),
+                last_timestamp SimpleAggregateFunction(max, DateTime64(3)) CODEC(ZSTD(5))
+            )
+            ENGINE = AggregatingMergeTree
+            ORDER BY (user_id, user_login)
+            ",
+        )
+        .execute()
+        .await?;
+
+        info!(
+            "Filling username history from {} partitions",
+            partitions.len()
+        );
+
+        for partition in partitions {
+            info!("Filling username history for partition {partition}");
+            db.query(
+                "
+                INSERT INTO username_history
+                SELECT
+                    user_id,
+                    user_login,
+                    minSimpleState(timestamp) AS first_timestamp,
+                    maxSimpleState(timestamp) AS last_timestamp
+                FROM message_structured
+                WHERE toYYYYMM(timestamp) = ?
+                GROUP BY user_id, user_login
+                ",
+            )
+            .bind(partition)
+            .execute()
+            .await
+            .context("Could not fill username history")?;
+        }
+
+        db.query(
+            "
+            CREATE MATERIALIZED VIEW username_history_mv
+            TO username_history
+            AS SELECT
+                user_id,
+                user_login,
+                minSimpleState(timestamp) AS first_timestamp,
+                maxSimpleState(timestamp) AS last_timestamp
+            FROM message_structured
+            GROUP BY user_id, user_login
+            ",
+        )
+        .execute()
+        .await?;
+
+        info!("Username history built");
+
+        Ok(())
+    }
+}
diff --git a/src/db/mod.rs b/src/db/mod.rs
index 7160193..6060bd1 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -18,7 +18,6 @@ use crate::{
 };
 use chrono::{DateTime, Datelike, Duration, Utc};
 use clickhouse::{query::RowCursor, Client, Row};
-use futures::future::try_join_all;
 use rand::{rng, seq::IteratorRandom};
 use schema::StructuredMessage;
 use tracing::debug;
@@ -386,46 +385,33 @@ pub async fn get_user_stats(
 pub async fn get_user_name_history(db: &Client, user_id: &str) -> Result<Vec<PreviousName>> {
     #[derive(Deserialize, Row)]
     struct SingleNameHistory {
-        pub last_timestamp: i32,
-        pub first_timestamp: i32,
+        user_login: String,
+        last_timestamp: i64,
+        first_timestamp: i64,
     }
 
-    let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();
-    let name_query = db.query(&name_query).bind(user_id);
-    let distinct_logins = name_query.fetch_all::<String>().await?;
-    if distinct_logins.is_empty() {
-        return Ok(vec![]);
-    }
-
-    let sanitized_user_logins = distinct_logins
-        .iter()
-        .map(|login| login.trim_start_matches(':').to_owned());
+    let query = "
+        SELECT user_login,
+               max(last_timestamp) AS last_timestamp,
+               min(first_timestamp) AS first_timestamp
+        FROM username_history
+        WHERE user_id = ?
+        GROUP BY user_login";
 
-    let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();
-
-    let name_history_rows = try_join_all(sanitized_user_logins.into_iter().map(|login| {
-        let query = history_query.clone();
-        async move {
-            let query = db.query(&query).bind(user_id).bind(&login);
-            query
-                .fetch_one::<SingleNameHistory>()
-                .await
-                .map(|history| (login, history))
-        }
-    }))
-    .await?;
+    let name_history_rows: Vec<SingleNameHistory> =
+        db.query(query).bind(user_id).fetch_all().await?;
 
     let mut seen_logins = HashSet::new();
 
     let names = name_history_rows
         .into_iter()
-        .filter_map(|(login, history)| {
-            if seen_logins.insert(login.clone()) {
+        .filter_map(|row| {
+            if seen_logins.insert(row.user_login.clone()) {
                 Some(PreviousName {
-                    user_login: login,
-                    last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0)
+                    user_login: row.user_login,
+                    last_timestamp: DateTime::from_timestamp_millis(row.last_timestamp)
                         .expect("Invalid DateTime"),
-                    first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0)
+                    first_timestamp: DateTime::from_timestamp_millis(row.first_timestamp)
                         .expect("Invalid DateTime"),
                 })
             } else {
diff --git a/src/web/mod.rs b/src/web/mod.rs
index db7754b..83a30e8 100644
--- a/src/web/mod.rs
+++ b/src/web/mod.rs
@@ -83,12 +83,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender

From: boring_nick
Date: Sat, 8 Feb 2025 17:16:48 +0200
Subject: [PATCH 2/3] trim login and run optimize table on migration

---
 src/db/migrations/username_history.rs | 6 +++++-
 src/db/mod.rs                         | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/db/migrations/username_history.rs b/src/db/migrations/username_history.rs
index 6f90fd5..79b5898 100644
--- a/src/db/migrations/username_history.rs
+++ b/src/db/migrations/username_history.rs
@@ -1,6 +1,6 @@
 use super::migratable::Migratable;
 use anyhow::Context;
-use tracing::info;
+use tracing::{info, warn};
 
 pub struct UsernameHistoryMigration;
 
@@ -70,6 +70,10 @@ impl<'a> Migratable<'a> for UsernameHistoryMigration {
         .execute()
         .await?;
 
+        if let Err(err) = db.query("OPTIMIZE TABLE username_history").execute().await {
+            warn!("Could not run OPTIMIZE query on table: {err}");
+        }
+
         info!("Username history built");
 
         Ok(())
diff --git a/src/db/mod.rs b/src/db/mod.rs
index 6060bd1..d3a2c5d 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -391,7 +391,7 @@ pub async fn get_user_name_history(db: &Client, user_id: &str) -> Result<Vec<PreviousName>> {