diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index f780406..6dafa26 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -224,8 +224,18 @@ impl ClickHouse { start: RelativeDateTime, end: RelativeDateTime, limit: u64, + selected_host: Option<&String>, ) -> Result { let dbtable = self.get_table_name("system", "query_log"); + let host_filter = if let Some(host) = selected_host { + if !host.is_empty() && self.options.cluster.is_some() { + format!("AND hostName() = '{}'", host.replace('\'', "''")) + } else { + String::new() + } + } else { + String::new() + }; return self .execute( format!( @@ -244,6 +254,7 @@ impl ClickHouse { query_duration_ms > 1e3 {filter} {internal} + {host_filter} ORDER BY query_duration_ms DESC LIMIT {limit} ) @@ -290,7 +301,8 @@ impl ClickHouse { format!("AND (client_hostname LIKE '{0}' OR log_comment LIKE '{0}' OR os_user LIKE '{0}' OR user LIKE '{0}' OR initial_user LIKE '{0}' OR client_name LIKE '{0}' OR query_id LIKE '{0}' OR query LIKE '{0}')", &filter) } else { "".to_string() - } + }, + host_filter = host_filter, ) .as_str(), ) @@ -303,11 +315,21 @@ impl ClickHouse { start: RelativeDateTime, end: RelativeDateTime, limit: u64, + selected_host: Option<&String>, ) -> Result { // TODO: // - propagate sort order from the table // - distributed_group_by_no_merge=2 is broken for this query with WINDOW function let dbtable = self.get_table_name("system", "query_log"); + let host_filter = if let Some(host) = selected_host { + if !host.is_empty() && self.options.cluster.is_some() { + format!("AND hostName() = '{}'", host.replace('\'', "''")) + } else { + String::new() + } + } else { + String::new() + }; return self .execute( format!( @@ -324,6 +346,7 @@ impl ClickHouse { is_initial_query {filter} {internal} + {host_filter} ORDER BY event_date DESC, event_time DESC LIMIT {limit} ) @@ -370,15 +393,22 @@ impl ClickHouse { format!("AND (client_hostname LIKE '{0}' OR log_comment LIKE '{0}' OR os_user LIKE '{0}' OR user LIKE '{0}' OR initial_user LIKE '{0}' OR client_name LIKE '{0}' OR query_id LIKE '{0}' OR query LIKE '{0}')", &filter) } else { "".to_string() - } + }, + host_filter = host_filter, ) .as_str(), ) .await; } - pub async fn get_processlist(&self, filter: String, limit: u64) -> Result { + pub async fn get_processlist( + &self, + filter: String, + limit: u64, + selected_host: Option<&String>, + ) -> Result { let dbtable = self.get_table_name_no_history("system", "processes"); + let host_filter = self.get_host_filter_clause(selected_host); return self .execute( format!( @@ -408,6 +438,7 @@ impl ClickHouse { WHERE 1 {filter} {internal} + {host_filter} LIMIT {limit} "#, dbtable, @@ -438,15 +469,26 @@ impl ClickHouse { } else { "length(thread_ids)" }, + host_filter = host_filter, ) .as_str(), ) .await; } - pub async fn get_summary(&self) -> Result { + pub async fn get_summary( + &self, + selected_host: Option<&String>, + ) -> Result { + let host_filter = self.get_host_filter_clause(selected_host); + let host_where = if host_filter.is_empty() { + String::new() + } else { + format!(" WHERE {}", &host_filter[4..]) // Remove leading "AND " + }; + let memory_index_granularity_trait = if self.quirks.has(ClickHouseAvailableQuirks::AsynchronousMetricsTotalIndexGranularityBytesInMemoryAllocated) { - format!("(SELECT sum(index_granularity_bytes_in_memory_allocated) FROM {}) AS memory_index_granularity_", self.get_table_name_no_history("system", "parts")) + format!("(SELECT sum(index_granularity_bytes_in_memory_allocated) FROM {}{}) AS memory_index_granularity_", self.get_table_name_no_history("system", "parts"), host_where) } else { "0::UInt64 AS memory_index_granularity_".to_string() }; @@ -458,26 +500,27 @@ impl ClickHouse { r#" WITH -- memory detalization - (SELECT sum(CAST(value AS UInt64)) FROM {metrics} WHERE metric = 'MemoryTracking') AS memory_tracked_, - (SELECT sum(CAST(value AS UInt64)) FROM {metrics} WHERE metric = 'MergesMutationsMemoryTracking') AS memory_merges_mutations_, - (SELECT sum(total_bytes) FROM {tables} WHERE engine IN ('Join','Memory','Buffer','Set')) AS memory_tables_, - (SELECT sum(CAST(value AS UInt64)) FROM {asynchronous_metrics} WHERE metric LIKE '%CacheBytes' AND metric NOT LIKE '%Filesystem%') AS memory_async_metrics_caches_, + (SELECT sum(CAST(value AS UInt64)) FROM {metrics} WHERE metric = 'MemoryTracking' {host_filter_and}) AS memory_tracked_, + (SELECT sum(CAST(value AS UInt64)) FROM {metrics} WHERE metric = 'MergesMutationsMemoryTracking' {host_filter_and}) AS memory_merges_mutations_, + (SELECT sum(total_bytes) FROM {tables} WHERE engine IN ('Join','Memory','Buffer','Set') {host_filter_and}) AS memory_tables_, + (SELECT sum(CAST(value AS UInt64)) FROM {asynchronous_metrics} WHERE metric LIKE '%CacheBytes' AND metric NOT LIKE '%Filesystem%' {host_filter_and}) AS memory_async_metrics_caches_, (SELECT sum(CAST(value AS UInt64)) FROM {metrics} WHERE metric NOT LIKE '%Filesystem%' AND (metric LIKE '%CacheBytes' OR metric IN ('IcebergMetadataFilesCacheSize', 'VectorSimilarityIndexCacheSize')) + {host_filter_and} ) AS memory_metrics_caches_, - (SELECT sum(CAST(memory_usage AS UInt64)) FROM {processes}) AS memory_queries_, - (SELECT sum(CAST(memory_usage AS UInt64)) FROM {merges}) AS memory_active_merges_, - (SELECT sum(bytes_allocated) FROM {dictionaries}) AS memory_dictionaries_, - (SELECT sum(total_bytes) FROM {async_inserts}) AS memory_async_inserts_, + (SELECT sum(CAST(memory_usage AS UInt64)) FROM {processes} {host_filter_where}) AS memory_queries_, + (SELECT sum(CAST(memory_usage AS UInt64)) FROM {merges} {host_filter_where}) AS memory_active_merges_, + (SELECT sum(bytes_allocated) FROM {dictionaries} {host_filter_where}) AS memory_dictionaries_, + (SELECT sum(total_bytes) FROM {async_inserts} {host_filter_where}) AS memory_async_inserts_, {memory_index_granularity_trait}, - (SELECT count() FROM {one}) AS servers_, - (SELECT count() FROM {processes}) AS queries_, - (SELECT count() FROM {merges}) AS merges_, - (SELECT count() FROM {mutations} WHERE NOT is_done) AS mutations_, - (SELECT count() FROM {replication_queue}) AS replication_queue_, - (SELECT sum(num_tries) FROM {replication_queue}) AS replication_queue_tries_, - (SELECT count() FROM {fetches}) AS fetches_ + (SELECT count() FROM {one} {host_filter_where}) AS servers_, + (SELECT count() FROM {processes} {host_filter_where}) AS queries_, + (SELECT count() FROM {merges} {host_filter_where}) AS merges_, + (SELECT count() FROM {mutations} WHERE NOT is_done {host_filter_and}) AS mutations_, + (SELECT count() FROM {replication_queue} {host_filter_where}) AS replication_queue_, + (SELECT sum(num_tries) FROM {replication_queue} {host_filter_where}) AS replication_queue_tries_, + (SELECT count() FROM {fetches} {host_filter_where}) AS fetches_ SELECT assumeNotNull(memory_tracked_) AS memory_tracked, assumeNotNull(memory_merges_mutations_) AS memory_merges_mutations, @@ -555,6 +598,7 @@ impl ClickHouse { -- update intervals CAST(anyLastIf(value, metric == 'AsynchronousMetricsUpdateInterval') AS UInt64) AS metrics_update_interval FROM {asynchronous_metrics} + {host_filter_where} ) as asynchronous_metrics, ( SELECT @@ -562,6 +606,7 @@ impl ClickHouse { sumIf(CAST(value AS UInt64), event == 'SelectedRows') AS selected_rows, sumIf(CAST(value AS UInt64), event == 'InsertedRows') AS inserted_rows FROM {events} + {host_filter_where} ) as events, ( SELECT @@ -600,6 +645,7 @@ impl ClickHouse { 'DestroyAggregatesThreadsActive' )) AS threads_queries FROM {metrics} + {host_filter_where} ) as metrics SETTINGS enable_global_with_statement=0 "#, @@ -617,6 +663,8 @@ impl ClickHouse { one=self.get_table_name_no_history("system", "one"), memory_index_granularity_trait=memory_index_granularity_trait, + host_filter_where=host_where, + host_filter_and=host_filter, ) ) .await?; @@ -877,7 +925,7 @@ impl ClickHouse { "".into() }, if let Some(hostname) = &args.hostname { - format!("AND hostname = '{}'", hostname) + format!("AND (hostName() = '{0}' OR hostname = '{0}')", hostname.replace('\'', "''")) } else { "".into() }, @@ -906,8 +954,18 @@ impl ClickHouse { query_ids: Option<&[String]>, start_microseconds: Option>, end_microseconds: Option>, + selected_host: Option<&String>, ) -> Result { let dbtable = self.get_table_name("system", "trace_log"); + let host_filter = if let Some(host) = selected_host { + if !host.is_empty() && self.options.cluster.is_some() { + format!("AND hostName() = '{}'", host.replace('\'', "''")) + } else { + String::new() + } + } else { + String::new() + }; return self .execute(&format!( r#" @@ -923,6 +981,7 @@ impl ClickHouse { AND event_date <= toDate(end_time_) AND event_time <= toDateTime(end_time_) AND event_time_microseconds <= end_time_ AND trace_type = '{:?}' {} + {} GROUP BY human_trace SETTINGS allow_introspection_functions=1 "#, @@ -974,6 +1033,7 @@ impl ClickHouse { } else { String::new() }, + host_filter, )) .await; } @@ -981,8 +1041,16 @@ impl ClickHouse { pub async fn get_live_query_flamegraph( &self, query_ids: &Option>, + selected_host: Option<&String>, ) -> Result { let dbtable = self.get_table_name_no_history("system", "stack_trace"); + let host_filter = self.get_host_filter_clause(selected_host); + let where_clause = match (query_ids.as_ref(), host_filter.is_empty()) { + (Some(v), true) => format!("query_id IN ('{}')", v.join("','")), + (Some(v), false) => format!("query_id IN ('{}') {}", v.join("','"), host_filter), + (None, false) => format!("1 {}", host_filter), + (None, true) => "1".to_string(), + }; return self .execute(&format!( r#" @@ -997,11 +1065,7 @@ impl ClickHouse { GROUP BY human_trace SETTINGS allow_introspection_functions=1 "#, - dbtable, - query_ids - .as_ref() - .map(|v| format!("query_id IN ('{}')", v.join("','"))) - .unwrap_or("1".into()) + dbtable, where_clause )) .await; } @@ -1013,6 +1077,7 @@ impl ClickHouse { table: String, start: RelativeDateTime, end: RelativeDateTime, + selected_host: Option<&String>, ) -> Result> { let dbtable = self.get_table_name("system", "background_schedule_pool_log"); @@ -1023,6 +1088,16 @@ impl ClickHouse { .to_sql_datetime_64() .ok_or_else(|| Error::msg("Invalid end"))?; + let host_filter = if let Some(host) = selected_host { + if !host.is_empty() && self.options.cluster.is_some() { + format!("AND hostName() = '{}'", host.replace('\'', "''")) + } else { + String::new() + } + } else { + String::new() + }; + let query = if let Some(ref log_name) = log_name { format!( r#" @@ -1035,6 +1110,7 @@ impl ClickHouse { log_name = '{log_name}' AND database = '{database}' AND table = '{table}' + {host_filter} LIMIT 1000 "#, start = start_sql, @@ -1043,6 +1119,7 @@ impl ClickHouse { log_name = log_name.replace('\'', "''"), database = database.replace('\'', "''"), table = table.replace('\'', "''"), + host_filter = host_filter, ) } else { format!( @@ -1055,6 +1132,7 @@ impl ClickHouse { event_time BETWEEN toDateTime(start_) AND toDateTime(end_) AND database = '{database}' AND table = '{table}' + {host_filter} LIMIT 1000 "#, start = start_sql, @@ -1062,6 +1140,7 @@ impl ClickHouse { dbtable = dbtable, database = database.replace('\'', "''"), table = table.replace('\'', "''"), + host_filter = host_filter, ) }; @@ -1099,6 +1178,38 @@ impl ClickHouse { } } + pub async fn get_cluster_hosts(&self) -> Result> { + let cluster = self.options.cluster.clone().unwrap_or_default(); + if cluster.is_empty() { + return Ok(Vec::new()); + } + + let query = format!( + "SELECT DISTINCT hostName() AS host FROM clusterAllReplicas('{}', system.one) ORDER BY host", + cluster + ); + + let columns = self.execute(&query).await?; + let mut hosts = Vec::new(); + for i in 0..columns.row_count() { + if let Ok(host) = columns.get::(i, "host") { + hosts.push(host); + } + } + + Ok(hosts) + } + + pub fn get_host_filter_clause(&self, selected_host: Option<&String>) -> String { + if let Some(host) = selected_host + && !host.is_empty() + && self.options.cluster.is_some() + { + return format!("AND hostName() = '{}'", host.replace('\'', "''")); + } + String::new() + } + pub fn get_table_name(&self, database: &str, table: &str) -> String { let cluster = self.options.cluster.clone().unwrap_or_default(); let history = self.options.history; diff --git a/src/interpreter/context.rs b/src/interpreter/context.rs index 64cb5e8..b2e996f 100644 --- a/src/interpreter/context.rs +++ b/src/interpreter/context.rs @@ -1,5 +1,8 @@ use crate::actions::ActionDescription; -use crate::interpreter::{ClickHouse, Worker, options::ChDigOptions}; +use crate::interpreter::{ + ClickHouse, Worker, + options::{ChDigOptions, ChDigViews}, +}; use anyhow::Result; use chrono::Duration; use cursive::{Cursive, View, event::Event, event::EventResult, views::Dialog, views::OnEventView}; @@ -40,6 +43,9 @@ pub struct Context { pub view_registry: crate::view::ViewRegistry, pub search_history: crate::view::search_history::SearchHistory, + + pub selected_host: Option, + pub current_view: Option, } impl Context { @@ -71,6 +77,8 @@ impl Context { pending_view_callback: None, view_registry, search_history: crate::view::search_history::SearchHistory::new(), + selected_host: None, + current_view: None, })); context.lock().unwrap().worker.start(context.clone()); @@ -127,6 +135,10 @@ impl Context { self.add_view(name, move |siv| { let context = siv.user_data::().unwrap().clone(); let provider = context.lock().unwrap().view_registry.get(name); + { + let mut ctx = context.lock().unwrap(); + ctx.current_view = Some(provider.view_type()); + } provider.show(siv, context.clone()); }); } diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index ef9cb94..708943a 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -330,10 +330,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) .pastila_clickhouse_host .clone(); let pastila_url = context.lock().unwrap().options.service.pastila_url.clone(); + let selected_host = context.lock().unwrap().selected_host.clone(); match event { Event::ProcessList(filter, limit) => { - let block = clickhouse.get_processlist(filter, limit).await?; + let block = clickhouse + .get_processlist(filter, limit, selected_host.as_ref()) + .await?; cb_sink .send(Box::new(move |siv: &mut cursive::Cursive| { siv.call_on_name_or_render_error( @@ -347,7 +350,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } Event::SlowQueryLog(filter, start, end, limit) => { let block = clickhouse - .get_slow_query_log(&filter, start, end, limit) + .get_slow_query_log(&filter, start, end, limit, selected_host.as_ref()) .await?; cb_sink .send(Box::new(move |siv: &mut cursive::Cursive| { @@ -362,7 +365,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } Event::LastQueryLog(filter, start, end, limit) => { let block = clickhouse - .get_last_query_log(&filter, start, end, limit) + .get_last_query_log(&filter, start, end, limit, selected_host.as_ref()) .await?; cb_sink .send(Box::new(move |siv: &mut cursive::Cursive| { @@ -390,7 +393,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } Event::ServerFlameGraph(tui, trace_type, start, end) => { let flamegraph_block = clickhouse - .get_flamegraph(trace_type, None, Some(start), Some(end)) + .get_flamegraph( + trace_type, + None, + Some(start), + Some(end), + selected_host.as_ref(), + ) .await?; render_flamegraph( tui, @@ -404,7 +413,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } Event::QueryFlameGraph(trace_type, tui, start, end, query_ids) => { let flamegraph_block = clickhouse - .get_flamegraph(trace_type, Some(&query_ids), Some(start), end) + .get_flamegraph( + trace_type, + Some(&query_ids), + Some(start), + end, + selected_host.as_ref(), + ) .await?; render_flamegraph( tui, @@ -417,7 +432,9 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) *need_clear = true; } Event::LiveQueryFlameGraph(tui, query_ids) => { - let flamegraph_block = clickhouse.get_live_query_flamegraph(&query_ids).await?; + let flamegraph_block = clickhouse + .get_live_query_flamegraph(&query_ids, selected_host.as_ref()) + .await?; render_flamegraph( tui, cb_sink, @@ -567,7 +584,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) .map_err(|_| anyhow!("Cannot send message to UI"))?; } Event::Summary => { - let block = clickhouse.get_summary().await; + let block = clickhouse.get_summary(selected_host.as_ref()).await; match block { Err(err) => { let message = err.to_string(); @@ -615,6 +632,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) table.clone(), start.clone(), end.clone(), + selected_host.as_ref(), ) .await?; diff --git a/src/view/navigation.rs b/src/view/navigation.rs index 7f52de1..f71bdfe 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -51,6 +51,7 @@ pub trait Navigation { fn show_actions(&mut self); fn show_fuzzy_actions(&mut self); fn show_server_flamegraph(&mut self, tui: bool, trace_type: Option); + fn show_host_filter_dialog(&mut self); fn drop_main_view(&mut self); fn set_main_view(&mut self, view: V); @@ -248,6 +249,10 @@ impl Navigation for Cursive { context.add_global_action(self, "Show actions", Key::F8, |siv| siv.show_actions()); context.add_global_action(self, "Fuzzy actions", Event::CtrlChar('p'), |siv| siv.show_fuzzy_actions()); + if context.options.clickhouse.cluster.is_some() { + context.add_global_action(self, "Filter by host", Event::CtrlChar('h'), |siv| siv.show_host_filter_dialog()); + } + context.add_global_action(self, "Server CPU Flamegraph", 'F', |siv| siv.show_server_flamegraph(true, Some(TraceType::CPU))); context.add_global_action_without_shortcut(self, "Server Real Flamegraph", |siv| siv.show_server_flamegraph(true, Some(TraceType::Real))); context.add_global_action_without_shortcut(self, "Server Memory Flamegraph", |siv| siv.show_server_flamegraph(true, Some(TraceType::Memory))); @@ -580,6 +585,101 @@ impl Navigation for Cursive { } } + fn show_host_filter_dialog(&mut self) { + let context_arc = self.user_data::().unwrap().clone(); + let context = context_arc.lock().unwrap(); + + let cluster = context.options.clickhouse.cluster.clone(); + if cluster.is_none() { + drop(context); + self.add_layer(Dialog::info( + "Cluster mode is not enabled. Use --cluster option.", + )); + return; + } + + let clickhouse = context.clickhouse.clone(); + let cb_sink = context.cb_sink.clone(); + drop(context); + + std::thread::spawn(move || { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let hosts = runtime.block_on(async { clickhouse.get_cluster_hosts().await }); + + cb_sink + .send(Box::new(move |siv: &mut Cursive| match hosts { + Ok(hosts) if !hosts.is_empty() => { + let mut select = SelectView::new().autojump(); + + select.add_item("", String::new()); + for host in hosts { + let host_clone = host.clone(); + select.add_item(host, host_clone); + } + + let context_arc = siv.user_data::().unwrap().clone(); + select.set_on_submit(move |siv, selected_host: &String| { + let current_view = { + let mut context = context_arc.lock().unwrap(); + + if selected_host.is_empty() { + context.selected_host = None; + log::info!("Reset host filter"); + siv.set_statusbar_content(""); + } else { + context.selected_host = Some(selected_host.clone()); + log::info!("Set host filter to: {}", selected_host); + let status_msg = format!("Host filter: {}", selected_host); + siv.set_statusbar_content(status_msg); + } + + // Get current view name to re-open it + context + .current_view + .or(context.options.start_view) + .unwrap_or(ChDigViews::Queries) + }; + + siv.pop_layer(); + + // Re-open the current view to rebuild with correct columns + log::info!("Reopen {:?} view", current_view); + + let provider = context_arc + .lock() + .unwrap() + .view_registry + .get_by_view_type(current_view); + + siv.drop_main_view(); + provider.show(siv, context_arc.clone()); + + context_arc.lock().unwrap().trigger_view_refresh(); + }); + + let dialog = Dialog::around(select).title("Filter by host").button( + "Cancel", + |siv| { + siv.pop_layer(); + }, + ); + + siv.add_layer(dialog); + } + Ok(_) => { + siv.add_layer(Dialog::info("No hosts found in cluster")); + } + Err(err) => { + siv.add_layer(Dialog::info(format!( + "Failed to fetch cluster hosts: {}", + err + ))); + } + })) + .unwrap(); + }); + } + fn drop_main_view(&mut self) { while self.screen_mut().len() > 1 { self.pop_layer(); diff --git a/src/view/providers/asynchronous_inserts.rs b/src/view/providers/asynchronous_inserts.rs index c86e711..510e3a9 100644 --- a/src/view/providers/asynchronous_inserts.rs +++ b/src/view/providers/asynchronous_inserts.rs @@ -30,18 +30,28 @@ fn build_query( filters: &super::TableFilterParams, is_dialog: bool, ) -> String { - let limit = context.lock().unwrap().options.clickhouse.limit; + let (limit, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.clickhouse.limit, + ctx.clickhouse + .get_table_name("system", "asynchronous_inserts"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "asynchronous_inserts"); + let mut where_clauses = filters.build_where_clauses(); + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } - let where_clause = if filters.build_where_clauses().is_empty() { + let where_clause = if where_clauses.is_empty() { String::new() } else { - format!("WHERE {}", filters.build_where_clauses().join(" AND ")) + format!("WHERE {}", where_clauses.join(" AND ")) }; let select_clause = if is_dialog { diff --git a/src/view/providers/background_schedule_pool.rs b/src/view/providers/background_schedule_pool.rs index e595d0a..71612a2 100644 --- a/src/view/providers/background_schedule_pool.rs +++ b/src/view/providers/background_schedule_pool.rs @@ -40,24 +40,37 @@ impl ViewProvider for BackgroundSchedulePoolViewProvider { "delayed", ]; - let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some(); - let columns_to_compare = if cluster { + let (cluster, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.clickhouse.cluster.is_some(), + ctx.clickhouse + .get_table_name("system", "background_schedule_pool"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; + + // Only show hostname column when in cluster mode AND no host filter is active + let columns_to_compare = if cluster && selected_host.is_none() { columns.insert(0, "hostName() host"); vec!["host", "pool", "database", "table", "log_name"] } else { vec!["pool", "database", "table", "log_name"] }; - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "background_schedule_pool"); + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + let where_clause = if host_filter.is_empty() { + String::new() + } else { + format!("WHERE 1 {}", host_filter) + }; let query = format!( - "SELECT {} FROM {} ORDER BY pool, database, table, log_name", + "SELECT {} FROM {} {} ORDER BY pool, database, table, log_name", columns.join(", "), dbtable, + where_clause, ); siv.drop_main_view(); diff --git a/src/view/providers/background_schedule_pool_log.rs b/src/view/providers/background_schedule_pool_log.rs index 28514df..b4ce490 100644 --- a/src/view/providers/background_schedule_pool_log.rs +++ b/src/view/providers/background_schedule_pool_log.rs @@ -96,14 +96,17 @@ impl FilterParams { } fn build_query(context: &ContextArc, filters: &FilterParams) -> String { - let view_options = context.lock().unwrap().options.view.clone(); - let limit = context.lock().unwrap().options.clickhouse.limit; - - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "background_schedule_pool_log"); + let (view_options, limit, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.view.clone(), + ctx.options.clickhouse.limit, + ctx.clickhouse + .get_table_name("system", "background_schedule_pool_log"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; let start_sql = view_options .start @@ -114,7 +117,12 @@ fn build_query(context: &ContextArc, filters: &FilterParams) -> String { .to_sql_datetime_64() .unwrap_or_else(|| "now()".to_string()); - let where_clauses = filters.build_where_clauses(); + let mut where_clauses = filters.build_where_clauses(); + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } format!( r#" diff --git a/src/view/providers/logger_names.rs b/src/view/providers/logger_names.rs index 681b6cc..9182bbc 100644 --- a/src/view/providers/logger_names.rs +++ b/src/view/providers/logger_names.rs @@ -26,11 +26,17 @@ impl ViewProvider for LoggerNamesViewProvider { return; } - let view_options = context.lock().unwrap().options.view.clone(); + let (view_options, cluster, selected_host_check) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.view.clone(), + ctx.options.clickhouse.cluster.is_some(), + ctx.selected_host.clone(), + ) + }; let start = DateTime::::from(view_options.start); let end = view_options.end; - let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some(); let mut columns = vec![ "logger_name::String logger_name", "count() count", @@ -43,7 +49,9 @@ impl ViewProvider for LoggerNamesViewProvider { "countIf(level = 'Debug') debug", "countIf(level = 'Trace') trace", ]; - let columns_to_compare = if cluster { + + // Only show hostname column when in cluster mode AND no host filter is active + let columns_to_compare = if cluster && selected_host_check.is_none() { columns.insert(0, "hostName() host"); vec!["host", "logger_name"] } else { @@ -87,11 +95,15 @@ impl ViewProvider for LoggerNamesViewProvider { }; // Build the query with time filtering - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "text_log"); + let (dbtable, clickhouse, selected_host, limit) = { + let ctx = context.lock().unwrap(); + ( + ctx.clickhouse.get_table_name("system", "text_log"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ctx.options.clickhouse.limit, + ) + }; let start_nanos = start .timestamp_nanos_opt() @@ -99,6 +111,13 @@ impl ViewProvider for LoggerNamesViewProvider { .unwrap(); let end_datetime = end.to_sql_datetime_64().unwrap_or_default(); + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + let host_where = if host_filter.is_empty() { + String::new() + } else { + format!("\n {}", host_filter) + }; + let query = format!( r#" WITH @@ -108,7 +127,7 @@ impl ViewProvider for LoggerNamesViewProvider { FROM {} WHERE event_date >= toDate(start_time_) AND event_time >= toDateTime(start_time_) AND event_time_microseconds > start_time_ - AND event_date <= toDate(end_time_) AND event_time <= toDateTime(end_time_) AND event_time_microseconds <= end_time_ + AND event_date <= toDate(end_time_) AND event_time <= toDateTime(end_time_) AND event_time_microseconds <= end_time_{} GROUP BY {} ORDER BY count DESC LIMIT {} @@ -117,12 +136,13 @@ impl ViewProvider for LoggerNamesViewProvider { end_datetime, columns.join(", "), dbtable, + host_where, if cluster { "host, logger_name" } else { "logger_name" }, - context.lock().unwrap().options.clickhouse.limit, + limit, ); siv.drop_main_view(); diff --git a/src/view/providers/merges.rs b/src/view/providers/merges.rs index 4448260..d1f0010 100644 --- a/src/view/providers/merges.rs +++ b/src/view/providers/merges.rs @@ -65,18 +65,22 @@ fn build_query( is_dialog: bool, ) -> String { let columns = get_columns(is_dialog); - let where_clauses = filters.build_where_clauses(); - - let tables_dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "tables"); - let merges_dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "merges"); + let mut where_clauses = filters.build_where_clauses(); + + let (tables_dbtable, merges_dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.clickhouse.get_table_name("system", "tables"), + ctx.clickhouse.get_table_name("system", "merges"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } let where_clause = if where_clauses.is_empty() { String::new() diff --git a/src/view/providers/mod.rs b/src/view/providers/mod.rs index 6f65f1a..5cd6395 100644 --- a/src/view/providers/mod.rs +++ b/src/view/providers/mod.rs @@ -319,15 +319,17 @@ pub fn render_from_clickhouse_query( return; } - let cluster = params - .context - .lock() - .unwrap() - .options - .clickhouse - .cluster - .is_some(); - if cluster { + let (cluster, selected_host, clickhouse) = { + let ctx = params.context.lock().unwrap(); + ( + ctx.options.clickhouse.cluster.is_some(), + ctx.selected_host.clone(), + ctx.clickhouse.clone(), + ) + }; + + // Only show hostname column when in cluster mode AND no host filter is active + if cluster && selected_host.is_none() { params.columns.insert(0, "hostName() host"); // Add "host" to the beginning of columns to compare params.columns_to_compare.insert(0, "host"); @@ -353,16 +355,22 @@ pub fn render_from_clickhouse_query( ) .to_string() }; + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + let where_clause = match (params.filter, host_filter.is_empty()) { + (Some(filter), true) => format!(" WHERE {}", filter), + (Some(filter), false) => format!(" WHERE {} {}", filter, host_filter), + (None, false) => format!(" WHERE 1 {}", host_filter), + (None, true) => String::new(), + }; + let query = format!( "select {} from {} as {} {}{}{}", params.columns.join(", "), dbtable, params.table, params.join.unwrap_or_default(), - params - .filter - .map(|x| format!(" WHERE {}", x)) - .unwrap_or_default(), + where_clause, settings_str, ); diff --git a/src/view/providers/mutations.rs b/src/view/providers/mutations.rs index 9791833..c6f51fb 100644 --- a/src/view/providers/mutations.rs +++ b/src/view/providers/mutations.rs @@ -59,11 +59,19 @@ fn build_query( let mut where_clauses = vec!["is_done = 0".to_string()]; where_clauses.extend(filters.build_where_clauses()); - let mutations_dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "mutations"); + let (mutations_dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.clickhouse.get_table_name("system", "mutations"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } format!( "select {} from {} as mutations WHERE {}", diff --git a/src/view/providers/part_log.rs b/src/view/providers/part_log.rs index acf919e..008fa96 100644 --- a/src/view/providers/part_log.rs +++ b/src/view/providers/part_log.rs @@ -91,14 +91,16 @@ impl FilterParams { } fn build_query(context: &ContextArc, filters: &FilterParams, is_dialog: bool) -> String { - let view_options = context.lock().unwrap().options.view.clone(); - let limit = context.lock().unwrap().options.clickhouse.limit; - - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "part_log"); + let (view_options, limit, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.view.clone(), + ctx.options.clickhouse.limit, + ctx.clickhouse.get_table_name("system", "part_log"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; let start_sql = view_options .start @@ -109,7 +111,12 @@ fn build_query(context: &ContextArc, filters: &FilterParams, is_dialog: bool) -> .to_sql_datetime_64() .unwrap_or_else(|| "now()".to_string()); - let where_clauses = filters.build_where_clauses(); + let mut where_clauses = filters.build_where_clauses(); + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } let select_clause = if is_dialog { r#"event_time, diff --git a/src/view/providers/replicas.rs b/src/view/providers/replicas.rs index 887c242..8b9f05a 100644 --- a/src/view/providers/replicas.rs +++ b/src/view/providers/replicas.rs @@ -45,24 +45,36 @@ impl ViewProvider for ReplicasViewProvider { columns.push("uuid::String _uuid"); } - let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some(); - let columns_to_compare = if cluster { + let (cluster, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.clickhouse.cluster.is_some(), + ctx.clickhouse.get_table_name("system", "replicas"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; + + // Only show hostname column when in cluster mode AND no host filter is active + let columns_to_compare = if cluster && selected_host.is_none() { columns.insert(0, "hostName() host"); vec!["host", "database", "table"] } else { vec!["database", "table"] }; - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "replicas"); + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + let where_clause = if host_filter.is_empty() { + String::new() + } else { + format!("WHERE 1 {}", host_filter) + }; let query = format!( - "SELECT DISTINCT ON (database, table, zookeeper_path) {} FROM {} ORDER BY queue_size DESC, database, table", + "SELECT DISTINCT ON (database, table, zookeeper_path) {} FROM {} {} ORDER BY queue_size DESC, database, table", columns.join(", "), dbtable, + where_clause, ); siv.drop_main_view(); diff --git a/src/view/providers/server_logs.rs b/src/view/providers/server_logs.rs index eed47b7..6abc8c5 100644 --- a/src/view/providers/server_logs.rs +++ b/src/view/providers/server_logs.rs @@ -25,7 +25,10 @@ impl ViewProvider for ServerLogsViewProvider { return; } - let view_options = context.clone().lock().unwrap().options.view.clone(); + let (view_options, selected_host) = { + let ctx = context.lock().unwrap(); + (ctx.options.view.clone(), ctx.selected_host.clone()) + }; siv.drop_main_view(); siv.set_main_view( @@ -39,7 +42,7 @@ impl ViewProvider for ServerLogsViewProvider { crate::interpreter::TextLogArguments { query_ids: None, logger_names: None, - hostname: None, + hostname: selected_host, message_filter: None, max_level: None, start: DateTime::::from(view_options.start), diff --git a/src/view/providers/table_parts.rs b/src/view/providers/table_parts.rs index 5fa5e1c..090a8fc 100644 --- a/src/view/providers/table_parts.rs +++ b/src/view/providers/table_parts.rs @@ -30,18 +30,27 @@ fn build_query( filters: &super::TableFilterParams, is_dialog: bool, ) -> String { - let limit = context.lock().unwrap().options.clickhouse.limit; + let (limit, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.clickhouse.limit, + ctx.clickhouse.get_table_name("system", "parts"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "parts"); + let mut where_clauses = filters.build_where_clauses(); + + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + if !host_filter.is_empty() { + where_clauses.push(format!("1 {}", host_filter)); + } - let where_clause = if filters.build_where_clauses().is_empty() { + let where_clause = if where_clauses.is_empty() { String::new() } else { - format!("WHERE {}", filters.build_where_clauses().join(" AND ")) + format!("WHERE {}", where_clauses.join(" AND ")) }; let select_clause = if is_dialog { diff --git a/src/view/providers/tables.rs b/src/view/providers/tables.rs index 0b15dc3..3c6864f 100644 --- a/src/view/providers/tables.rs +++ b/src/view/providers/tables.rs @@ -37,29 +37,37 @@ impl ViewProvider for TablesViewProvider { "assumeNotNull(total_rows) total_rows", ]; - let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some(); - let columns_to_compare = if cluster { + let (cluster, has_background_schedule_pool, dbtable, clickhouse, selected_host) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.clickhouse.cluster.is_some(), + ctx.clickhouse + .quirks + .has(ClickHouseAvailableQuirks::SystemBackgroundSchedulePool), + ctx.clickhouse.get_table_name("system", "tables"), + ctx.clickhouse.clone(), + ctx.selected_host.clone(), + ) + }; + + // Only show hostname column when in cluster mode AND no host filter is active + let columns_to_compare = if cluster && selected_host.is_none() { columns.insert(0, "hostName() host"); vec!["host", "database", "table"] } else { vec!["database", "table"] }; - let has_background_schedule_pool = context - .lock() - .unwrap() - .clickhouse - .quirks - .has(ClickHouseAvailableQuirks::SystemBackgroundSchedulePool); if has_background_schedule_pool { columns.push("tasks"); } - let dbtable = context - .lock() - .unwrap() - .clickhouse - .get_table_name("system", "tables"); + let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref()); + let host_where = if host_filter.is_empty() { + String::new() + } else { + format!("AND 1 {}", host_filter) + }; let query = if has_background_schedule_pool { format!( @@ -70,10 +78,12 @@ impl ViewProvider for TablesViewProvider { WHERE engine NOT LIKE 'System%' AND tables.database NOT IN ('INFORMATION_SCHEMA', 'information_schema') + {} ORDER BY database, table, total_bytes DESC "#, columns.join(", "), dbtable, + host_where, ) } else { format!( @@ -83,10 +93,12 @@ impl ViewProvider for TablesViewProvider { WHERE engine NOT LIKE 'System%' AND database NOT IN ('INFORMATION_SCHEMA', 'information_schema') + {} ORDER BY database, table, total_bytes DESC "#, columns.join(", "), dbtable, + host_where, ) }; diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index d76c4ba..2a5c56f 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -993,7 +993,13 @@ impl QueriesView { if !view_options.no_subqueries { table.insert_column(0, QueriesColumn::SubQueries, "Q#", |c| c.width_min_max(2, 5)); } - if context.lock().unwrap().options.clickhouse.cluster.is_some() { + + // Only show hostname column when in cluster mode AND no host filter is active + let (cluster, selected_host) = { + let ctx = context.lock().unwrap(); + (ctx.options.clickhouse.cluster.is_some(), ctx.selected_host.clone()) + }; + if cluster && selected_host.is_none() { table.insert_column(0, QueriesColumn::HostName, "host", |c| c.width_min_max(4, 16)); } diff --git a/src/view/text_log_view.rs b/src/view/text_log_view.rs index 9536291..549ffc1 100644 --- a/src/view/text_log_view.rs +++ b/src/view/text_log_view.rs @@ -43,9 +43,12 @@ impl TextLogView { let (delay, is_cluster, wrap, no_strip_hostname_suffix) = { let ctx = context.lock().unwrap(); + // Only show hostname in logs when in cluster mode AND no host filter is active + let show_hostname = + ctx.options.clickhouse.cluster.is_some() && ctx.selected_host.is_none(); ( ctx.options.view.delay_interval, - ctx.options.clickhouse.cluster.is_some(), + show_hostname, ctx.options.view.wrap, ctx.options.view.no_strip_hostname_suffix, )