diff --git a/Cargo.lock b/Cargo.lock index abe512d3..4201a821 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "async_zip" version = "0.0.16" @@ -2789,18 +2800,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3613,6 +3634,7 @@ dependencies = [ name = "tower-runtime" version = "0.3.39" dependencies = [ + "async-trait", "chrono", "config", "nix 0.30.1", @@ -3862,9 +3884,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 3d4dbb79..3a01da08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli" aes-gcm = "0.10" anyhow = "1.0.95" async-compression = { version = "0.4", features = ["tokio", "gzip"] } +async-trait = "0.1.89" async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] } axum = "0.8.4" base64 = "0.22" diff --git a/crates/config/src/session.rs b/crates/config/src/session.rs index 53830ccf..f7a22b07 100644 --- a/crates/config/src/session.rs +++ b/crates/config/src/session.rs @@ -21,7 +21,9 @@ fn extract_aid_from_jwt(jwt: &str) -> Option { let payload = parts[1]; let decoded = URL_SAFE_NO_PAD.decode(payload).ok()?; let json: serde_json::Value = serde_json::from_slice(&decoded).ok()?; - json.get("https://tower.dev/aid")?.as_str().map(String::from) + json.get("https://tower.dev/aid")? + .as_str() + .map(String::from) } const DEFAULT_TOWER_URL: &str = "https://api.tower.dev"; diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 50a3de07..cea55397 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -5,18 +5,24 @@ use std::collections::HashMap; use std::path::PathBuf; use tower_api::models::Run; use tower_package::{Package, PackageSpec}; -use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status}; +use tower_runtime::{OutputReceiver, Status}; use tower_telemetry::{debug, Context}; +use crate::{api, output, util::dates}; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{ - mpsc::{unbounded_channel, Receiver as MpscReceiver}, + mpsc::Receiver as MpscReceiver, oneshot::{self, Receiver as OneshotReceiver}, Mutex, }; use tokio::time::{sleep, timeout, Duration}; - -use crate::{api, output, util::dates}; +use tower_runtime::execution::ExecutionHandle; +use tower_runtime::execution::{ + CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, PackageRef, + ResourceLimits, RuntimeConfig as ExecRuntimeConfig, +}; +use tower_runtime::subprocess::SubprocessBackend; pub fn run_cmd() -> Command { Command::new("run") @@ -148,7 +154,7 @@ where env_vars.insert("TOWER_URL".to_string(), config.tower_url.to_string()); // There should always be a session, if there isn't one then I'm not sure how we got here? - let session = config.session.ok_or(Error::NoSession)?; + let session = config.session.as_ref().ok_or(Error::NoSession)?; env_vars.insert("TOWER_JWT".to_string(), session.token.jwt.to_string()); @@ -164,32 +170,34 @@ where // Build the package let mut package = build_package(&towerfile).await?; - // Unpack the package package.unpack().await?; - - let (sender, receiver) = unbounded_channel(); - output::success(&format!("Launching app `{}`", towerfile.app.name)); - let output_task = tokio::spawn(output_handler(receiver)); - - let mut launcher: AppLauncher = AppLauncher::default(); - launcher - .launch( - Context::new(), - sender, - package, - env.to_string(), - secrets, + let backend = SubprocessBackend::new(config.cache_dir.clone()); + let run_id = format!( + "cli-run-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let handle = backend + .create(build_cli_execution_spec( + config, + env, params, + secrets, env_vars, - config.cache_dir, - ) + &mut package, + run_id, + )) .await?; + let receiver = handle.logs().await?; + let output_task = tokio::spawn(output_handler(receiver)); - // Monitor app output and status concurrently - let app = Arc::new(Mutex::new(launcher.app.unwrap())); - let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app))); + // Monitor app status concurrently + let handle = Arc::new(Mutex::new(handle)); + let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle))); // Wait for app to complete or SIGTERM let status_result = tokio::select! { @@ -199,7 +207,7 @@ where }, _ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => { output::write("\nReceived Ctrl+C, stopping local run...\n"); - app.lock().await.terminate().await.ok(); + handle.lock().await.terminate().await.ok(); return Ok(output_task.await.unwrap()); } }; @@ -222,6 +230,57 @@ where Ok(final_result) } +fn build_cli_execution_spec( + config: Config, + env: &str, + params: HashMap, + secrets: HashMap, + env_vars: HashMap, + package: &mut Package, + run_id: String, +) -> ExecutionSpec { + let spec = ExecutionSpec { + id: run_id, + package: PackageRef::Local { + path: package + .unpacked_path + .clone() + .expect("Package must be unpacked before execution"), + }, + runtime: ExecRuntimeConfig { + image: "local".to_string(), + version: None, + cache: CacheConfig { + enable_bundle_cache: true, + enable_runtime_cache: true, + enable_dependency_cache: true, + backend: match config.cache_dir.clone() { + Some(dir) => CacheBackend::Local { cache_dir: dir }, + None => CacheBackend::None, + }, + isolation: CacheIsolation::None, + }, + entrypoint: None, + command: None, + }, + environment: env.to_string(), + secrets, + parameters: params, + env_vars, + resources: ResourceLimits { + cpu_millicores: None, + memory_mb: None, + storage_mb: None, + max_pids: None, + gpu_count: 0, + timeout_seconds: 3600, + }, + networking: None, + telemetry_ctx: Context::new(), + }; + spec +} + /// do_run_local is the entrypoint for running an app locally. It will load the Towerfile, build /// the package, and launch the app. The relevant package is cleaned up after execution is /// complete. @@ -595,8 +654,12 @@ async fn monitor_output(mut output: OutputReceiver) { /// monitor_local_status is a helper function that will monitor the status of a given app and waits for /// it to progress to a terminal state. -async fn monitor_local_status(app: Arc>) -> Status { - debug!("Starting status monitoring for LocalApp"); +async fn monitor_cli_status( + handle: Arc>, +) -> Status { + use tower_runtime::execution::ExecutionHandle as _; + + debug!("Starting status monitoring for CLI execution"); let mut check_count = 0; let mut err_count = 0; @@ -604,11 +667,11 @@ async fn monitor_local_status(app: Arc>) -> Status { check_count += 1; debug!( - "Status check #{}, attempting to get app status", + "Status check #{}, attempting to get CLI handle status", check_count ); - match app.lock().await.status().await { + match handle.lock().await.status().await { Ok(status) => { // We reset the error count to indicate that we can intermittently get statuses. err_count = 0; @@ -616,30 +679,27 @@ async fn monitor_local_status(app: Arc>) -> Status { match status { Status::Exited => { debug!("Run exited cleanly, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } Status::Crashed { .. } => { debug!("Run crashed, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } _ => { + debug!("Handle status: other, continuing to monitor"); sleep(Duration::from_millis(100)).await; } } } Err(e) => { - debug!("Failed to get app status: {:?}", e); + debug!("Failed to get handle status: {:?}", e); err_count += 1; // If we get five errors in a row, we abandon monitoring. if err_count >= 5 { - debug!("Failed to get app status after 5 attempts, giving up"); + debug!("Failed to get handle status after 5 attempts, giving up"); output::error("An error occured while monitoring your local run status!"); - return tower_runtime::Status::Crashed { code: -1 }; + return Status::Crashed { code: -1 }; } // Otherwise, keep on keepin' on. diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 8ab42fb5..fbeb31bb 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -7,14 +7,15 @@ rust-version = { workspace = true } license = { workspace = true } [dependencies] +async-trait = { workspace = true } chrono = { workspace = true } nix = { workspace = true } -snafu = { workspace = true } +snafu = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -tower-package = { workspace = true } -tower-telemetry = { workspace = true } -tower-uv = { workspace = true } +tower-package = { workspace = true } +tower-telemetry = { workspace = true } +tower-uv = { workspace = true } [dev-dependencies] config = { workspace = true } diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 19c8ed10..8af364b8 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -64,6 +64,15 @@ pub enum Error { #[snafu(display("cancelled"))] Cancelled, + + #[snafu(display("app not started"))] + AppNotStarted, + + #[snafu(display("no execution handle"))] + NoHandle, + + #[snafu(display("invalid package"))] + InvalidPackage, } impl From for Error { diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs new file mode 100644 index 00000000..5e901244 --- /dev/null +++ b/crates/tower-runtime/src/execution.rs @@ -0,0 +1,250 @@ +//! Execution backend abstraction for Tower +//! +//! This module provides traits and types for abstracting execution backends, +//! allowing Tower to support multiple compute substrates (local processes, +//! Kubernetes pods, etc.) through a uniform interface. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; + +use crate::errors::Error; +use crate::{OutputReceiver, Status}; + +// ============================================================================ +// Core Execution Types +// ============================================================================ + +/// ExecutionSpec describes what to execute and how +#[derive(Debug, Clone)] +pub struct ExecutionSpec { + /// Unique identifier for this execution (e.g., run_id) + pub id: String, + + /// Package reference (how to get the application code) + pub package: PackageRef, + + /// Runtime configuration (image, version, etc.) + pub runtime: RuntimeConfig, + + /// Environment name (e.g., "production", "staging", "default") + pub environment: String, + + /// Secret key-value pairs to inject + pub secrets: HashMap, + + /// Parameter key-value pairs to inject + pub parameters: HashMap, + + /// Additional environment variables + pub env_vars: HashMap, + + /// Resource limits for execution + pub resources: ResourceLimits, + + /// Networking configuration (for service workloads) + pub networking: Option, + + /// Telemetry context for tracing + pub telemetry_ctx: tower_telemetry::Context, +} + +/// PackageRef describes where to get the application bundle +#[derive(Debug, Clone)] +pub enum PackageRef { + /// Local filesystem path + Local { path: PathBuf }, +} + +/// RuntimeConfig specifies the execution runtime environment +#[derive(Debug, Clone)] +pub struct RuntimeConfig { + /// Runtime image to use (e.g., "towerhq/tower-runtime:python-3.11") + pub image: String, + + /// Specific version/tag if applicable + pub version: Option, + + /// Cache configuration + pub cache: CacheConfig, + + /// Entrypoint override (if not using bundle's default) + pub entrypoint: Option>, + + /// Command override (if not using bundle's default) + pub command: Option>, +} + +/// CacheConfig describes what should be cached +#[derive(Debug, Clone)] +pub struct CacheConfig { + /// Enable bundle caching (content-addressable by checksum) + pub enable_bundle_cache: bool, + + /// Enable runtime layer caching (container image layers) + pub enable_runtime_cache: bool, + + /// Enable dependency caching (language-specific, e.g., pip cache, node_modules) + pub enable_dependency_cache: bool, + + /// Cache backend to use + pub backend: CacheBackend, + + /// Cache isolation strategy + pub isolation: CacheIsolation, +} + +/// CacheIsolation defines security boundaries for caches +#[derive(Debug, Clone)] +pub enum CacheIsolation { + /// Global sharing (safe for immutable content-addressable caches) + Global, + + /// Per-account isolation + PerAccount { account_id: String }, + + /// Per-app isolation + PerApp { app_id: String }, + + /// No isolation + None, +} + +/// CacheBackend describes where caches are stored +#[derive(Debug, Clone)] +pub enum CacheBackend { + /// Local filesystem cache + Local { cache_dir: PathBuf }, + + /// No caching + None, +} + +/// ResourceLimits defines compute resource constraints +#[derive(Debug, Clone)] +pub struct ResourceLimits { + /// CPU limit in millicores (e.g., 1000 = 1 CPU) + pub cpu_millicores: Option, + + /// Memory limit in megabytes + pub memory_mb: Option, + + /// Ephemeral storage limit in megabytes + pub storage_mb: Option, + + /// Maximum number of processes + pub max_pids: Option, + + /// GPU count + pub gpu_count: u32, + + /// Execution timeout in seconds + pub timeout_seconds: u32, +} + +/// NetworkingSpec defines networking requirements +#[derive(Debug, Clone)] +pub struct NetworkingSpec { + /// Port the app listens on + pub port: u16, + + /// Whether this app needs a stable service endpoint + pub expose_service: bool, + + /// Service name (for DNS) + pub service_name: Option, +} + +// ============================================================================ +// Execution Backend Trait +// ============================================================================ + +/// ExecutionBackend abstracts the compute substrate +#[async_trait] +pub trait ExecutionBackend: Send + Sync { + /// The handle type this backend returns + type Handle: ExecutionHandle; + + /// Create a new execution environment + async fn create(&self, spec: ExecutionSpec) -> Result; + + /// Get backend capabilities + fn capabilities(&self) -> BackendCapabilities; + + /// Cleanup backend resources + async fn cleanup(&self) -> Result<(), Error>; +} + +/// BackendCapabilities describes what a backend supports +#[derive(Debug, Clone)] +pub struct BackendCapabilities { + /// Backend name + pub name: String, + + /// Supports persistent volumes for caching + pub supports_persistent_cache: bool, + + /// Supports pre-warmed environments + pub supports_prewarming: bool, + + /// Supports network isolation + pub supports_network_isolation: bool, + + /// Supports service endpoints + pub supports_service_endpoints: bool, + + /// Typical startup latency in milliseconds + pub typical_cold_start_ms: u64, + pub typical_warm_start_ms: u64, + + /// Maximum concurrent executions + pub max_concurrent_executions: Option, +} + +// ============================================================================ +// Execution Handle Trait +// ============================================================================ + +/// ExecutionHandle represents a running execution +#[async_trait] +pub trait ExecutionHandle: Send + Sync { + /// Get a unique identifier for this execution + fn id(&self) -> &str; + + /// Get current execution status + async fn status(&self) -> Result; + + /// Subscribe to log stream + async fn logs(&self) -> Result; + + /// Terminate execution gracefully + async fn terminate(&mut self) -> Result<(), Error>; + + /// Force kill execution + async fn kill(&mut self) -> Result<(), Error>; + + /// Wait for execution to complete + async fn wait_for_completion(&self) -> Result; + + /// Get service endpoint + async fn service_endpoint(&self) -> Result, Error>; + + /// Cleanup resources + async fn cleanup(&mut self) -> Result<(), Error>; +} + +/// ServiceEndpoint describes how to reach a running service +#[derive(Debug, Clone)] +pub struct ServiceEndpoint { + /// Host/IP to connect to + pub host: String, + + /// Port to connect to + pub port: u16, + + /// Protocol (http, https, tcp, etc.) + pub protocol: String, + + /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080") + pub url: Option, +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 195c3e26..74091edc 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -5,10 +5,11 @@ use std::path::PathBuf; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; -use tower_telemetry::debug; pub mod errors; +pub mod execution; pub mod local; +pub mod subprocess; use errors::Error; @@ -47,7 +48,7 @@ pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; -pub trait App { +pub trait App: Send + Sync { // start will start the process fn start(opts: StartOptions) -> impl Future> + Send where @@ -60,73 +61,6 @@ pub trait App { fn status(&self) -> impl Future> + Send; } -pub struct AppLauncher { - pub app: Option, -} - -impl std::default::Default for AppLauncher { - fn default() -> Self { - Self { app: None } - } -} - -impl AppLauncher { - pub async fn launch( - &mut self, - ctx: tower_telemetry::Context, - output_sender: OutputSender, - package: Package, - environment: String, - secrets: HashMap, - parameters: HashMap, - env_vars: HashMap, - cache_dir: Option, - ) -> Result<(), Error> { - let cwd = package.unpacked_path.clone().unwrap().to_path_buf(); - - let opts = StartOptions { - ctx, - output_sender, - cwd: Some(cwd), - environment, - secrets, - parameters, - package, - env_vars, - cache_dir, - }; - - // NOTE: This is a really awful hack to force any existing app to drop itself. Not certain - // this is exactly what we want to do... - self.app = None; - - let res = A::start(opts).await; - - if let Ok(app) = res { - self.app = Some(app); - Ok(()) - } else { - self.app = None; - Err(res.err().unwrap()) - } - } - - pub async fn terminate(&mut self) -> Result<(), Error> { - if let Some(app) = &mut self.app { - if let Err(err) = app.terminate().await { - debug!("failed to terminate app: {}", err); - Err(err) - } else { - self.app = None; - Ok(()) - } - } else { - // There's no app, so nothing to terminate. - Ok(()) - } - } -} - pub struct StartOptions { pub ctx: tower_telemetry::Context, pub package: Package, diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index ee4968e2..57d15ba0 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -341,6 +341,18 @@ impl App for LocalApp { }) } + async fn terminate(&mut self) -> Result<(), Error> { + self.terminator.cancel(); + + // Now we should wait for the join handle to finish. + if let Some(execute_handle) = self.execute_handle.take() { + let _ = execute_handle.await; + self.execute_handle = None; + } + + Ok(()) + } + async fn status(&self) -> Result { let mut status = self.status.lock().await; @@ -367,18 +379,6 @@ impl App for LocalApp { } } } - - async fn terminate(&mut self) -> Result<(), Error> { - self.terminator.cancel(); - - // Now we should wait for the join handle to finish. - if let Some(execute_handle) = self.execute_handle.take() { - let _ = execute_handle.await; - self.execute_handle = None; - } - - Ok(()) - } } async fn execute_bash_program( diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs new file mode 100644 index 00000000..57e99e41 --- /dev/null +++ b/crates/tower-runtime/src/subprocess.rs @@ -0,0 +1,155 @@ +//! Subprocess execution backend + +use crate::errors::Error; +use crate::execution::{ + BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, + PackageRef, ServiceEndpoint, +}; +use crate::local::LocalApp; +use crate::{App, OutputReceiver, StartOptions, Status}; + +use async_trait::async_trait; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::time::Duration; +use tower_package::Package; + +/// SubprocessBackend executes apps as a subprocess +pub struct SubprocessBackend { + /// Optional default cache directory to use + cache_dir: Option, +} + +impl SubprocessBackend { + pub fn new(cache_dir: Option) -> Self { + Self { cache_dir } + } +} + +#[async_trait] +impl ExecutionBackend for SubprocessBackend { + type Handle = SubprocessHandle; + + async fn create(&self, spec: ExecutionSpec) -> Result { + // Convert ExecutionSpec to StartOptions for LocalApp + let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel(); + + // Get cache_dir from spec or use backend default + let cache_dir = match &spec.runtime.cache.backend { + CacheBackend::Local { cache_dir } => Some(cache_dir.clone()), + _ => self.cache_dir.clone(), + }; + + let opts = StartOptions { + ctx: spec.telemetry_ctx, + package: match spec.package { + PackageRef::Local { path } => Package::from_unpacked_path(path).await, + }, + cwd: None, // LocalApp determines cwd from package + environment: spec.environment, + secrets: spec.secrets, + parameters: spec.parameters, + env_vars: spec.env_vars, + output_sender: output_sender.clone(), + cache_dir, + }; + + // Start the LocalApp + let app = LocalApp::start(opts).await?; + + Ok(SubprocessHandle { + id: spec.id, + app: Arc::new(Mutex::new(app)), + output_receiver: Arc::new(Mutex::new(output_receiver)), + }) + } + + fn capabilities(&self) -> BackendCapabilities { + BackendCapabilities { + name: "local".to_string(), + supports_persistent_cache: true, + supports_prewarming: false, + supports_network_isolation: false, + supports_service_endpoints: false, + typical_cold_start_ms: 1000, // ~1s for venv + sync + typical_warm_start_ms: 100, // ~100ms with warm cache + max_concurrent_executions: None, // Limited by system resources + } + } + + async fn cleanup(&self) -> Result<(), Error> { + // Nothing to cleanup for local backend + Ok(()) + } +} + +/// SubprocessHandle provides lifecycle management for a subprocess execution +pub struct SubprocessHandle { + id: String, + app: Arc>, + output_receiver: Arc>, +} + +#[async_trait] +impl ExecutionHandle for SubprocessHandle { + fn id(&self) -> &str { + &self.id + } + + async fn status(&self) -> Result { + let app = self.app.lock().await; + app.status().await + } + + async fn logs(&self) -> Result { + // Create a new channel for log streaming + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn a task to forward Output from the internal receiver + let output_receiver = self.output_receiver.clone(); + tokio::spawn(async move { + let mut receiver = output_receiver.lock().await; + while let Some(output) = receiver.recv().await { + if tx.send(output).is_err() { + break; // Receiver dropped + } + } + }); + + Ok(rx) + } + + async fn terminate(&mut self) -> Result<(), Error> { + let mut app = self.app.lock().await; + app.terminate().await + } + + async fn kill(&mut self) -> Result<(), Error> { + // For local processes, kill is the same as terminate + self.terminate().await + } + + async fn wait_for_completion(&self) -> Result { + loop { + let status = self.status().await?; + match status { + Status::None | Status::Running => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + _ => return Ok(status), + } + } + } + + async fn service_endpoint(&self) -> Result, Error> { + // Local backend doesn't support service endpoints + Ok(None) + } + + async fn cleanup(&mut self) -> Result<(), Error> { + // Ensure the app is terminated + self.terminate().await?; + Ok(()) + } +}