From 46c5eed235fc583433da0f36dbe0cf8e42a91ce4 Mon Sep 17 00:00:00 2001 From: vmenge Date: Wed, 5 Nov 2025 02:43:35 +0100 Subject: [PATCH 1/3] feat(jobs-agent): shutdown --- orb-jobs-agent/src/handlers/mod.rs | 1 + orb-jobs-agent/src/handlers/shutdown.rs | 33 ++++++++++++++ orb-jobs-agent/src/program.rs | 11 +++-- orb-jobs-agent/tests/shutdown.rs | 57 +++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 orb-jobs-agent/src/handlers/shutdown.rs create mode 100644 orb-jobs-agent/tests/shutdown.rs diff --git a/orb-jobs-agent/src/handlers/mod.rs b/orb-jobs-agent/src/handlers/mod.rs index c9da5fd6..6e71326e 100644 --- a/orb-jobs-agent/src/handlers/mod.rs +++ b/orb-jobs-agent/src/handlers/mod.rs @@ -8,6 +8,7 @@ pub mod read_gimbal; pub mod reboot; pub mod reset_gimbal; pub mod sec_mcu_reboot; +pub mod shutdown; pub mod update_versions; pub mod wifi_add; pub mod wifi_connect; diff --git a/orb-jobs-agent/src/handlers/shutdown.rs b/orb-jobs-agent/src/handlers/shutdown.rs new file mode 100644 index 00000000..d50ed893 --- /dev/null +++ b/orb-jobs-agent/src/handlers/shutdown.rs @@ -0,0 +1,33 @@ +use crate::job_system::ctx::Ctx; +use color_eyre::Result; +use futures::TryFutureExt; +use orb_relay_messages::jobs::v1::JobExecutionUpdate; +use std::{sync::Arc, time::Duration}; +use tokio::{ + task::{self}, + time, +}; +use tracing::{error, info}; + +/// command format: `shutdown` +#[tracing::instrument(skip(ctx))] +pub async fn handler(ctx: Ctx) -> Result { + let execution_id = ctx.execution_id().to_owned(); + info!(execution_id, "Shutting down orb"); + + let shell = Arc::clone(&ctx.deps().shell); + + task::spawn(async move { + time::sleep(Duration::from_secs(5)).await; + let result = shell.exec(&["shutdown", "now"]).and_then(async |child| { + child.wait_with_output().await?; + Ok(()) + }); + + if let Err(e) = result.await { + error!(execution_id, "failed to execute shutdown, err {e}"); + } + }); + + Ok(ctx.success()) +} diff --git a/orb-jobs-agent/src/program.rs b/orb-jobs-agent/src/program.rs index 9c3e81e7..02b985b7 100644 --- a/orb-jobs-agent/src/program.rs +++ b/orb-jobs-agent/src/program.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use crate::{ handlers::{ beacon, check_my_orb, logs, mcu, orb_details, read_file, read_gimbal, reboot, - reset_gimbal, sec_mcu_reboot, update_versions, wifi_add, wifi_connect, wifi_ip, - wifi_remove, + reset_gimbal, sec_mcu_reboot, shutdown, update_versions, wifi_add, + wifi_connect, wifi_ip, wifi_remove, }, job_system::handler::JobHandler, settings::Settings, @@ -13,7 +15,7 @@ use tokio::fs; /// Dependencies used by the jobs-agent. pub struct Deps { - pub shell: Box, + pub shell: Arc, pub session_dbus: zbus::Connection, pub settings: Settings, } @@ -24,7 +26,7 @@ impl Deps { S: Shell + 'static, { Self { - shell: Box::new(shell), + shell: Arc::new(shell), session_dbus, settings, } @@ -50,6 +52,7 @@ pub async fn run(deps: Deps) -> Result<()> { .sequential("update_versions", update_versions::handler) .parallel_max("logs", 3, logs::handler) .sequential("reboot", reboot::handler) + .sequential("shutdown", shutdown::handler) .build(deps) .run() .await; diff --git a/orb-jobs-agent/tests/shutdown.rs b/orb-jobs-agent/tests/shutdown.rs new file mode 100644 index 00000000..00ef4095 --- /dev/null +++ b/orb-jobs-agent/tests/shutdown.rs @@ -0,0 +1,57 @@ +use std::{ + process::Stdio, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use async_trait::async_trait; +use color_eyre::Result; +use common::fixture::JobAgentFixture; +use orb_jobs_agent::shell::Shell; +use orb_relay_messages::jobs::v1::JobExecutionStatus; +use tokio::{process::Child, time}; + +mod common; + +#[tokio::test] +async fn it_shuts_orb_down() { + // Arrange + let ms = MockShell::default(); + let fx = JobAgentFixture::new().await; + fx.program().shell(ms.clone()).spawn().await; + + // Act + fx.enqueue_job("shutdown").await.wait_for_completion().await; + + // Assert + let result = fx.execution_updates.read().await; + assert_eq!(result[0].status, JobExecutionStatus::Succeeded as i32); + + time::sleep(Duration::from_secs(5)).await; + let shutdown_called = ms.shutdown_called.load(Ordering::SeqCst); + assert!(shutdown_called); +} + +#[derive(Clone, Debug, Default)] +struct MockShell { + shutdown_called: Arc, +} + +#[async_trait] +impl Shell for MockShell { + async fn exec(&self, cmd: &[&str]) -> Result { + if cmd == ["shutdown", "now"] { + self.shutdown_called.store(true, Ordering::SeqCst); + } + + let child = tokio::process::Command::new("true") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + Ok(child) + } +} From bb59790ce34473e6348a414d0430fb3055b62c0a Mon Sep 17 00:00:00 2001 From: vmenge Date: Wed, 5 Nov 2025 02:57:34 +0100 Subject: [PATCH 2/3] ree --- orb-jobs-agent/tests/shutdown.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/orb-jobs-agent/tests/shutdown.rs b/orb-jobs-agent/tests/shutdown.rs index 00ef4095..8db893a5 100644 --- a/orb-jobs-agent/tests/shutdown.rs +++ b/orb-jobs-agent/tests/shutdown.rs @@ -16,6 +16,7 @@ use tokio::{process::Child, time}; mod common; +#[cfg_attr(target_os = "macos", test_with::no_env(GITHUB_ACTIONS))] #[tokio::test] async fn it_shuts_orb_down() { // Arrange From 684b619e2cfa47d85bc3132282749a0824c2dc28 Mon Sep 17 00:00:00 2001 From: vmenge Date: Wed, 5 Nov 2025 03:15:51 +0100 Subject: [PATCH 3/3] chore(jobs-agent): increase buffer on shutdown wait time to avoid flakyness --- orb-jobs-agent/src/handlers/shutdown.rs | 2 +- orb-jobs-agent/tests/shutdown.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/orb-jobs-agent/src/handlers/shutdown.rs b/orb-jobs-agent/src/handlers/shutdown.rs index d50ed893..93ffd9f2 100644 --- a/orb-jobs-agent/src/handlers/shutdown.rs +++ b/orb-jobs-agent/src/handlers/shutdown.rs @@ -18,7 +18,7 @@ pub async fn handler(ctx: Ctx) -> Result { let shell = Arc::clone(&ctx.deps().shell); task::spawn(async move { - time::sleep(Duration::from_secs(5)).await; + time::sleep(Duration::from_secs(4)).await; let result = shell.exec(&["shutdown", "now"]).and_then(async |child| { child.wait_with_output().await?; Ok(()) diff --git a/orb-jobs-agent/tests/shutdown.rs b/orb-jobs-agent/tests/shutdown.rs index 8db893a5..8c8e27c4 100644 --- a/orb-jobs-agent/tests/shutdown.rs +++ b/orb-jobs-agent/tests/shutdown.rs @@ -31,7 +31,7 @@ async fn it_shuts_orb_down() { let result = fx.execution_updates.read().await; assert_eq!(result[0].status, JobExecutionStatus::Succeeded as i32); - time::sleep(Duration::from_secs(5)).await; + time::sleep(Duration::from_secs(6)).await; let shutdown_called = ms.shutdown_called.load(Ordering::SeqCst); assert!(shutdown_called); }