Skip to content

Commit d256424

Browse files
authored
ENG-1359: Add deep health check for Consumption API (#2995)
<!-- CURSOR_SUMMARY -->
> [!NOTE]
> Adds concurrent health checks (Redis, ClickHouse, Redpanda, Consumption API) with /_moose_internal/health support, and updates E2E tests and docs accordingly.
>
> - **Health Checks (server)**:
>   - Rework `GET /health` to run concurrent checks (Redis, ClickHouse, Redpanda) and add Consumption API probe via `/_moose_internal/health`; return `healthy`/`unhealthy` arrays with 200/503.
> - **Consumption API (TS & Python)**:
>   - Add internal endpoint `/_moose_internal/health` returning `{ status, timestamp }`.
> - **E2E Tests**:
>   - Update health assertions to allow 200 or 503 and expect `healthy`/`unhealthy` fields.
>   - Add `verifyProxyHealth` and `verifyConsumptionApiInternalHealth` utilities and tests to validate proxy and internal health endpoints.
> - **Docs**:
>   - Expand health monitoring section with response schema, status codes, monitored services, concurrency/timeout notes, and internal endpoint mention.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 3035deb. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
1 parent 47710c1 commit d256424

File tree

7 files changed

+294
-31
lines changed

7 files changed

+294
-31
lines changed

apps/framework-cli-e2e/test/dotenv-config.test.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,14 @@ describe("typescript template tests - .env file configuration", function () {
125125

126126
it("should load .env files with correct precedence (.env < .env.dev < .env.local)", async () => {
127127
// Verify server is running on port 9992 (from .env.local)
128+
// Note: Health endpoint may return 503 if some services are unhealthy,
129+
// but a response means the server is running on the correct port
128130
const response = await fetch("http://localhost:9992/health");
129-
expect(response.ok).to.be.true;
131+
expect(response.status).to.be.oneOf([200, 503]);
130132

131133
const health = await response.json();
132134
expect(health).to.have.property("healthy");
135+
expect(health).to.have.property("unhealthy");
133136

134137
console.log(
135138
"✓ Server is running on port 9992 from .env.local (correct precedence)",
@@ -138,8 +141,14 @@ describe("typescript template tests - .env file configuration", function () {
138141

139142
it("should have .env.local values accessible via environment", async () => {
140143
// The .env files are loaded, we can verify the server responds correctly
144+
// Note: Health endpoint may return 503 if some services are unhealthy,
145+
// but getting a valid JSON response proves the server is configured correctly
141146
const response = await fetch("http://localhost:9992/health");
142-
expect(response.status).to.equal(200);
147+
expect(response.status).to.be.oneOf([200, 503]);
148+
149+
const health = await response.json();
150+
expect(health).to.have.property("healthy");
151+
expect(health).to.have.property("unhealthy");
143152

144153
console.log("✓ .env.local configuration is active");
145154
});
@@ -232,11 +241,14 @@ describe("python template tests - .env file configuration", function () {
232241

233242
it("should load .env files with correct precedence (.env < .env.dev < .env.local)", async () => {
234243
// Verify server is running on port 9982 (from .env.local)
244+
// Note: Health endpoint may return 503 if some services are unhealthy,
245+
// but a response means the server is running on the correct port
235246
const response = await fetch("http://localhost:9982/health");
236-
expect(response.ok).to.be.true;
247+
expect(response.status).to.be.oneOf([200, 503]);
237248

238249
const health = await response.json();
239250
expect(health).to.have.property("healthy");
251+
expect(health).to.have.property("unhealthy");
240252

241253
console.log(
242254
"✓ Python server is running on port 9982 from .env.local (correct precedence)",
@@ -245,8 +257,14 @@ describe("python template tests - .env file configuration", function () {
245257

246258
it("should have .env.local values accessible via environment", async () => {
247259
// The .env files are loaded, we can verify the server responds correctly
260+
// Note: Health endpoint may return 503 if some services are unhealthy,
261+
// but getting a valid JSON response proves the server is configured correctly
248262
const response = await fetch("http://localhost:9982/health");
249-
expect(response.status).to.equal(200);
263+
expect(response.status).to.be.oneOf([200, 503]);
264+
265+
const health = await response.json();
266+
expect(health).to.have.property("healthy");
267+
expect(health).to.have.property("unhealthy");
250268

251269
console.log("✓ Python .env.local configuration is active");
252270
});

apps/framework-cli-e2e/test/templates.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ import {
4444
withRetries,
4545
verifyConsumptionApi,
4646
verifyVersionedConsumptionApi,
47+
verifyProxyHealth,
48+
verifyConsumptionApiInternalHealth,
4749
verifyConsumerLogs,
4850
createTempTestDirectory,
4951
setupTypeScriptProject,
@@ -875,6 +877,32 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
875877
await clickhouse.close();
876878
});
877879

880+
it("should include Consumption API in proxy health check (healthy)", async function () {
881+
this.timeout(TIMEOUTS.TEST_SETUP_MS);
882+
883+
// Verify that the proxy health endpoint includes "Consumption API" in healthy list
884+
// Expected healthy services: Redis, ClickHouse, Redpanda, Consumption API
885+
await verifyProxyHealth([
886+
"Redis",
887+
"ClickHouse",
888+
"Redpanda",
889+
"Consumption API",
890+
]);
891+
892+
console.log(
893+
"✅ Proxy health check correctly includes Consumption API",
894+
);
895+
});
896+
897+
it("should have working internal health endpoint (/_moose_internal/health)", async function () {
898+
this.timeout(TIMEOUTS.TEST_SETUP_MS);
899+
900+
// Verify the consumption API internal health endpoint works
901+
await verifyConsumptionApiInternalHealth();
902+
903+
console.log("✅ Internal health endpoint works correctly");
904+
});
905+
878906
it("should serve WebApp at custom mountPath with Express framework", async function () {
879907
this.timeout(TIMEOUTS.TEST_SETUP_MS);
880908

apps/framework-cli-e2e/test/utils/api-utils.ts

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,107 @@ export const verifyVersionedConsumptionApi = async (
158158
},
159159
);
160160
};
161+
162+
/**
 * Verifies that the proxy health endpoint (`/health`) reports the expected services.
 *
 * The check runs inside `withRetries`, configured with
 * `RETRY_CONFIG.API_VERIFICATION_ATTEMPTS` and `RETRY_CONFIG.API_VERIFICATION_DELAY_MS`;
 * each failed expectation is raised as an `Error` from the callback.
 *
 * @param expectedHealthy - Service names that must appear in the response's `healthy` list.
 * @param expectedUnhealthy - Service names that must appear in the response's `unhealthy`
 *   list. When non-empty the endpoint is expected to answer 503, otherwise 200.
 * @throws Error when the status code, the payload shape, or either service list does not
 *   match (NOTE(review): whether the last error is rethrown depends on `withRetries` — confirm).
 */
export const verifyProxyHealth = async (
  expectedHealthy: string[],
  expectedUnhealthy: string[] = [],
): Promise<void> => {
  await withRetries(
    async () => {
      const response = await fetch(`${SERVER_CONFIG.url}/health`);

      // If we expect unhealthy services, status should be 503.
      // Otherwise it should be 200.
      const expectedStatus = expectedUnhealthy.length > 0 ? 503 : 200;
      if (response.status !== expectedStatus) {
        const text = await response.text();
        throw new Error(
          `Expected status ${expectedStatus}, got ${response.status}: ${text}`,
        );
      }

      // Treat the payload as untrusted until its shape has been checked.
      const body: unknown = await response.json();

      console.log("Health check response:", body);

      const { healthy, unhealthy } = (body ?? {}) as {
        healthy?: unknown;
        unhealthy?: unknown;
      };
      if (!Array.isArray(healthy) || !Array.isArray(unhealthy)) {
        // Fail with a readable message instead of a TypeError from `.includes` below.
        throw new Error(
          `Expected "healthy" and "unhealthy" arrays in health response, got: ${JSON.stringify(body)}`,
        );
      }

      // Verify healthy list
      for (const service of expectedHealthy) {
        if (!healthy.includes(service)) {
          throw new Error(
            `Expected "${service}" in healthy list, got: ${healthy.join(", ")}`,
          );
        }
      }

      // Verify unhealthy list
      for (const service of expectedUnhealthy) {
        if (!unhealthy.includes(service)) {
          throw new Error(
            `Expected "${service}" in unhealthy list, got: ${unhealthy.join(", ")}`,
          );
        }
      }

      // Note: We don't enforce exact counts because services are conditionally
      // checked based on feature flags (features.apis, features.olap, features.streaming_engine).
      // The test verifies that expected services are present in the correct state,
      // but allows for additional services that may be enabled in the environment.
    },
    {
      attempts: RETRY_CONFIG.API_VERIFICATION_ATTEMPTS,
      delayMs: RETRY_CONFIG.API_VERIFICATION_DELAY_MS,
    },
  );
};
219+
220+
/**
 * Verifies the consumption API internal health endpoint (`/_moose_internal/health`).
 *
 * The endpoint is probed on `localhost` at the port of `SERVER_CONFIG.url` plus one
 * (4001 when the configured URL carries no explicit port). The response must be a 2xx
 * JSON object whose `status` is `"healthy"` and whose `timestamp` is a parseable date string.
 *
 * The check runs inside `withRetries`, configured with
 * `RETRY_CONFIG.API_VERIFICATION_ATTEMPTS` and `RETRY_CONFIG.API_VERIFICATION_DELAY_MS`.
 *
 * @throws Error when the endpoint answers with a non-2xx status or the payload does not
 *   satisfy the checks above (NOTE(review): final propagation depends on `withRetries` — confirm).
 */
export const verifyConsumptionApiInternalHealth = async (): Promise<void> => {
  await withRetries(
    async () => {
      // Note: The internal health endpoint runs on the consumption API port (4001 by default),
      // derived here as the main server port + 1.
      // NOTE(review): presumably the same origin as the consumption API URL without its
      // /api prefix — confirm against SERVER_CONFIG.
      const serverPort = new URL(SERVER_CONFIG.url).port || "4000";
      const consumptionPort = Number.parseInt(serverPort, 10) + 1; // Default: 4001
      const healthUrl = `http://localhost:${consumptionPort}/_moose_internal/health`;

      const response = await fetch(healthUrl);
      if (!response.ok) {
        const text = await response.text();
        throw new Error(`${response.status}: ${text}`);
      }

      // Treat the payload as untrusted; a null or non-object body must fail with a
      // descriptive error rather than a TypeError on property access.
      const body: unknown = await response.json();

      console.log("Internal health check response:", body);

      const { status, timestamp } = (body ?? {}) as {
        status?: unknown;
        timestamp?: unknown;
      };

      if (status !== "healthy") {
        throw new Error(`Expected status "healthy", got "${status}"`);
      }

      if (!timestamp) {
        throw new Error("Missing timestamp in response");
      }

      // Verify the timestamp is a string that `Date` can parse. This is a parseability
      // check, not a strict ISO 8601 validation.
      if (
        typeof timestamp !== "string" ||
        Number.isNaN(new Date(timestamp).getTime())
      ) {
        throw new Error(`Invalid timestamp: ${timestamp}`);
      }
    },
    {
      attempts: RETRY_CONFIG.API_VERIFICATION_ATTEMPTS,
      delayMs: RETRY_CONFIG.API_VERIFICATION_DELAY_MS,
    },
  );
};

apps/framework-cli/src/cli/local_webserver.rs

Lines changed: 83 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -817,38 +817,98 @@ async fn health_route(
817817
project: &Project,
818818
redis_client: &Arc<RedisClient>,
819819
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
820-
// Check Redis connectivity
821-
let redis_healthy = redis_client.is_connected();
820+
use std::time::Duration;
821+
use tokio::task::JoinSet;
822822

823-
// Prepare healthy and unhealthy lists
824-
let mut healthy = Vec::new();
825-
let mut unhealthy = Vec::new();
823+
let mut join_set = JoinSet::new();
826824

827-
if redis_healthy {
828-
healthy.push("Redis")
829-
} else {
830-
unhealthy.push("Redis")
831-
}
825+
// Spawn Redis check
826+
let redis_client_clone = redis_client.clone();
827+
join_set.spawn(async move {
828+
let healthy = redis_client_clone.is_connected();
829+
("Redis", healthy)
830+
});
832831

833-
// Check Redpanda/Kafka connectivity only if streaming is enabled
832+
// Spawn Redpanda check (if enabled)
834833
if project.features.streaming_engine {
835-
match kafka::client::health_check(&project.redpanda_config).await {
836-
Ok(_) => healthy.push("Redpanda"),
837-
Err(e) => {
838-
warn!("Health check: Redpanda unavailable: {}", e);
839-
unhealthy.push("Redpanda")
834+
let redpanda_config = project.redpanda_config.clone();
835+
join_set.spawn(async move {
836+
match kafka::client::health_check(&redpanda_config).await {
837+
Ok(_) => ("Redpanda", true),
838+
Err(e) => {
839+
warn!("Health check: Redpanda unavailable: {}", e);
840+
("Redpanda", false)
841+
}
840842
}
841-
}
843+
});
842844
}
843845

844-
// Check ClickHouse connectivity only if OLAP is enabled
846+
// Spawn ClickHouse check (if enabled)
845847
if project.features.olap {
846-
let olap_client = clickhouse::create_client(project.clickhouse_config.clone());
847-
match olap_client.client.query("SELECT 1").execute().await {
848-
Ok(_) => healthy.push("ClickHouse"),
848+
let clickhouse_config = project.clickhouse_config.clone();
849+
join_set.spawn(async move {
850+
let olap_client = clickhouse::create_client(clickhouse_config);
851+
match olap_client.client.query("SELECT 1").execute().await {
852+
Ok(_) => ("ClickHouse", true),
853+
Err(e) => {
854+
warn!("Health check: ClickHouse unavailable: {}", e);
855+
("ClickHouse", false)
856+
}
857+
}
858+
});
859+
}
860+
861+
// Spawn Consumption API check (if enabled)
862+
if project.features.apis {
863+
let consumption_api_port = project.http_server_config.proxy_port;
864+
join_set.spawn(async move {
865+
let health_url = format!(
866+
"http://localhost:{}/_moose_internal/health",
867+
consumption_api_port
868+
);
869+
let client = match reqwest::Client::builder()
870+
.timeout(Duration::from_secs(2))
871+
.build()
872+
{
873+
Ok(c) => c,
874+
Err(e) => {
875+
warn!("Health check: Failed to create HTTP client: {}", e);
876+
return ("Consumption API", false);
877+
}
878+
};
879+
880+
match client.get(&health_url).send().await {
881+
Ok(response) if response.status().is_success() => ("Consumption API", true),
882+
Ok(response) => {
883+
warn!(
884+
"Health check: Consumption API returned status {}",
885+
response.status()
886+
);
887+
("Consumption API", false)
888+
}
889+
Err(e) => {
890+
warn!("Health check: Consumption API unavailable: {}", e);
891+
("Consumption API", false)
892+
}
893+
}
894+
});
895+
}
896+
897+
// Collect results from JoinSet
898+
let mut healthy = Vec::new();
899+
let mut unhealthy = Vec::new();
900+
901+
while let Some(result) = join_set.join_next().await {
902+
match result {
903+
Ok((service, is_healthy)) => {
904+
if is_healthy {
905+
healthy.push(service);
906+
} else {
907+
unhealthy.push(service);
908+
}
909+
}
849910
Err(e) => {
850-
warn!("Health check: ClickHouse unavailable: {}", e);
851-
unhealthy.push("ClickHouse")
911+
warn!("Health check task failed: {}", e);
852912
}
853913
}
854914
}

apps/framework-cli/src/framework/python/wrappers/consumption_runner.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,18 @@ def handle_request(self):
178178
raw_path = parsed_path.path
179179
method = self.command
180180

181+
# Health check - checked before all other routes
182+
if raw_path == "/_moose_internal/health":
183+
self.send_response(200)
184+
self.send_header("Content-Type", "application/json")
185+
self.end_headers()
186+
response = json.dumps({
187+
"status": "healthy",
188+
"timestamp": datetime.now(timezone.utc).isoformat()
189+
})
190+
self.wfile.write(response.encode())
191+
return
192+
181193
# Read request body for POST/PUT/PATCH methods
182194
content_length = int(self.headers.get('Content-Length', 0))
183195
request_body = self.rfile.read(content_length) if content_length > 0 else b''

0 commit comments

Comments
 (0)