632 changes: 620 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli"
aes-gcm = "0.10"
anyhow = "1.0.95"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
async-trait = "0.1.89"
async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] }
axum = "0.8.4"
base64 = "0.22"
@@ -66,6 +67,8 @@ tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
url = { version = "2", features = ["serde"] }
webbrowser = "1"
k8s-openapi = { version = "0.27.0", features = ["v1_31"] }
kube = { version = "3.0.0", features = ["runtime", "derive", "client", "rustls-tls"], default-features = false }

# The profile that 'dist' will build with
[profile.dist]
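The new kube and k8s-openapi dependencies back the Kubernetes runtime work this PR builds toward (see the reviewer comment on run.rs below). For orientation, a minimal kube-rs client sketch of the kind these features (client, runtime, rustls-tls, v1_31) enable — standard kube-rs API, assuming tokio's macros feature is available; not code from this PR:

```rust
use k8s_openapi::api::core::v1::Pod;
use kube::{api::{Api, ListParams}, Client};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Infer config from kubeconfig or the in-cluster environment.
    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::default_namespaced(client);
    for p in pods.list(&ListParams::default()).await? {
        println!("pod: {}", p.metadata.name.unwrap_or_default());
    }
    Ok(())
}
```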
4 changes: 3 additions & 1 deletion crates/config/src/session.rs
@@ -21,7 +21,9 @@ fn extract_aid_from_jwt(jwt: &str) -> Option<String> {
let payload = parts[1];
let decoded = URL_SAFE_NO_PAD.decode(payload).ok()?;
let json: serde_json::Value = serde_json::from_slice(&decoded).ok()?;
json.get("https://tower.dev/aid")?.as_str().map(String::from)
json.get("https://tower.dev/aid")?
.as_str()
.map(String::from)
}

const DEFAULT_TOWER_URL: &str = "https://api.tower.dev";
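Since extract_aid_from_jwt only base64url-decodes the middle JWT segment and reads the https://tower.dev/aid claim, it is easy to exercise with a hand-built token. A hedged in-module test sketch (token shape and claim value invented for illustration; assumes the function's surrounding split-on-'.' logic):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};

    #[test]
    fn extracts_aid_claim() {
        // Only the payload segment is decoded; the header and signature
        // segments just need to be present for the three-part split.
        let payload = URL_SAFE_NO_PAD.encode(r#"{"https://tower.dev/aid":"acct-123"}"#);
        let jwt = format!("e30.{payload}.sig");
        assert_eq!(extract_aid_from_jwt(&jwt).as_deref(), Some("acct-123"));
    }
}
```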
40 changes: 20 additions & 20 deletions crates/tower-cmd/src/run.rs
@@ -5,12 +5,13 @@ use std::collections::HashMap;
use std::path::PathBuf;
use tower_api::models::Run;
use tower_package::{Package, PackageSpec};
use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status};
use tower_runtime::{OutputReceiver, Status};
use tower_telemetry::{debug, Context};

use std::sync::Arc;
use tokio::sync::{
mpsc::{unbounded_channel, Receiver as MpscReceiver},
mpsc::unbounded_channel,
mpsc::Receiver as MpscReceiver,
oneshot::{self, Receiver as OneshotReceiver},
Mutex,
};
@@ -168,13 +169,16 @@ where
// Unpack the package
package.unpack().await?;

// Create output channel - simple pattern for CLI
let (sender, receiver) = unbounded_channel();

output::success(&format!("Launching app `{}`", towerfile.app.name));
let output_task = tokio::spawn(output_handler(receiver));

let mut launcher: AppLauncher<LocalApp> = AppLauncher::default();
launcher
// Create backend and launch app
use tower_runtime::backends::cli::CliBackend;
let backend = CliBackend::new(config.cache_dir);
let handle = backend
.launch(
Context::new(),
sender,
@@ -183,13 +187,12 @@
secrets,
params,
env_vars,
config.cache_dir,
)
.await?;

// Monitor app output and status concurrently
let app = Arc::new(Mutex::new(launcher.app.unwrap()));
let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app)));
// Monitor app status concurrently
let handle = Arc::new(Mutex::new(handle));
let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle)));

// Wait for app to complete or SIGTERM
let status_result = tokio::select! {
Expand All @@ -199,7 +202,7 @@ where
},
_ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => {
output::write("\nReceived Ctrl+C, stopping local run...\n");
app.lock().await.terminate().await.ok();
handle.lock().await.terminate().await.ok();
return Ok(output_task.await.unwrap());
}
};
@@ -595,51 +598,48 @@ async fn monitor_output(mut output: OutputReceiver) {

/// monitor_cli_status is a helper function that monitors the status of a given app and waits
/// for it to progress to a terminal state.
async fn monitor_local_status(app: Arc<Mutex<LocalApp>>) -> Status {
debug!("Starting status monitoring for LocalApp");
async fn monitor_cli_status(handle: Arc<Mutex<tower_runtime::backends::cli::CliHandle>>) -> Status {
Contributor comment on lines 599 to 601: This was renamed to "cli", but runners in third-party infrastructure (e.g. self-hosted runners) will use local processes too, not Kubernetes...

debug!("Starting status monitoring for CLI execution");
let mut check_count = 0;
let mut err_count = 0;

loop {
check_count += 1;

debug!(
"Status check #{}, attempting to get app status",
"Status check #{}, attempting to get CLI handle status",
check_count
);

match app.lock().await.status().await {
match handle.lock().await.status().await {
Ok(status) => {
// Reset the error count: a successful fetch means any earlier failures were intermittent.
err_count = 0;

match status {
Status::Exited => {
debug!("Run exited cleanly, stopping status monitoring");

// We're done. Exit this loop and function.
return status;
}
Status::Crashed { .. } => {
debug!("Run crashed, stopping status monitoring");

// We're done. Exit this loop and function.
return status;
}
_ => {
debug!("Handle status: other, continuing to monitor");
sleep(Duration::from_millis(100)).await;
}
}
}
Err(e) => {
debug!("Failed to get app status: {:?}", e);
debug!("Failed to get handle status: {:?}", e);
err_count += 1;

// If we get five errors in a row, we abandon monitoring.
if err_count >= 5 {
debug!("Failed to get app status after 5 attempts, giving up");
debug!("Failed to get handle status after 5 attempts, giving up");
output::error("An error occured while monitoring your local run status!");
return tower_runtime::Status::Crashed { code: -1 };
return Status::Crashed { code: -1 };
}

// Otherwise, keep on keepin' on.
13 changes: 9 additions & 4 deletions crates/tower-runtime/Cargo.toml
@@ -7,14 +7,19 @@ rust-version = { workspace = true }
license = { workspace = true }

[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
nix = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tower-package = { workspace = true }
tower-telemetry = { workspace = true }
tower-uv = { workspace = true }

# K8s dependencies (optional)
k8s-openapi = { workspace = true }
kube = { workspace = true }

[dev-dependencies]
config = { workspace = true }
90 changes: 90 additions & 0 deletions crates/tower-runtime/src/backends/cli.rs
@@ -0,0 +1,90 @@
//! Simple backend for CLI --local runs
//!
//! This backend follows a simpler pattern than SubprocessBackend:
//! - The caller creates the output channel and passes the sender
//! - The caller keeps the receiver for direct consumption
//! - No need for complex handle.logs() method
//! - Single consumer model (better for CLI)

use crate::errors::Error;
use crate::local::LocalApp;
use crate::{App, OutputSender, StartOptions, Status};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tower_package::Package;

/// CliBackend executes apps as local subprocesses for CLI --local runs
pub struct CliBackend {
/// Optional default cache directory to use
cache_dir: Option<PathBuf>,
}

impl CliBackend {
pub fn new(cache_dir: Option<PathBuf>) -> Self {
Self { cache_dir }
}

/// Launch an app with the given parameters
///
/// Unlike SubprocessBackend.create(), this takes OutputSender directly
/// so the caller can immediately start consuming output from their receiver.
pub async fn launch(
&self,
ctx: tower_telemetry::Context,
output_sender: OutputSender,
package: Package,
environment: String,
secrets: HashMap<String, String>,
parameters: HashMap<String, String>,
env_vars: HashMap<String, String>,
) -> Result<CliHandle, Error> {
let opts = StartOptions {
ctx,
package,
cwd: None, // LocalApp determines cwd from package
environment,
secrets,
parameters,
env_vars,
output_sender,
cache_dir: self.cache_dir.clone(),
};

// Start the LocalApp
let app = LocalApp::start(opts).await?;

Ok(CliHandle {
app: Arc::new(Mutex::new(Some(app))),
})
}
}

/// CliHandle provides lifecycle management for a CLI local subprocess execution
pub struct CliHandle {
app: Arc<Mutex<Option<LocalApp>>>,
}

impl CliHandle {
/// Get current execution status
pub async fn status(&self) -> Result<Status, Error> {
let guard = self.app.lock().await;
if let Some(app) = guard.as_ref() {
app.status().await
} else {
// App has already been terminated
Ok(Status::Crashed { code: -1 })
}
}

/// Terminate execution gracefully
pub async fn terminate(&mut self) -> Result<(), Error> {
let mut guard = self.app.lock().await;
if let Some(mut app) = guard.take() {
app.terminate().await
} else {
Ok(())
}
}
}
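Taken together, a sketch of how a caller wires up CliBackend as the module docs describe — the caller owns both channel ends, hands the sender to launch(), and polls the returned handle. Assumes a prepared Package, that tower_runtime's output items implement Debug, that its error type implements std::error::Error (typical for snafu), and an environment name of "local"; illustration only, not code from this PR:

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel;
use tower_runtime::backends::cli::CliBackend;
use tower_runtime::Status;

async fn run_local(package: tower_package::Package) -> anyhow::Result<()> {
    // The caller owns both channel ends: the sender goes to the backend,
    // the receiver is consumed directly (the single-consumer CLI model).
    let (sender, mut receiver) = unbounded_channel();
    let output_task = tokio::spawn(async move {
        while let Some(line) = receiver.recv().await {
            println!("{line:?}"); // assumes output items implement Debug
        }
    });

    let backend = CliBackend::new(None); // no explicit cache dir
    let handle = backend
        .launch(
            tower_telemetry::Context::new(),
            sender,
            package,
            "local".to_string(), // environment name (assumed)
            HashMap::new(),      // secrets
            HashMap::new(),      // parameters
            HashMap::new(),      // env vars
        )
        .await?;

    // Poll until the run reaches a terminal state, as monitor_cli_status does.
    loop {
        match handle.status().await? {
            Status::Exited => break,
            Status::Crashed { code } => {
                eprintln!("run crashed with code {code}");
                break;
            }
            _ => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }

    output_task.await?;
    Ok(())
}
```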