Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 137 additions & 26 deletions src/interpreter/clickhouse.rs

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion src/interpreter/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::actions::ActionDescription;
use crate::interpreter::{ClickHouse, Worker, options::ChDigOptions};
use crate::interpreter::{
ClickHouse, Worker,
options::{ChDigOptions, ChDigViews},
};
use anyhow::Result;
use chrono::Duration;
use cursive::{Cursive, View, event::Event, event::EventResult, views::Dialog, views::OnEventView};
Expand Down Expand Up @@ -40,6 +43,9 @@ pub struct Context {
pub view_registry: crate::view::ViewRegistry,

pub search_history: crate::view::search_history::SearchHistory,

pub selected_host: Option<String>,
pub current_view: Option<ChDigViews>,
}

impl Context {
Expand Down Expand Up @@ -71,6 +77,8 @@ impl Context {
pending_view_callback: None,
view_registry,
search_history: crate::view::search_history::SearchHistory::new(),
selected_host: None,
current_view: None,
}));

context.lock().unwrap().worker.start(context.clone());
Expand Down Expand Up @@ -127,6 +135,10 @@ impl Context {
self.add_view(name, move |siv| {
let context = siv.user_data::<ContextArc>().unwrap().clone();
let provider = context.lock().unwrap().view_registry.get(name);
{
let mut ctx = context.lock().unwrap();
ctx.current_view = Some(provider.view_type());
}
provider.show(siv, context.clone());
});
}
Expand Down
32 changes: 25 additions & 7 deletions src/interpreter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
.pastila_clickhouse_host
.clone();
let pastila_url = context.lock().unwrap().options.service.pastila_url.clone();
let selected_host = context.lock().unwrap().selected_host.clone();

