Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/db/migrations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod migratable;
mod structured;
mod username_history;

use crate::Result;
use clickhouse::Client;
use structured::StructuredMigration;
use tracing::{debug, info};
use username_history::UsernameHistoryMigration;

use self::migratable::Migratable;

Expand Down Expand Up @@ -71,6 +73,8 @@ String CODEC(ZSTD(10))

run_migration(db, "6_structured_message", StructuredMigration { db_name }).await?;

run_migration(db, "7_username_history", UsernameHistoryMigration).await?;

Ok(())
}

Expand Down
81 changes: 81 additions & 0 deletions src/db/migrations/username_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use super::migratable::Migratable;
use anyhow::Context;
use tracing::{info, warn};

/// Migration that creates the `username_history` table, backfills it from
/// `message_structured`, and installs the `username_history_mv` materialized
/// view that writes into it.
pub struct UsernameHistoryMigration;

impl<'a> Migratable<'a> for UsernameHistoryMigration {
    /// Builds the username history table and its materialized view.
    ///
    /// Steps, in order:
    /// 1. List the distinct `toYYYYMM(timestamp)` values present in
    ///    `message_structured`.
    /// 2. Create the `username_history` table (`AggregatingMergeTree`, keyed by
    ///    `(user_id, user_login)`).
    /// 3. Backfill the table with one `INSERT ... SELECT` per listed month.
    /// 4. Create the `username_history_mv` materialized view targeting the table.
    /// 5. Run `OPTIMIZE TABLE username_history` on a best-effort basis.
    ///
    /// Both `CREATE` statements use `IF NOT EXISTS`, so a run that failed
    /// part-way can simply be retried: rows inserted twice collapse because the
    /// timestamp columns are `min`/`max` simple aggregate states.
    ///
    /// # Errors
    ///
    /// Returns an error if listing partitions, creating the table, any backfill
    /// insert, or creating the materialized view fails. A failing `OPTIMIZE`
    /// query is only logged as a warning.
    async fn run(&self, db: &'a clickhouse::Client) -> anyhow::Result<()> {
        let partitions = db
            .query("SELECT DISTINCT toYYYYMM(timestamp) as partition FROM message_structured ORDER BY partition ASC")
            .fetch_all::<u32>()
            .await
            .context("Could not fetch partition list")?;

        // IF NOT EXISTS keeps the migration re-runnable after a partial failure
        // (e.g. a backfill insert that errored after the table was created).
        db.query(
            "
            CREATE TABLE IF NOT EXISTS username_history
            (
                user_id String CODEC(ZSTD(8)),
                user_login String CODEC(ZSTD(8)),
                first_timestamp SimpleAggregateFunction(min, DateTime64(3)) CODEC(ZSTD(5)),
                last_timestamp SimpleAggregateFunction(max, DateTime64(3)) CODEC(ZSTD(5))
            )
            ENGINE = AggregatingMergeTree
            ORDER BY (user_id, user_login)
            ",
        )
        .execute()
        .await
        .context("Could not create username_history table")?;

        info!(
            "Filling username history from {} partitions",
            partitions.len()
        );

        // Backfill one month at a time so each INSERT only aggregates the rows
        // of a single `toYYYYMM(timestamp)` value.
        for partition in partitions {
            info!("Filling username history for partition {partition}");
            db.query(
                "
                INSERT INTO username_history
                SELECT
                    user_id,
                    user_login,
                    minSimpleState(timestamp) AS first_timestamp,
                    maxSimpleState(timestamp) AS last_timestamp
                FROM message_structured
                WHERE toYYYYMM(timestamp) = ?
                GROUP BY user_id, user_login
                ",
            )
            .bind(partition)
            .execute()
            .await
            .context("Could not fill username history")?;
        }

        // NOTE(review): rows inserted into message_structured between the
        // backfill above and the creation of this view would not reach
        // username_history; presumably migrations run before ingestion starts —
        // confirm.
        db.query(
            "
            CREATE MATERIALIZED VIEW IF NOT EXISTS username_history_mv
            TO username_history
            AS SELECT
                user_id,
                user_login,
                minSimpleState(timestamp) AS first_timestamp,
                maxSimpleState(timestamp) AS last_timestamp
            FROM message_structured
            GROUP BY user_id, user_login
            ",
        )
        .execute()
        .await
        .context("Could not create username_history_mv materialized view")?;

        // Best effort: a failed OPTIMIZE must not fail the migration, so the
        // error is logged and otherwise ignored.
        if let Err(err) = db.query("OPTIMIZE TABLE username_history").execute().await {
            warn!("Could not run OPTIMIZE query on table: {err}");
        }

        info!("Username history built");

        Ok(())
    }
}
48 changes: 17 additions & 31 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{
};
use chrono::{DateTime, Datelike, Duration, Utc};
use clickhouse::{query::RowCursor, Client, Row};
use futures::future::try_join_all;
use rand::{rng, seq::IteratorRandom};
use schema::StructuredMessage;
use tracing::debug;
Expand Down Expand Up @@ -386,46 +385,33 @@ pub async fn get_user_stats(
pub async fn get_user_name_history(db: &Client, user_id: &str) -> Result<Vec<PreviousName>> {
#[derive(Deserialize, Row)]
struct SingleNameHistory {
pub last_timestamp: i32,
pub first_timestamp: i32,
user_login: String,
last_timestamp: i64,
first_timestamp: i64,
}

let name_query = "SELECT DISTINCT user_login FROM message_structured WHERE user_id = ? SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();
let name_query = db.query(&name_query).bind(user_id);
let distinct_logins = name_query.fetch_all::<String>().await?;
if distinct_logins.is_empty() {
return Ok(vec![]);
}

let sanitized_user_logins = distinct_logins
.iter()
.map(|login| login.trim_start_matches(':').to_owned());
let query = "
SELECT trim(LEADING ':' FROM user_login) as user_login,
max(last_timestamp) AS last_timestamp,
min(first_timestamp) AS first_timestamp
FROM username_history
WHERE user_id = ?
GROUP BY user_login";

let history_query = "SELECT toDateTime(MAX(timestamp)) AS last_timestamp, toDateTime(MIN(timestamp)) AS first_timestamp FROM message_structured WHERE (user_id = ?) AND (user_login = ?) SETTINGS use_query_cache = 1, query_cache_ttl = 600".to_owned();

let name_history_rows = try_join_all(sanitized_user_logins.into_iter().map(|login| {
let query = history_query.clone();
async move {
let query = db.query(&query).bind(user_id).bind(&login);
query
.fetch_one::<SingleNameHistory>()
.await
.map(|history| (login, history))
}
}))
.await?;
let name_history_rows: Vec<SingleNameHistory> =
db.query(query).bind(user_id).fetch_all().await?;

let mut seen_logins = HashSet::new();

let names = name_history_rows
.into_iter()
.filter_map(|(login, history)| {
if seen_logins.insert(login.clone()) {
.filter_map(|row| {
if seen_logins.insert(row.user_login.clone()) {
Some(PreviousName {
user_login: login,
last_timestamp: DateTime::from_timestamp(history.last_timestamp.into(), 0)
user_login: row.user_login,
last_timestamp: DateTime::from_timestamp_millis(row.last_timestamp)
.expect("Invalid DateTime"),
first_timestamp: DateTime::from_timestamp(history.first_timestamp.into(), 0)
first_timestamp: DateTime::from_timestamp_millis(row.first_timestamp)
.expect("Invalid DateTime"),
})
} else {
Expand Down
14 changes: 7 additions & 7 deletions src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tower_http::{
};
use tracing::{debug, info};

const CAPABILITIES: &[&str] = &["arbitrary-range-query", "search", "stats"];
const CAPABILITIES: &[&str] = &["arbitrary-range-query", "search", "stats", "namehistory"];

pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessage>) {
aide::generate::on_error(|error| {
Expand Down Expand Up @@ -83,12 +83,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
}),
)
// Paths with static parts should go first so they aren't overridden by the dynamic date paths later
// .api_route(
// "/namehistory/{user_id}",
// get_with(handlers::get_user_name_history, |op| {
// op.description("Get user name history by provided user id")
// }),
// )
.api_route(
"/namehistory/{user_id}",
get_with(handlers::get_user_name_history, |op| {
op.description("Get user name history by provided user id")
}),
)
.api_route(
"/{channel_id_type}/{channel}/{user_id_type}/{user}/search",
get_with(handlers::search_user_logs, |op| {
Expand Down
Loading