chore(rkl+rks): adjust rks-rkl communication, refactor rks resource watch #580
InChh wants to merge 4 commits into rk8s-dev:main from
Conversation
InChh commented Apr 12, 2026
- Add a bidirectional stream for rks-rkl communication. It is currently only used by the status detection module, which holds an RksClient instance instead of establishing a new connection for every request (see the sketch after this list).
- Refactor the ControllerManager resource watch function.
- Fix test errors.
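For reference, the shared daemon client pattern described above can be sketched as follows. This is a minimal illustration, not the actual rkl code: `DaemonClient` and `BiStream` are stand-in types, and `set_daemon_client`/`try_get_daemon_client` only mirror the shape of the helpers this PR introduces.

```rust
// Minimal sketch of the shared daemon client pattern (stand-in types, not the
// actual rkl code). The real client wraps the QUIC connection to rks.
use std::sync::{Arc, LazyLock};
use tokio::sync::RwLock;

#[derive(Clone)]
pub struct DaemonClient;

pub struct BiStream;

impl DaemonClient {
    /// Stand-in for opening a bi-directional QUIC stream on the shared connection.
    pub async fn open_bi(&self) -> anyhow::Result<BiStream> {
        Ok(BiStream)
    }
}

// One long-lived connection for the whole daemon instead of dialing per request.
static DAEMON_CLIENT: LazyLock<RwLock<Option<Arc<DaemonClient>>>> =
    LazyLock::new(|| RwLock::new(None));

/// Published by the connection loop once the QUIC session to rks is up.
pub async fn set_daemon_client(client: Arc<DaemonClient>) {
    *DAEMON_CLIENT.write().await = Some(client);
}

/// Status/probe/PLEG paths fetch the shared client and open a fresh bi stream per request.
pub async fn try_get_daemon_client() -> anyhow::Result<Arc<DaemonClient>> {
    let guard = DAEMON_CLIENT.read().await;
    match &*guard {
        Some(client) => Ok(client.clone()),
        None => Err(anyhow::anyhow!("daemon client not connected yet")),
    }
}
```

Callers such as the status manager would fetch the shared client and open a fresh bi-directional stream per request, rather than dialing the server each time.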
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 683a2d3d87
```rust
let mut stream = self_cloned.conn.accept_bi().await?;
let msg = stream.fetch_msg().await?;
info!("fetched bi stream message: {msg}");
if is_worker {
    log_error!(dispatch_worker_bi(msg, &mut stream, &self_cloned.shared).await);
```
Skip bi-stream accept loop for non-worker sessions
This loop accepts every incoming bi stream even when is_worker is false, but the non-worker branch does not dispatch it anywhere. In Verified::serve, user sessions already start spawn_user_bistream_loop() to handle attach/relay streams, so this creates two concurrent accept_bi() consumers on the same connection; whichever task wins can consume and drop the stream, causing user attach/exec bi-stream requests to hang or fail intermittently.
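One possible restructuring, sketched below with stand-in types (not the rks code), is to spawn the worker bi-stream accept loop only when `is_worker` is true, leaving user sessions with a single `accept_bi()` consumer:

```rust
// Sketch only: gate the worker bi-stream accept loop on the session role.
// `Conn`, `BiStream`, and `Session` are placeholders for the rks types.
use std::sync::Arc;

struct Conn;
struct BiStream;

impl Conn {
    // Stand-in for accepting an incoming bi-directional QUIC stream.
    async fn accept_bi(&self) -> anyhow::Result<BiStream> {
        Ok(BiStream)
    }
}

struct Session {
    conn: Arc<Conn>,
}

impl Session {
    async fn dispatch_loop(self: Arc<Self>, is_worker: bool) -> anyhow::Result<()> {
        // Only worker sessions run the worker bi-stream accept loop; user
        // sessions keep their dedicated bistream loop as the sole consumer.
        let bi_stream_task = is_worker.then(|| {
            let this = self.clone();
            tokio::spawn(async move {
                loop {
                    let _stream = this.conn.accept_bi().await?;
                    // dispatch_worker_bi(...) would handle the stream here.
                }
                #[allow(unreachable_code)]
                Ok::<(), anyhow::Error>(())
            })
        });

        if let Some(task) = bi_stream_task {
            task.await??;
        }
        Ok(())
    }
}
```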
Pull request overview
This PR updates RKS↔RKL QUIC communication to support bi-directional streams (used by the RKL daemon status pipeline), refactors the RKS ControllerManager resource watch logic into a reusable per-kind loop, and adjusts/extends tests to match the updated APIs and resource coverage.
Changes:
- Add worker bi-stream request handling on the RKS node server/dispatcher and switch RKL status/probe/PLEG paths to use a shared daemon QUIC client + bi-stream requests.
- Refactor `ControllerManager::start_watch` to watch a list of supported resources via a common snapshot/watch loop and add Job watch support in `XlineStore`.
- Update tests to use `ContainerSpec`'s derived `Default` and add controller-manager + Job GC coverage.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| project/rks/src/node/server.rs | Adds concurrent uni + bi-stream dispatch loops for a connection. |
| project/rks/src/node/dispatch.rs | Adds worker bi-stream message dispatch + shared pod-status update helper. |
| project/rks/src/controllers/manager.rs | Refactors resource watching into generic per-resource watch tasks. |
| project/rks/src/api/xlinestore.rs | Adds jobs snapshot/watch helpers for ControllerManager. |
| project/rkl/src/daemon/client.rs | Introduces global shared daemon QUIC client getter for status/probe paths. |
| project/rkl/src/daemon/status.rs | Switches UID-based pod lookups to use bi-stream requests via shared client. |
| project/rkl/src/daemon/status/status_manager.rs | Removes per-call reconnects; uses shared client + bi-stream update path. |
| project/rkl/src/daemon/status/probe/probe_manager.rs | Restores probes via shared client + bi-stream ListPod. |
| project/rkl/src/daemon/status/pod.rs | Lists pods using shared client + bi-stream ListPod. |
| project/rkl/src/daemon/status/pleg.rs | Removes addr/tls from PLEG; uses shared client-based pod listing. |
| project/rkl/src/daemon/pod_worker.rs | Uses shared client-based pod lookup; adds sandbox-event filtering. |
| project/rkl/src/daemon/mod.rs | Wires daemon components to new shared-client-based constructors. |
| project/rkl/src/task.rs | Changes the pause/sandbox image reference. |
| project/common/src/lib.rs | Derives Default for ContainerSpec to simplify construction. |
| project/rks/tests/test_controller_manager.rs | New tests validating watcher dispatch, snapshot replay, update/delete translation. |
| project/rks/tests/test_garbage_collector.rs | Adds Job-owned pod GC regression test + refactors container spec helper. |
| project/rks/tests/test_scheduler.rs | Updates test pod creation and adds node-affinity constraint. |
| project/rks/tests/test_replicaset.rs | Simplifies container spec construction with Default. |
| project/rks/tests/test_deployment.rs | Simplifies container spec construction with Default. |
| project/rkl/src/commands/container/mod.rs | Updates tests to use ContainerSpec: Default. |
```rust
let self_cloned = self.clone();
let bi_stream_task = tokio::spawn(async move {
    // Main loop: accept bi-directional stream application messages for ongoing communication
    loop {
        let mut stream = self_cloned.conn.accept_bi().await?;
        let msg = stream.fetch_msg().await?;
        info!("fetched bi stream message: {msg}");
        if is_worker {
            log_error!(dispatch_worker_bi(msg, &mut stream, &self_cloned.shared).await);
        }
    }
    #[allow(unreachable_code)]
    Ok::<(), anyhow::Error>(())
});

uni_stream_task.await??;
bi_stream_task.await??;
```
```rust
async fn dispatch_loop(self: Arc<Self>, is_worker: bool) -> anyhow::Result<()> {
    let self_cloned = self.clone();
    let uni_stream_task = tokio::spawn(async move {
        // Main loop: accept application messages for ongoing communication
        loop {
            let msg = self_cloned.conn.fetch_msg().await?;
            info!("fetched message: {msg}");

            if is_worker {
                log_error!(dispatch_worker(msg, &self_cloned.conn, &self_cloned.shared).await);
                continue;
            }

            log_error!(dispatch_user(msg, &self_cloned.conn, &self_cloned.shared).await)
        }
        #[allow(unreachable_code)]
        Ok::<(), anyhow::Error>(())
    });

    let self_cloned = self.clone();
    let bi_stream_task = tokio::spawn(async move {
        // Main loop: accept bi-directional stream application messages for ongoing communication
        loop {
            let mut stream = self_cloned.conn.accept_bi().await?;
            let msg = stream.fetch_msg().await?;
            info!("fetched bi stream message: {msg}");
            if is_worker {
                log_error!(dispatch_worker_bi(msg, &mut stream, &self_cloned.shared).await);
            }
        }
        #[allow(unreachable_code)]
        Ok::<(), anyhow::Error>(())
    });

    uni_stream_task.await??;
    bi_stream_task.await??;
    Ok(())
}
```
```rust
RksMessage::GetPodByUid(pod_uid) => {
    info!(
        target: "rks::node::worker_dispatch_bi",
        "retrieved Pod with UID {pod_uid} for bi-directional request"
    );

    if let Some(pod) = xline_store
        .list_pods()
        .await?
        .into_iter()
        .find(|p| p.metadata.uid == pod_uid)
    {
```
```rust
    for spec in RESOURCE_WATCH_SPECS {
        self.clone().spawn_resource_watch(store.clone(), spec);
    }
    Ok(())
}
```
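For context, the `RESOURCE_WATCH_SPECS` table driving this loop could have roughly the following shape. This is a sketch only; the field names and key prefixes are illustrative assumptions, not the actual rks definitions:

```rust
// Sketch only: a possible shape for RESOURCE_WATCH_SPECS. Field names and key
// prefixes are illustrative assumptions, not the actual rks definitions.
pub struct ResourceWatchSpec {
    /// Human-readable kind, used for logging.
    pub kind: &'static str,
    /// Key prefix the snapshot/watch loop operates on.
    pub prefix: &'static str,
}

pub const RESOURCE_WATCH_SPECS: &[ResourceWatchSpec] = &[
    ResourceWatchSpec { kind: "Pod", prefix: "/registry/pods" },
    ResourceWatchSpec { kind: "ReplicaSet", prefix: "/registry/replicasets" },
    ResourceWatchSpec { kind: "Deployment", prefix: "/registry/deployments" },
    ResourceWatchSpec { kind: "Job", prefix: "/registry/jobs" },
];
```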
```rust
let resp = client.get(prefix, opts).await?;

let mut items = Vec::new();
let rev = resp.header().unwrap().revision();
```
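The snapshot above pairs with a watch that starts at the recorded revision. A minimal sketch of that pattern, assuming the etcd-compatible `etcd_client` crate (whose `get`/`header().revision()` calls match the excerpt) and a hypothetical `snapshot_then_watch` helper:

```rust
use etcd_client::{Client, GetOptions, WatchOptions};

// Sketch of the snapshot-then-watch pattern (hypothetical helper, not rks's code):
// list everything under a prefix, remember the revision, then watch from the next
// revision so no event between the snapshot and the watch is missed.
async fn snapshot_then_watch(client: &mut Client, prefix: &str) -> anyhow::Result<()> {
    let resp = client
        .get(prefix, Some(GetOptions::new().with_prefix()))
        .await?;
    let rev = resp.header().map(|h| h.revision()).unwrap_or(0);

    // ... replay the snapshot key-values into the controller state here ...

    let (_watcher, mut stream) = client
        .watch(
            prefix,
            Some(WatchOptions::new().with_prefix().with_start_revision(rev + 1)),
        )
        .await?;
    while let Some(watch_resp) = stream.message().await? {
        for event in watch_resp.events() {
            // Translate PUT/DELETE events into controller updates/deletes.
            let _ = event;
        }
    }
    Ok(())
}
```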
```diff
 impl StatusManager {
-    /// Creates a new [`StatusManager`] with a QUIC connection to the rks API server.
-    ///
-    /// # Arguments
-    /// * `server_addr` - The rks API server address (e.g., "127.0.0.1:6000")
-    /// * `tls_cfg` - TLS configuration for the QUIC connection
-    ///
-    /// # Errors
-    /// Returns an error if the QUIC connection fails to establish.
-    pub async fn try_new(
-        server_address: String,
-        tls_cfg: Arc<TLSConnectionArgs>,
-    ) -> anyhow::Result<Self> {
+    /// Creates a new [`StatusManager`].
+    pub async fn new() -> Self {
         let pod_statuses = Arc::new(DashMap::new());
         let pending_container_readiness = Arc::new(DashMap::new());
         let pod_status_update_signal = Arc::new(Notify::new());
         let api_status_versions = Arc::new(DashMap::new());
-        Ok(StatusManager {
-            server_address,
-            tls_cfg,
+        StatusManager {
             pod_statuses,
             pending_container_readiness,
             pod_status_update_signal,
             api_status_versions,
             sync_loop_handle: None,
-        })
+        }
     }
 }
```
```diff
 // 1. Get sandbox bundle path
 let sandbox_spec = ContainerSpec {
     name: "sandbox".to_string(),
     // FIXME: SHOULD define a const variable image name
-    image: "pause:3.9".to_string(),
+    image: "lingbou/pause:3.9".to_string(),
     ports: vec![],
     args: vec![],
     tty: false,
```
```diff
 // 1. Get sandbox bundle path
 let sandbox_spec = ContainerSpec {
     name: "sandbox".to_string(),
     // FIXME: SHOULD define a const variable image name
-    image: "pause:3.9".to_string(),
+    image: "lingbou/pause:3.9".to_string(),
     // image: "/home/harry/Documents/rk8s/project/test/bundles/pause".to_string(),
     ports: vec![],
     args: vec![],
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7997dcf07a
```rust
uni_stream_task.await??;
bi_stream_task.await??;
```
Surface bi-stream failures before awaiting uni loop
dispatch_loop awaits uni_stream_task to completion before checking bi_stream_task, so a bi-stream accept/parse failure is effectively ignored while uni traffic (e.g., heartbeats) keeps flowing. In a worker session, that leaves the connection registered but stops all worker bi-stream RPC handling (GetPodByUid, ListPod, UpdatePodStatus), causing status/probe sync requests over bi streams to fail indefinitely until the uni loop eventually exits.
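One way to surface either failure promptly, sketched as a hypothetical `run_session` wrapper around the two join handles (not the actual rks code), is to race them and abort the survivor:

```rust
use tokio::task::JoinHandle;

// Hypothetical wrapper around the two loops spawned in dispatch_loop.
async fn run_session(
    mut uni_stream_task: JoinHandle<anyhow::Result<()>>,
    mut bi_stream_task: JoinHandle<anyhow::Result<()>>,
) -> anyhow::Result<()> {
    // Whichever loop finishes (or errors) first decides the session outcome;
    // the surviving loop is aborted so it does not keep the connection half-alive.
    let first = tokio::select! {
        res = &mut uni_stream_task => { bi_stream_task.abort(); res }
        res = &mut bi_stream_task => { uni_stream_task.abort(); res }
    };
    first??;
    Ok(())
}
```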
```rust
let client = try_get_daemon_client().await?;
let mut stream = client.open_bi().await?;
```
Retry probe restoration when daemon client is unavailable
restore_existing_probes now immediately depends on the shared daemon client and returns an error as soon as try_get_daemon_client fails, but daemon startup invokes this restore path only once (daemon/mod.rs) right after spawning the connection loop. If the worker connection is not established yet, probe restoration is skipped for the whole run, leaving pre-existing pods without restored liveness/readiness probes until another restart.
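A sketch of a retry wrapper (hypothetical `restore_existing_probes_with_retry`, reusing the `try_get_daemon_client` helper sketched earlier) that defers restoration until the worker connection is up:

```rust
use std::time::Duration;

// Hypothetical retry wrapper; the real restore logic would run where indicated.
async fn restore_existing_probes_with_retry() -> anyhow::Result<()> {
    loop {
        match try_get_daemon_client().await {
            Ok(_client) => {
                // Connection is up: run the actual probe restoration once, then stop.
                // e.g. restore_existing_probes(&_client).await?;
                return Ok(());
            }
            Err(_) => {
                // Worker connection not established yet; back off and retry
                // instead of skipping restoration for the whole run.
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
```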
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a18366b4a
```rust
} else {
    time::sleep(Duration::from_secs(1)).await;
}
clear_daemon_client().await;
```
Clear stale daemon client before reconnect backoff
run_forever leaves DAEMON_CLIENT populated while sleeping after a disconnect, so try_get_daemon_client returns a dead connection for up to 3 seconds instead of waiting for a fresh one. The new status/probe paths introduced in this commit (get_pod_task_by_uid, list_pods, update_pod_status) then repeatedly call open_bi() on a stale client and fail each cycle, which delays status reconciliation and probe restoration during every reconnect window.
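A sketch of the suggested ordering, reusing the shared-client helpers sketched earlier; `run_forever_sketch`, `connect_once`, and the serve-until-closed step are illustrative stand-ins for rkl's actual reconnect loop:

```rust
// Sketch only: clear the published client before the reconnect backoff so
// try_get_daemon_client reports "not connected" instead of handing out a dead
// connection during the sleep window.
async fn run_forever_sketch() -> anyhow::Result<()> {
    loop {
        if let Ok(client) = connect_once().await {
            set_daemon_client(client).await;
            // ... serve the connection until it closes ...
        }
        // Drop the stale client *before* backing off.
        clear_daemon_client().await;
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

// Stand-in for establishing the QUIC session to rks.
async fn connect_once() -> anyhow::Result<std::sync::Arc<DaemonClient>> {
    Ok(std::sync::Arc::new(DaemonClient))
}

// Counterpart to set_daemon_client from the earlier sketch.
async fn clear_daemon_client() {
    *DAEMON_CLIENT.write().await = None;
}
```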
Useful? React with 👍 / 👎.