From 45d15bc3f6dc85452f4b333b4a1a5f1787e29387 Mon Sep 17 00:00:00 2001 From: RyanPotat Date: Tue, 21 Jan 2025 08:39:07 +0000 Subject: [PATCH 1/5] feat: add namechagne route --- src/db/mod.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++- src/web/handlers.rs | 16 +++++++++++++- src/web/mod.rs | 6 +++++ src/web/schema.rs | 14 ++++++++++++ 4 files changed, 87 insertions(+), 2 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 521874c..ea33c67 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -2,6 +2,8 @@ mod migrations; pub mod schema; pub mod writer; +use std::collections::HashSet; + pub use migrations::run as setup_db; use serde::Deserialize; use writer::FlushBuffer; @@ -12,7 +14,7 @@ use crate::{ schema::LogRangeParams, stream::{FlushBufferResponse, LogsStream}, }, - web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, UserLogsStats}, + web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, PreviousName, PreviousNames, UserLogsStats}, Result, }; use chrono::{DateTime, Datelike, Duration, Utc}; @@ -381,6 +383,55 @@ pub async fn get_user_stats( }) } +pub async fn get_user_name_history( + db: &Client, + user_id: &str, +) -> Result { + #[derive(Deserialize, Row, Debug)] + struct SingleNameHistory { + pub user_login: String, + pub last_timestamp: String, + pub first_timestamp: String, + } + + let query = "SELECT user_login, MAX(timestamp) AS last_timestamp FROM rustlog.message_structured WHERE user_id = ? GROUP BY user_login".to_owned(); + + let query = db.query(&query).bind(user_id); + + let name_history_rows = query.fetch_all::().await?; + + // Log to console the name history rows + for name_history_row in &name_history_rows { + println!("Name history row: {:?}", name_history_row); + } + + let mut seen_logins = HashSet::new(); + + // If name starts with ':' (NOTICE, CLEARCHAT? Parse error?), remove char and deduplicate rows + let names = name_history_rows + .into_iter() + .filter_map(|name_history_row: SingleNameHistory| { + let sanitized_user_login = if name_history_row.user_login.starts_with(':') { + name_history_row.user_login.chars().skip(1).collect::() + } else { + name_history_row.user_login.clone() + }; + + if seen_logins.insert(sanitized_user_login.clone()) { + Some(PreviousName { + user_login: sanitized_user_login, + last_timestamp: name_history_row.last_timestamp, + first_timestamp: name_history_row.first_timestamp, + }) + } else { + None + } + }) + .collect::(); + + Ok(names) +} + fn apply_limit_offset(query: &mut String, buffer_response: &FlushBufferResponse) { if let Some(limit) = buffer_response.normalized_limit() { *query = format!("{query} LIMIT {limit}"); diff --git a/src/web/handlers.rs b/src/web/handlers.rs index f2e33a6..4df120f 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -3,7 +3,7 @@ use super::{ schema::{ AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath, ChannelLogsStats, ChannelParam, ChannelsList, LogsParams, LogsPathChannel, SearchParams, - UserLogPathParams, UserLogsPath, UserLogsStats, UserParam, + UserLogPathParams, UserLogsPath, UserLogsStats, UserParam, UserNameHistoryParam }, }; use crate::{ @@ -505,6 +505,20 @@ async fn search_user_logs( Ok(logs) } + +pub async fn get_user_name_history( + app: State, + Path(UserNameHistoryParam { + user_id, + }): Path, +) -> Result { + app.check_opted_out(&user_id, None)?; + + let names = db::get_user_name_history(&app.db,&user_id).await?; + + Ok(Json(names)) +} + pub async fn optout(app: State) -> Json { let mut rng = thread_rng(); let optout_code: String = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect(); diff --git a/src/web/mod.rs b/src/web/mod.rs index ba557cc..98d387e 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -167,6 +167,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender; From f2afe5c580904942f2b4d68d564a77c9f28d9df4 Mon Sep 17 00:00:00 2001 From: RyanPotat Date: Tue, 21 Jan 2025 10:05:54 +0000 Subject: [PATCH 2/5] fix: actually works now --- src/db/mod.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index ea33c67..4c6b037 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,7 +1,6 @@ mod migrations; pub mod schema; pub mod writer; - use std::collections::HashSet; pub use migrations::run as setup_db; @@ -387,27 +386,21 @@ pub async fn get_user_name_history( db: &Client, user_id: &str, ) -> Result { - #[derive(Deserialize, Row, Debug)] + #[derive(Deserialize, Row)] struct SingleNameHistory { pub user_login: String, - pub last_timestamp: String, - pub first_timestamp: String, + pub last_timestamp: i32, + pub first_timestamp: i32, } - let query = "SELECT user_login, MAX(timestamp) AS last_timestamp FROM rustlog.message_structured WHERE user_id = ? GROUP BY user_login".to_owned(); + let query = "SELECT user_login, toDateTime((MAX(timestamp))) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE user_id = ? GROUP BY user_login".to_owned(); let query = db.query(&query).bind(user_id); let name_history_rows = query.fetch_all::().await?; - // Log to console the name history rows - for name_history_row in &name_history_rows { - println!("Name history row: {:?}", name_history_row); - } - let mut seen_logins = HashSet::new(); - // If name starts with ':' (NOTICE, CLEARCHAT? Parse error?), remove char and deduplicate rows let names = name_history_rows .into_iter() .filter_map(|name_history_row: SingleNameHistory| { @@ -420,8 +413,12 @@ pub async fn get_user_name_history( if seen_logins.insert(sanitized_user_login.clone()) { Some(PreviousName { user_login: sanitized_user_login, - last_timestamp: name_history_row.last_timestamp, - first_timestamp: name_history_row.first_timestamp, + last_timestamp: DateTime::from_timestamp(name_history_row.last_timestamp.into(), 0) + .expect("Invalid DateTime") + .to_rfc3339(), + first_timestamp: DateTime::from_timestamp(name_history_row.first_timestamp.into(), 0) + .expect("Invalid DateTime") + .to_rfc3339(), }) } else { None From b15b7ceee99664757a93c47582cf8428f1b9fb73 Mon Sep 17 00:00:00 2001 From: RyanPotat Date: Thu, 6 Feb 2025 05:50:15 +0000 Subject: [PATCH 3/5] fix: add cache, simplify deduplication, remove redundant type --- src/db/mod.rs | 15 +++++---------- src/web/schema.rs | 2 -- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 4c6b037..0599d0d 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -13,7 +13,7 @@ use crate::{ schema::LogRangeParams, stream::{FlushBufferResponse, LogsStream}, }, - web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, PreviousName, PreviousNames, UserLogsStats}, + web::schema::{AvailableLogDate, ChannelLogsStats, LogsParams, PreviousName, UserLogsStats}, Result, }; use chrono::{DateTime, Datelike, Duration, Utc}; @@ -385,7 +385,7 @@ pub async fn get_user_stats( pub async fn get_user_name_history( db: &Client, user_id: &str, -) -> Result { +) -> Result> { #[derive(Deserialize, Row)] struct SingleNameHistory { pub user_login: String, @@ -393,7 +393,7 @@ pub async fn get_user_name_history( pub first_timestamp: i32, } - let query = "SELECT user_login, toDateTime((MAX(timestamp))) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE user_id = ? GROUP BY user_login".to_owned(); + let query = "SELECT user_login, toDateTime((MAX(timestamp))) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE user_id = ? GROUP BY user_login SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); let query = db.query(&query).bind(user_id); @@ -404,12 +404,7 @@ pub async fn get_user_name_history( let names = name_history_rows .into_iter() .filter_map(|name_history_row: SingleNameHistory| { - let sanitized_user_login = if name_history_row.user_login.starts_with(':') { - name_history_row.user_login.chars().skip(1).collect::() - } else { - name_history_row.user_login.clone() - }; - + let sanitized_user_login = name_history_row.user_login.trim_start_matches(':').to_owned(); if seen_logins.insert(sanitized_user_login.clone()) { Some(PreviousName { user_login: sanitized_user_login, @@ -424,7 +419,7 @@ pub async fn get_user_name_history( None } }) - .collect::(); + .collect::>(); Ok(names) } diff --git a/src/web/schema.rs b/src/web/schema.rs index eac2280..a58720f 100644 --- a/src/web/schema.rs +++ b/src/web/schema.rs @@ -188,5 +188,3 @@ pub struct PreviousName { pub last_timestamp: String, pub first_timestamp: String, } - -pub type PreviousNames = Vec; From fe52cb848c23ab71fd65578d6b38163b78522bca Mon Sep 17 00:00:00 2001 From: RyanPotat Date: Thu, 6 Feb 2025 06:33:50 +0000 Subject: [PATCH 4/5] impr: split query to avoid aggregation --- src/db/mod.rs | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 0599d0d..edf99fb 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -21,6 +21,7 @@ use clickhouse::{query::RowCursor, Client, Row}; use rand::{seq::IteratorRandom, thread_rng}; use schema::StructuredMessage; use tracing::debug; +use futures::stream::StreamExt; const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14; @@ -388,30 +389,55 @@ pub async fn get_user_name_history( ) -> Result> { #[derive(Deserialize, Row)] struct SingleNameHistory { - pub user_login: String, pub last_timestamp: i32, pub first_timestamp: i32, } - let query = "SELECT user_login, toDateTime((MAX(timestamp))) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE user_id = ? GROUP BY user_login SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); + let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); + let name_query = db.query(&name_query).bind(user_id); + let distinct_logins = name_query.fetch_all::().await?; + if distinct_logins.len() == 0 { + return Ok(vec![]); + } + + let sanitized_user_logins = distinct_logins + .iter() + .map(|login| login.trim_start_matches(':').to_owned()) + .collect::>(); - let query = db.query(&query).bind(user_id); + let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); - let name_history_rows = query.fetch_all::().await?; + let name_history_rows = sanitized_user_logins + .into_iter() + .map(|login| { + let query = history_query.clone(); + async move { + let query = db.query(&query).bind(user_id).bind(&login); + query + .fetch_one::() + .await + .map(|history| (login, history)) + .map_err(Error::from) + } + }) + .collect::>() + .collect::>() + .await + .into_iter() + .collect::>>()?; let mut seen_logins = HashSet::new(); let names = name_history_rows .into_iter() - .filter_map(|name_history_row: SingleNameHistory| { - let sanitized_user_login = name_history_row.user_login.trim_start_matches(':').to_owned(); - if seen_logins.insert(sanitized_user_login.clone()) { + .filter_map(|(login, history)| { + if seen_logins.insert(login.clone()) { Some(PreviousName { - user_login: sanitized_user_login, - last_timestamp: DateTime::from_timestamp(name_history_row.last_timestamp.into(), 0) + user_login: login, + last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0) .expect("Invalid DateTime") .to_rfc3339(), - first_timestamp: DateTime::from_timestamp(name_history_row.first_timestamp.into(), 0) + first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0) .expect("Invalid DateTime") .to_rfc3339(), }) From f67e62bb2726b8fd35c9c66e8451e035ae8487a3 Mon Sep 17 00:00:00 2001 From: RyanPotat Date: Fri, 7 Feb 2025 09:06:17 +0000 Subject: [PATCH 5/5] fix: update type, simplify mapping, clippy nitpick --- src/db/mod.rs | 44 +++++++++++++++++--------------------------- src/web/schema.rs | 7 +++++-- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index edf99fb..cba1437 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -21,7 +21,7 @@ use clickhouse::{query::RowCursor, Client, Row}; use rand::{seq::IteratorRandom, thread_rng}; use schema::StructuredMessage; use tracing::debug; -use futures::stream::StreamExt; +use futures::future::try_join_all; const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14; @@ -396,35 +396,27 @@ pub async fn get_user_name_history( let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); let name_query = db.query(&name_query).bind(user_id); let distinct_logins = name_query.fetch_all::().await?; - if distinct_logins.len() == 0 { + if distinct_logins.is_empty() { return Ok(vec![]); } let sanitized_user_logins = distinct_logins .iter() - .map(|login| login.trim_start_matches(':').to_owned()) - .collect::>(); + .map(|login| login.trim_start_matches(':').to_owned()); let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned(); - let name_history_rows = sanitized_user_logins - .into_iter() - .map(|login| { - let query = history_query.clone(); - async move { - let query = db.query(&query).bind(user_id).bind(&login); - query - .fetch_one::() - .await - .map(|history| (login, history)) - .map_err(Error::from) - } - }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::>>()?; + let name_history_rows = try_join_all(sanitized_user_logins.into_iter().map(|login| { + let query = history_query.clone(); + async move { + let query = db.query(&query).bind(user_id).bind(&login); + query + .fetch_one::() + .await + .map(|history| (login, history)) + } + })) + .await?; let mut seen_logins = HashSet::new(); @@ -435,17 +427,15 @@ pub async fn get_user_name_history( Some(PreviousName { user_login: login, last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0) - .expect("Invalid DateTime") - .to_rfc3339(), + .expect("Invalid DateTime"), first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0) - .expect("Invalid DateTime") - .to_rfc3339(), + .expect("Invalid DateTime"), }) } else { None } }) - .collect::>(); + .collect(); Ok(names) } diff --git a/src/web/schema.rs b/src/web/schema.rs index a58720f..23d3e77 100644 --- a/src/web/schema.rs +++ b/src/web/schema.rs @@ -1,4 +1,5 @@ use super::responders::logs::{JsonResponseType, LogsResponseType}; +use chrono::{DateTime, Utc}; use schemars::JsonSchema; use serde::{Deserialize, Deserializer, Serialize}; use std::fmt::Display; @@ -185,6 +186,8 @@ pub struct UserNameHistoryParam { #[derive(Serialize, JsonSchema)] pub struct PreviousName { pub user_login: String, - pub last_timestamp: String, - pub first_timestamp: String, + #[schemars(with = "String")] + pub last_timestamp: DateTime, + #[schemars(with = "String")] + pub first_timestamp: DateTime, }