Skip to content

Commit e3dc317

Browse files
authored
feat: add infra diagnostics tool to the Moose Dev MCP (#2915)
<!-- CURSOR_SUMMARY --> > [!NOTE] > Adds a new MCP tool `diagnose_infrastructure` with multiple ClickHouse diagnostics and wires it into the server alongside tests and docs updates. > > - **MCP Server**: > - Registers new tool `diagnose_infrastructure` in `list_tools` and routes in `call_tool`. > - Updates server instructions to mention diagnostics and source browsing. > - Updates tests to account for 6 tools and validates tool names. > - **New Tool: `infra_issues` (ClickHouse diagnostics)**: > - Core module `infra_issues/mod.rs` with params parsing, severity filtering, Redis-backed infra map usage, and aggregated JSON output with summaries. > - Diagnostic providers: > - `MutationDiagnostic` (`system.mutations`): stuck/failed mutations. > - `PartsDiagnostic` (`system.parts`): excessive parts per partition. > - `MergeDiagnostic` (`system.merges`): long-running merges. > - `ErrorStatsDiagnostic` (`system.errors`): top recurring errors (system-wide). > - `S3QueueDiagnostic` (`system.s3queue_log`): ingestion failures (S3Queue tables). > - `ReplicationDiagnostic` (`system.replication_queue`/`system.replicas`): lag and health issues (replicated tables). > - `MergeFailureDiagnostic` (`system.metrics`): background merge failures (system-wide). > - `StoppedOperationsDiagnostic`: merges/replication possibly stopped. > - **Tool Registry**: > - Exposes `infra_issues` in `tools/mod.rs`. > - **Misc**: > - `get_source.rs`: remove unused `SourceLocationNotAvailable` error variant. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 7c40c36. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 1bce01c commit e3dc317

File tree

12 files changed

+2172
-11
lines changed

12 files changed

+2172
-11
lines changed

apps/framework-cli/src/mcp/server.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use rmcp::{
1414
use std::sync::Arc;
1515

1616
use super::embedded_docs;
17-
use super::tools::{create_error_result, get_source, infra_map, logs, query_olap, sample_stream};
17+
use super::tools::{
18+
create_error_result, get_source, infra_issues, infra_map, logs, query_olap, sample_stream,
19+
};
1820
use crate::cli::processing_coordinator::ProcessingCoordinator;
1921
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
2022
use crate::infrastructure::redis::redis_client::RedisClient;
@@ -69,7 +71,7 @@ impl ServerHandler for MooseMcpHandler {
6971
website_url: None,
7072
},
7173
instructions: Some(
72-
"Moose MCP Server - Access dev server logs, infrastructure map, query the OLAP database, sample streaming topics, and browse embedded Moose documentation"
74+
"Moose MCP Server - Access dev server logs, infrastructure map, diagnose infrastructure issues, query the OLAP database, sample streaming topics, browse code sources, and read embedded Moose documentation"
7375
.to_string(),
7476
),
7577
}
@@ -84,6 +86,7 @@ impl ServerHandler for MooseMcpHandler {
8486
tools: vec![
8587
logs::tool_definition(),
8688
infra_map::tool_definition(),
89+
infra_issues::tool_definition(),
8790
query_olap::tool_definition(),
8891
sample_stream::tool_definition(),
8992
get_source::tool_definition(),
@@ -108,6 +111,12 @@ impl ServerHandler for MooseMcpHandler {
108111
self.redis_client.clone(),
109112
)
110113
.await),
114+
"diagnose_infrastructure" => Ok(infra_issues::handle_call(
115+
param.arguments.as_ref(),
116+
self.redis_client.clone(),
117+
&self.clickhouse_config,
118+
)
119+
.await),
111120
"query_olap" => Ok(query_olap::handle_call(
112121
&self.clickhouse_config,
113122
param.arguments.as_ref(),
@@ -219,19 +228,21 @@ mod tests {
219228
// Test that all expected tools are returned
220229
let logs_tool = logs::tool_definition();
221230
let infra_tool = infra_map::tool_definition();
231+
let infra_issues_tool = infra_issues::tool_definition();
222232
let olap_tool = query_olap::tool_definition();
223233
let stream_tool = sample_stream::tool_definition();
224234
let get_source_tool = get_source::tool_definition();
225235

226-
// Ensure we have 5 tools
236+
// Ensure we have 6 tools
227237
let all_tools = vec![
228238
&logs_tool,
229239
&infra_tool,
240+
&infra_issues_tool,
230241
&olap_tool,
231242
&stream_tool,
232243
&get_source_tool,
233244
];
234-
assert_eq!(all_tools.len(), 5);
245+
assert_eq!(all_tools.len(), 6);
235246

236247
// Verify each tool has required fields
237248
for tool in all_tools {
@@ -250,18 +261,21 @@ mod tests {
250261
"query_olap",
251262
"get_stream_sample",
252263
"get_source",
264+
"diagnose_infrastructure",
253265
];
254266

255267
let logs_tool = logs::tool_definition();
256268
let infra_tool = infra_map::tool_definition();
257269
let olap_tool = query_olap::tool_definition();
258270
let stream_tool = sample_stream::tool_definition();
259271
let get_source_tool = get_source::tool_definition();
272+
let infra_issues_tool = infra_issues::tool_definition();
260273

261274
assert_eq!(logs_tool.name, expected_tools[0]);
262275
assert_eq!(infra_tool.name, expected_tools[1]);
263276
assert_eq!(olap_tool.name, expected_tools[2]);
264277
assert_eq!(stream_tool.name, expected_tools[3]);
265278
assert_eq!(get_source_tool.name, expected_tools[4]);
279+
assert_eq!(infra_issues_tool.name, expected_tools[5]);
266280
}
267281
}

apps/framework-cli/src/mcp/tools/get_source.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,6 @@ pub enum GetSourceError {
2929
component_type: String,
3030
name: String,
3131
},
32-
33-
#[allow(dead_code)]
34-
#[error("Source location not available for {component_type}/{name}")]
35-
SourceLocationNotAvailable {
36-
component_type: String,
37-
name: String,
38-
},
3932
}
4033

4134
/// Parameters for the get_source tool
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
//! Diagnostic provider for checking system-wide errors
2+
3+
use log::debug;
4+
use serde_json::{json, Map, Value};
5+
6+
use super::{Component, DiagnoseError, DiagnosticProvider, Issue, Severity};
7+
use crate::infrastructure::olap::clickhouse::client::ClickHouseClient;
8+
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
9+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
10+
11+
/// Query timeout for diagnostic checks (30 seconds)
12+
const DIAGNOSTIC_QUERY_TIMEOUT_SECS: u64 = 30;
13+
14+
/// Diagnostic provider for checking system-wide errors
15+
pub struct ErrorStatsDiagnostic;
16+
17+
#[async_trait::async_trait]
18+
impl DiagnosticProvider for ErrorStatsDiagnostic {
19+
fn name(&self) -> &str {
20+
"ErrorStatsDiagnostic"
21+
}
22+
23+
fn applicable_to(&self, _component: &Component, _engine: Option<&ClickhouseEngine>) -> bool {
24+
// Error stats are system-wide, not component-specific
25+
// This should be run separately outside the component loop
26+
false
27+
}
28+
29+
fn is_system_wide(&self) -> bool {
30+
true
31+
}
32+
33+
async fn diagnose(
34+
&self,
35+
component: &Component,
36+
_engine: Option<&ClickhouseEngine>,
37+
config: &ClickHouseConfig,
38+
_since: Option<&str>,
39+
) -> Result<Vec<Issue>, DiagnoseError> {
40+
let client = ClickHouseClient::new(config)
41+
.map_err(|e| DiagnoseError::ClickHouseConnection(format!("{}", e)))?;
42+
43+
// Get recent errors with significant counts
44+
let query = "SELECT
45+
name,
46+
value,
47+
last_error_time,
48+
last_error_message
49+
FROM system.errors
50+
WHERE value > 0
51+
ORDER BY value DESC
52+
LIMIT 10
53+
FORMAT JSON";
54+
55+
debug!("Executing errors query: {}", query);
56+
57+
let result = tokio::time::timeout(
58+
std::time::Duration::from_secs(DIAGNOSTIC_QUERY_TIMEOUT_SECS),
59+
client.execute_sql(query),
60+
)
61+
.await
62+
.map_err(|_| DiagnoseError::QueryTimeout(DIAGNOSTIC_QUERY_TIMEOUT_SECS))?
63+
.map_err(|e| DiagnoseError::QueryFailed(format!("{}", e)))?;
64+
65+
let json_response: Value = serde_json::from_str(&result)
66+
.map_err(|e| DiagnoseError::ParseError(format!("{}", e)))?;
67+
68+
let data = json_response
69+
.get("data")
70+
.and_then(|v| v.as_array())
71+
.ok_or_else(|| {
72+
DiagnoseError::ParseError("Missing 'data' field in response".to_string())
73+
})?;
74+
75+
let mut issues = Vec::new();
76+
77+
for row in data {
78+
let name = row
79+
.get("name")
80+
.and_then(|v| v.as_str())
81+
.unwrap_or("UNKNOWN");
82+
let value = row.get("value").and_then(|v| v.as_u64()).unwrap_or(0);
83+
let last_error_message = row
84+
.get("last_error_message")
85+
.and_then(|v| v.as_str())
86+
.unwrap_or("");
87+
88+
// Skip if no occurrences
89+
if value == 0 {
90+
continue;
91+
}
92+
93+
let severity = if value > 100 {
94+
Severity::Error
95+
} else if value > 10 {
96+
Severity::Warning
97+
} else {
98+
Severity::Info
99+
};
100+
101+
let mut details = Map::new();
102+
details.insert("error_name".to_string(), json!(name));
103+
details.insert("occurrence_count".to_string(), json!(value));
104+
details.insert(
105+
"last_error_time".to_string(),
106+
row.get("last_error_time").cloned().unwrap_or(json!("")),
107+
);
108+
if !last_error_message.is_empty() {
109+
details.insert("last_error_message".to_string(), json!(last_error_message));
110+
}
111+
112+
issues.push(Issue {
113+
severity,
114+
source: "system.errors".to_string(),
115+
component: component.clone(),
116+
error_type: "system_error".to_string(),
117+
message: format!("Error '{}' occurred {} times. Last: {}", name, value, last_error_message),
118+
details,
119+
suggested_action: "Review error pattern and recent query logs. Check ClickHouse server logs for more details.".to_string(),
120+
related_queries: vec![
121+
"SELECT * FROM system.errors WHERE value > 0 ORDER BY value DESC".to_string(),
122+
format!("SELECT * FROM system.query_log WHERE exception LIKE '%{}%' ORDER BY event_time DESC LIMIT 10", name),
123+
],
124+
});
125+
}
126+
127+
Ok(issues)
128+
}
129+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
//! Diagnostic provider for checking merge failures from system.metrics
2+
3+
use log::debug;
4+
use serde_json::{json, Map, Value};
5+
6+
use super::{Component, DiagnoseError, DiagnosticProvider, Issue, Severity};
7+
use crate::infrastructure::olap::clickhouse::client::ClickHouseClient;
8+
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
9+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
10+
11+
/// Query timeout for diagnostic checks (30 seconds)
12+
const DIAGNOSTIC_QUERY_TIMEOUT_SECS: u64 = 30;
13+
14+
/// Diagnostic provider for checking merge failures from system.metrics
15+
pub struct MergeFailureDiagnostic;
16+
17+
#[async_trait::async_trait]
18+
impl DiagnosticProvider for MergeFailureDiagnostic {
19+
fn name(&self) -> &str {
20+
"merge_failures"
21+
}
22+
23+
fn applicable_to(&self, _component: &Component, _engine: Option<&ClickhouseEngine>) -> bool {
24+
// Merge failures from system.metrics are system-wide, not component-specific
25+
// This should be run separately outside the component loop
26+
false
27+
}
28+
29+
fn is_system_wide(&self) -> bool {
30+
true
31+
}
32+
33+
async fn diagnose(
34+
&self,
35+
component: &Component,
36+
_engine: Option<&ClickhouseEngine>,
37+
config: &ClickHouseConfig,
38+
_since: Option<&str>,
39+
) -> Result<Vec<Issue>, DiagnoseError> {
40+
let client = ClickHouseClient::new(config)
41+
.map_err(|e| DiagnoseError::ClickHouseConnection(format!("{}", e)))?;
42+
43+
let mut issues = Vec::new();
44+
45+
// Check system.metrics for background merge failures
46+
// Note: This is a system-wide metric, not per-table, but we report it per-table for context
47+
let metrics_query =
48+
"SELECT value FROM system.metrics WHERE metric = 'FailedBackgroundMerges' FORMAT JSON";
49+
50+
debug!("Executing merge failure metrics query: {}", metrics_query);
51+
52+
let result = tokio::time::timeout(
53+
std::time::Duration::from_secs(DIAGNOSTIC_QUERY_TIMEOUT_SECS),
54+
client.execute_sql(metrics_query),
55+
)
56+
.await
57+
.map_err(|_| DiagnoseError::QueryTimeout(DIAGNOSTIC_QUERY_TIMEOUT_SECS))?
58+
.map_err(|e| DiagnoseError::QueryFailed(format!("{}", e)))?;
59+
60+
let json_response: Value = serde_json::from_str(&result)
61+
.map_err(|e| DiagnoseError::ParseError(format!("{}", e)))?;
62+
63+
let failed_merges = json_response
64+
.get("data")
65+
.and_then(|v| v.as_array())
66+
.and_then(|arr| arr.first())
67+
.and_then(|row| row.get("value"))
68+
.and_then(|v| v.as_u64())
69+
.unwrap_or(0);
70+
71+
if failed_merges > 0 {
72+
let severity = if failed_merges > 10 {
73+
Severity::Error
74+
} else {
75+
Severity::Warning
76+
};
77+
78+
let mut details = Map::new();
79+
details.insert("failed_merges".to_string(), json!(failed_merges));
80+
81+
issues.push(Issue {
82+
severity,
83+
source: "system.metrics".to_string(),
84+
component: component.clone(),
85+
error_type: "merge_failures".to_string(),
86+
message: format!(
87+
"Background merge failures detected: {} failed merges currently in system. This may affect table maintenance.",
88+
failed_merges
89+
),
90+
details,
91+
suggested_action: "Check system.errors for merge failure details. Review disk space and memory availability. Consider increasing merge-related settings if failures persist.".to_string(),
92+
related_queries: vec![
93+
"SELECT * FROM system.errors WHERE name LIKE '%Merge%' ORDER BY last_error_time DESC LIMIT 10".to_string(),
94+
"SELECT * FROM system.metrics WHERE metric LIKE '%Merge%'".to_string(),
95+
format!(
96+
"SELECT * FROM system.merges WHERE database = '{}' AND table = '{}'",
97+
config.db_name, component.name
98+
),
99+
],
100+
});
101+
}
102+
103+
Ok(issues)
104+
}
105+
}

0 commit comments

Comments
 (0)