match event {
Event::ProcessList(filter, limit) => {
let block = clickhouse.get_processlist(filter, limit).await?;
let block = clickhouse
.get_processlist(filter, limit, selected_host.as_ref())
.await?;
cb_sink
.send(Box::new(move |siv: &mut cursive::Cursive| {
siv.call_on_name_or_render_error(
Expand All @@ -347,7 +350,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
}
Event::SlowQueryLog(filter, start, end, limit) => {
let block = clickhouse
.get_slow_query_log(&filter, start, end, limit)
.get_slow_query_log(&filter, start, end, limit, selected_host.as_ref())
.await?;
cb_sink
.send(Box::new(move |siv: &mut cursive::Cursive| {
Expand All @@ -362,7 +365,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
}
Event::LastQueryLog(filter, start, end, limit) => {
let block = clickhouse
.get_last_query_log(&filter, start, end, limit)
.get_last_query_log(&filter, start, end, limit, selected_host.as_ref())
.await?;
cb_sink
.send(Box::new(move |siv: &mut cursive::Cursive| {
Expand Down Expand Up @@ -390,7 +393,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
}
Event::ServerFlameGraph(tui, trace_type, start, end) => {
let flamegraph_block = clickhouse
.get_flamegraph(trace_type, None, Some(start), Some(end))
.get_flamegraph(
trace_type,
None,
Some(start),
Some(end),
selected_host.as_ref(),
)
.await?;
render_flamegraph(
tui,
Expand All @@ -404,7 +413,13 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
}
Event::QueryFlameGraph(trace_type, tui, start, end, query_ids) => {
let flamegraph_block = clickhouse
.get_flamegraph(trace_type, Some(&query_ids), Some(start), end)
.get_flamegraph(
trace_type,
Some(&query_ids),
Some(start),
end,
selected_host.as_ref(),
)
.await?;
render_flamegraph(
tui,
Expand All @@ -417,7 +432,9 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
*need_clear = true;
}
Event::LiveQueryFlameGraph(tui, query_ids) => {
let flamegraph_block = clickhouse.get_live_query_flamegraph(&query_ids).await?;
let flamegraph_block = clickhouse
.get_live_query_flamegraph(&query_ids, selected_host.as_ref())
.await?;
render_flamegraph(
tui,
cb_sink,
Expand Down Expand Up @@ -567,7 +584,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
.map_err(|_| anyhow!("Cannot send message to UI"))?;
}
Event::Summary => {
let block = clickhouse.get_summary().await;
let block = clickhouse.get_summary(selected_host.as_ref()).await;
match block {
Err(err) => {
let message = err.to_string();
Expand Down Expand Up @@ -615,6 +632,7 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
table.clone(),
start.clone(),
end.clone(),
selected_host.as_ref(),
)
.await?;

Expand Down
100 changes: 100 additions & 0 deletions src/view/navigation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub trait Navigation {
fn show_actions(&mut self);
fn show_fuzzy_actions(&mut self);
fn show_server_flamegraph(&mut self, tui: bool, trace_type: Option<TraceType>);
fn show_host_filter_dialog(&mut self);

fn drop_main_view(&mut self);
fn set_main_view<V: IntoBoxedView + 'static>(&mut self, view: V);
Expand Down Expand Up @@ -248,6 +249,10 @@ impl Navigation for Cursive {
context.add_global_action(self, "Show actions", Key::F8, |siv| siv.show_actions());
context.add_global_action(self, "Fuzzy actions", Event::CtrlChar('p'), |siv| siv.show_fuzzy_actions());

if context.options.clickhouse.cluster.is_some() {
context.add_global_action(self, "Filter by host", Event::CtrlChar('h'), |siv| siv.show_host_filter_dialog());
}

context.add_global_action(self, "Server CPU Flamegraph", 'F', |siv| siv.show_server_flamegraph(true, Some(TraceType::CPU)));
context.add_global_action_without_shortcut(self, "Server Real Flamegraph", |siv| siv.show_server_flamegraph(true, Some(TraceType::Real)));
context.add_global_action_without_shortcut(self, "Server Memory Flamegraph", |siv| siv.show_server_flamegraph(true, Some(TraceType::Memory)));
Expand Down Expand Up @@ -580,6 +585,101 @@ impl Navigation for Cursive {
}
}

fn show_host_filter_dialog(&mut self) {
let context_arc = self.user_data::<ContextArc>().unwrap().clone();
let context = context_arc.lock().unwrap();

let cluster = context.options.clickhouse.cluster.clone();
if cluster.is_none() {
drop(context);
self.add_layer(Dialog::info(
"Cluster mode is not enabled. Use --cluster option.",
));
return;
}

let clickhouse = context.clickhouse.clone();
let cb_sink = context.cb_sink.clone();
drop(context);

std::thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let hosts = runtime.block_on(async { clickhouse.get_cluster_hosts().await });

cb_sink
.send(Box::new(move |siv: &mut Cursive| match hosts {
Ok(hosts) if !hosts.is_empty() => {
let mut select = SelectView::new().autojump();

select.add_item("<All hosts (reset filter)>", String::new());
for host in hosts {
let host_clone = host.clone();
select.add_item(host, host_clone);
}

let context_arc = siv.user_data::<ContextArc>().unwrap().clone();
select.set_on_submit(move |siv, selected_host: &String| {
let current_view = {
let mut context = context_arc.lock().unwrap();

if selected_host.is_empty() {
context.selected_host = None;
log::info!("Reset host filter");
siv.set_statusbar_content("");
} else {
context.selected_host = Some(selected_host.clone());
log::info!("Set host filter to: {}", selected_host);
let status_msg = format!("Host filter: {}", selected_host);
siv.set_statusbar_content(status_msg);
}

// Get current view name to re-open it
context
.current_view
.or(context.options.start_view)
.unwrap_or(ChDigViews::Queries)
};

siv.pop_layer();

// Re-open the current view to rebuild with correct columns
log::info!("Reopen {:?} view", current_view);

let provider = context_arc
.lock()
.unwrap()
.view_registry
.get_by_view_type(current_view);

siv.drop_main_view();
provider.show(siv, context_arc.clone());

context_arc.lock().unwrap().trigger_view_refresh();
});

let dialog = Dialog::around(select).title("Filter by host").button(
"Cancel",
|siv| {
siv.pop_layer();
},
);

siv.add_layer(dialog);
}
Ok(_) => {
siv.add_layer(Dialog::info("No hosts found in cluster"));
}
Err(err) => {
siv.add_layer(Dialog::info(format!(
"Failed to fetch cluster hosts: {}",
err
)));
}
}))
.unwrap();
});
}

fn drop_main_view(&mut self) {
while self.screen_mut().len() > 1 {
self.pop_layer();
Expand Down
26 changes: 18 additions & 8 deletions src/view/providers/asynchronous_inserts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,28 @@ fn build_query(
filters: &super::TableFilterParams,
is_dialog: bool,
) -> String {
let limit = context.lock().unwrap().options.clickhouse.limit;
let (limit, dbtable, clickhouse, selected_host) = {
let ctx = context.lock().unwrap();
(
ctx.options.clickhouse.limit,
ctx.clickhouse
.get_table_name("system", "asynchronous_inserts"),
ctx.clickhouse.clone(),
ctx.selected_host.clone(),
)
};

let dbtable = context
.lock()
.unwrap()
.clickhouse
.get_table_name("system", "asynchronous_inserts");
let mut where_clauses = filters.build_where_clauses();

let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref());
if !host_filter.is_empty() {
where_clauses.push(format!("1 {}", host_filter));
}

let where_clause = if filters.build_where_clauses().is_empty() {
let where_clause = if where_clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", filters.build_where_clauses().join(" AND "))
format!("WHERE {}", where_clauses.join(" AND "))
};

let select_clause = if is_dialog {
Expand Down
29 changes: 21 additions & 8 deletions src/view/providers/background_schedule_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,37 @@ impl ViewProvider for BackgroundSchedulePoolViewProvider {
"delayed",
];

let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some();
let columns_to_compare = if cluster {
let (cluster, dbtable, clickhouse, selected_host) = {
let ctx = context.lock().unwrap();
(
ctx.options.clickhouse.cluster.is_some(),
ctx.clickhouse
.get_table_name("system", "background_schedule_pool"),
ctx.clickhouse.clone(),
ctx.selected_host.clone(),
)
};

// Only show hostname column when in cluster mode AND no host filter is active
let columns_to_compare = if cluster && selected_host.is_none() {
columns.insert(0, "hostName() host");
vec!["host", "pool", "database", "table", "log_name"]
} else {
vec!["pool", "database", "table", "log_name"]
};

let dbtable = context
.lock()
.unwrap()
.clickhouse
.get_table_name("system", "background_schedule_pool");
let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref());
let where_clause = if host_filter.is_empty() {
String::new()
} else {
format!("WHERE 1 {}", host_filter)
};

let query = format!(
"SELECT {} FROM {} ORDER BY pool, database, table, log_name",
"SELECT {} FROM {} {} ORDER BY pool, database, table, log_name",
columns.join(", "),
dbtable,
where_clause,
);

siv.drop_main_view();
Expand Down
26 changes: 17 additions & 9 deletions src/view/providers/background_schedule_pool_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,17 @@ impl FilterParams {
}

fn build_query(context: &ContextArc, filters: &FilterParams) -> String {
let view_options = context.lock().unwrap().options.view.clone();
let limit = context.lock().unwrap().options.clickhouse.limit;

let dbtable = context
.lock()
.unwrap()
.clickhouse
.get_table_name("system", "background_schedule_pool_log");
let (view_options, limit, dbtable, clickhouse, selected_host) = {
let ctx = context.lock().unwrap();
(
ctx.options.view.clone(),
ctx.options.clickhouse.limit,
ctx.clickhouse
.get_table_name("system", "background_schedule_pool_log"),
ctx.clickhouse.clone(),
ctx.selected_host.clone(),
)
};

let start_sql = view_options
.start
Expand All @@ -114,7 +117,12 @@ fn build_query(context: &ContextArc, filters: &FilterParams) -> String {
.to_sql_datetime_64()
.unwrap_or_else(|| "now()".to_string());

let where_clauses = filters.build_where_clauses();
let mut where_clauses = filters.build_where_clauses();

let host_filter = clickhouse.get_host_filter_clause(selected_host.as_ref());
if !host_filter.is_empty() {
where_clauses.push(format!("1 {}", host_filter));
}

format!(
r#"
Expand Down
Loading