Skip to content

Commit 96c0621

Browse files
committed
feat: added health service impl.
1 parent a42f717 commit 96c0621

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

src/flow_health/mod.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use futures_core::Stream;
2+
use std::pin::Pin;
3+
4+
use tonic::{Request, Response, Status};
5+
use tonic_health::pb::{
6+
HealthCheckRequest, HealthCheckResponse, health_check_response::ServingStatus,
7+
health_server::Health,
8+
};
9+
10+
#[derive(Debug)]
11+
pub struct HealthService {
12+
nats_url: String,
13+
}
14+
15+
impl HealthService {
16+
pub fn new(nats_url: String) -> Self {
17+
Self { nats_url }
18+
}
19+
20+
async fn check_nats_connection(&self) -> bool {
21+
match async_nats::connect(&self.nats_url).await {
22+
Ok(_client) => {
23+
// Successfully connected to NATS
24+
true
25+
}
26+
Err(_) => false,
27+
}
28+
}
29+
}
30+
31+
#[tonic::async_trait]
32+
impl Health for HealthService {
33+
async fn check(
34+
&self,
35+
request: Request<HealthCheckRequest>,
36+
) -> Result<Response<HealthCheckResponse>, Status> {
37+
let service = request.into_inner().service.to_lowercase();
38+
39+
match service.as_str() {
40+
"liveness" => Ok(Response::new(HealthCheckResponse {
41+
status: ServingStatus::Serving as i32,
42+
})),
43+
"readiness" => {
44+
let nats_healthy = self.check_nats_connection().await;
45+
let status = if nats_healthy {
46+
ServingStatus::Serving
47+
} else {
48+
ServingStatus::NotServing
49+
};
50+
Ok(Response::new(HealthCheckResponse {
51+
status: status as i32,
52+
}))
53+
}
54+
_ => Err(Status::invalid_argument(
55+
"Unknown service. Only `liveness` and `readiness` are supported.",
56+
)),
57+
}
58+
}
59+
60+
type WatchStream =
61+
Pin<Box<dyn Stream<Item = Result<HealthCheckResponse, Status>> + Send + 'static>>;
62+
63+
async fn watch(
64+
&self,
65+
_request: Request<HealthCheckRequest>,
66+
) -> Result<Response<Self::WatchStream>, Status> {
67+
Err(Status::unimplemented("Watch is not implemented"))
68+
}
69+
}

0 commit comments

Comments
 (0)