diff --git a/Cargo.lock b/Cargo.lock
index 786e2744..5ea25d7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -168,6 +168,17 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "async-trait"
+version = "0.1.89"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "async_zip"
 version = "0.0.16"
@@ -480,7 +491,7 @@ dependencies = [
 
 [[package]]
 name = "config"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "base64",
  "chrono",
@@ -587,7 +598,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
 
 [[package]]
 name = "crypto"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "aes-gcm",
  "base64",
@@ -2789,18 +2800,28 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
+checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
+dependencies = [
+ "serde_core",
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_core"
+version = "1.0.228"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
+checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3215,7 +3236,7 @@ dependencies = [
 
 [[package]]
 name = "testutils"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "pem",
  "rsa",
@@ -3485,7 +3506,7 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
 
 [[package]]
 name = "tower"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "tokio",
  "tower-api",
@@ -3510,7 +3531,7 @@ dependencies = [
 
 [[package]]
 name = "tower-api"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "reqwest",
  "serde",
@@ -3522,7 +3543,7 @@ dependencies = [
 
 [[package]]
 name = "tower-cmd"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "axum",
  "bytes",
@@ -3592,7 +3613,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
 
 [[package]]
 name = "tower-package"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "async-compression",
  "config",
@@ -3605,18 +3626,19 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tokio-tar",
- "tokio-util",
  "tower-telemetry",
 ]
 
 [[package]]
 name = "tower-runtime"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
+ "async-trait",
  "chrono",
  "config",
  "nix 0.30.1",
  "snafu",
+ "tmpdir",
  "tokio",
  "tokio-util",
  "tower-package",
@@ -3632,7 +3654,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
 
 [[package]]
 name = "tower-telemetry"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "tracing",
  "tracing-appender",
@@ -3641,7 +3663,7 @@ dependencies = [
 
 [[package]]
 name = "tower-uv"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "async-compression",
  "async_zip",
@@ -3655,7 +3677,7 @@ dependencies = [
 
 [[package]]
 name = "tower-version"
-version = "0.3.42"
+version = "0.3.43"
 dependencies = [
  "anyhow",
  "chrono",
@@ -3862,9 +3884,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
 
 [[package]]
 name = "uuid"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be"
+checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
 dependencies = [
  "getrandom 0.3.3",
  "js-sys",
diff --git a/Cargo.toml b/Cargo.toml
index aa5c264f..a6fc1635 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,7 +4,7 @@ resolver = "2"
 
 [workspace.package]
 edition = "2021"
-version = "0.3.42"
+version = "0.3.43"
 description = "Tower is the best way to host Python data apps in production"
 rust-version = "1.81"
 authors = ["Brad Heller <brad@tower.dev>"]
@@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli"
 aes-gcm = "0.10"
 anyhow = "1.0.95"
 async-compression = { version = "0.4", features = ["tokio", "gzip"] }
+async-trait = "0.1.89"
 async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] }
 axum = "0.8.4"
 base64 = "0.22"
diff --git a/crates/config/src/session.rs b/crates/config/src/session.rs
index 53830ccf..f7a22b07 100644
--- a/crates/config/src/session.rs
+++ b/crates/config/src/session.rs
@@ -21,7 +21,9 @@ fn extract_aid_from_jwt(jwt: &str) -> Option<String> {
     let payload = parts[1];
     let decoded = URL_SAFE_NO_PAD.decode(payload).ok()?;
     let json: serde_json::Value = serde_json::from_slice(&decoded).ok()?;
-    json.get("https://tower.dev/aid")?.as_str().map(String::from)
+    json.get("https://tower.dev/aid")?
+        .as_str()
+        .map(String::from)
 }
 
 const DEFAULT_TOWER_URL: &str = "https://api.tower.dev";
diff --git a/crates/tower-cmd/src/api.rs b/crates/tower-cmd/src/api.rs
index 8a49b4a3..772bfb21 100644
--- a/crates/tower-cmd/src/api.rs
+++ b/crates/tower-cmd/src/api.rs
@@ -367,6 +367,17 @@ pub enum LogStreamError {
     Unknown,
 }
 
+impl std::fmt::Display for LogStreamError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            LogStreamError::Reqwest(err) => write!(f, "{err}"),
+            LogStreamError::Unknown => write!(f, "unknown log stream error"),
+        }
+    }
+}
+
+impl std::error::Error for LogStreamError {}
+
 impl From<reqwest_eventsource::CannotCloneRequestError> for LogStreamError {
     fn from(err: reqwest_eventsource::CannotCloneRequestError) -> Self {
         debug!("Failed to clone request {:?}", err);
@@ -399,10 +410,30 @@ async fn drain_run_logs_stream(mut source: EventSource, tx: mpsc::Sender<LogStreamEvent>) {
                 let event_warning = serde_json::from_str(&message.data);
-                if let Ok(event) = event_warning {
-                    tx.send(LogStreamEvent::EventWarning(event)).await.ok();
-                } else {
-                    debug!("Failed to parse warning message: {:?}", message.data);
+                match event_warning {
+                    Ok(event) => {
+                        tx.send(LogStreamEvent::EventWarning(event)).await.ok();
+                    }
+                    Err(err) => {
+                        let warning_data = serde_json::from_str(&message.data);
+                        match warning_data {
+                            Ok(data) => {
+                                let event = tower_api::models::EventWarning {
+                                    data,
+                                    event: tower_api::models::event_warning::Event::Warning,
+                                    id: None,
+                                    retry: None,
+                                };
+                                tx.send(LogStreamEvent::EventWarning(event)).await.ok();
+                            }
+                            Err(_) => {
+                                debug!(
+                                    "Failed to parse warning message: {:?}. Error: {}",
+                                    message.data, err
+                                );
+                            }
+                        }
+                    }
                 }
             }
             _ => {
diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs
index db86e857..31652eea 100644
--- a/crates/tower-cmd/src/apps.rs
+++ b/crates/tower-cmd/src/apps.rs
@@ -1,7 +1,9 @@
 use clap::{value_parser, Arg, ArgMatches, Command};
 use config::Config;
+use tokio::sync::oneshot;
+use tokio::time::{sleep, Duration, Instant};
 
-use tower_api::models::Run;
+use tower_api::models::{Run, RunLogLine};
 
 use crate::{api, output};
@@ -19,6 +21,13 @@ pub fn apps_cmd() -> Command {
         )
         .subcommand(
             Command::new("logs")
+                .arg(
+                    Arg::new("follow")
+                        .short('f')
+                        .long("follow")
+                        .help("Follow logs in real time")
+                        .action(clap::ArgAction::SetTrue),
+                )
                 .allow_external_subcommands(true)
                 .override_usage("tower apps logs [OPTIONS] <app_name>#<run_number>")
                 .after_help("Example: tower apps logs hello-world#11")
@@ -54,6 +63,12 @@ pub async fn do_logs(config: Config, cmd: &ArgMatches) {
     let (name, seq) = extract_app_name_and_run("logs", cmd.subcommand());
+    let follow = cmd.get_one::<bool>("follow").copied().unwrap_or(false);
+
+    if follow {
+        follow_logs(config, name, seq).await;
+        return;
+    }
 
     if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await {
         for line in resp.log_lines {
@@ -217,3 +232,471 @@ fn extract_app_name(subcmd: &str, cmd: Option<(&str, &ArgMatches)>) -> String {
     );
     output::die(&line);
 }
+
+const FOLLOW_BACKOFF_INITIAL: Duration = Duration::from_millis(500);
+const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5);
+const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5);
+const RUN_START_POLL_INTERVAL: Duration = Duration::from_millis(500);
+const RUN_START_MESSAGE_DELAY: Duration = Duration::from_secs(3);
+
+async fn follow_logs(config: Config, name: String, seq: i64) {
+    let enable_ctrl_c = !output::get_output_mode().is_mcp();
+    let mut backoff = FOLLOW_BACKOFF_INITIAL;
+    let mut cancel_monitor: Option<oneshot::Sender<()>> = None;
+    let mut last_line_num: Option<i64> = None;
+
+    loop {
+        let mut run = match api::describe_run(&config, &name, seq).await {
+            Ok(res) => res.run,
+            Err(err) => output::tower_error_and_die(err, "Fetching run details failed"),
+        };
+
+        if is_run_finished(&run) {
+            if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await {
+                for line in resp.log_lines {
+                    emit_log_if_new(&line, &mut last_line_num);
+                }
+            }
+            return;
+        }
+
+        if !is_run_started(&run) {
+            let wait_started = Instant::now();
+            let mut notified = false;
+            loop {
+                sleep(RUN_START_POLL_INTERVAL).await;
+                // Avoid blank output on slow starts while keeping fast starts quiet.
+                if should_notify_run_wait(notified, wait_started.elapsed()) {
+                    output::write("Waiting for run to start...\n");
+                    notified = true;
+                }
+                run = match api::describe_run(&config, &name, seq).await {
+                    Ok(res) => res.run,
+                    Err(err) => output::tower_error_and_die(err, "Fetching run details failed"),
+                };
+                if is_run_finished(&run) {
+                    if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await {
+                        for line in resp.log_lines {
+                            emit_log_if_new(&line, &mut last_line_num);
+                        }
+                    }
+                    return;
+                }
+                if is_run_started(&run) {
+                    break;
+                }
+            }
+        }
+
+        // Cancel any prior watcher so we don't accumulate pollers after reconnects.
+        if let Some(cancel) = cancel_monitor.take() {
+            let _ = cancel.send(());
+        }
+        let (cancel_tx, cancel_rx) = oneshot::channel();
+        cancel_monitor = Some(cancel_tx);
+        let run_complete = monitor_run_completion(&config, &name, seq, cancel_rx);
+
+        match api::stream_run_logs(&config, &name, seq).await {
+            Ok(log_stream) => {
+                // Reset after a successful connection so transient drops recover quickly.
+                backoff = FOLLOW_BACKOFF_INITIAL;
+                match stream_logs_until_complete(
+                    log_stream,
+                    run_complete,
+                    enable_ctrl_c,
+                    &run.dollar_link,
+                    &mut last_line_num,
+                )
+                .await
+                {
+                    Ok(LogFollowOutcome::Completed) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                    Ok(LogFollowOutcome::Interrupted) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                    Ok(LogFollowOutcome::Disconnected) => {}
+                    Err(_) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                }
+            }
+            Err(err) => {
+                if is_fatal_stream_error(&err) {
+                    output::error(&format!("Failed to stream run logs: {}", err));
+                    return;
+                }
+                output::error(&format!("Failed to stream run logs: {}", err));
+                sleep(backoff).await;
+                backoff = next_backoff(backoff);
+                continue;
+            }
+        }
+
+        let latest = match api::describe_run(&config, &name, seq).await {
+            Ok(res) => res.run,
+            Err(err) => output::tower_error_and_die(err, "Fetching run details failed"),
+        };
+        if is_run_finished(&latest) {
+            return;
+        }
+
+        sleep(backoff).await;
+        backoff = next_backoff(backoff);
+    }
+}
+
+fn next_backoff(current: Duration) -> Duration {
+    let next = current.checked_mul(2).unwrap_or(FOLLOW_BACKOFF_MAX);
+    if next > FOLLOW_BACKOFF_MAX {
+        FOLLOW_BACKOFF_MAX
+    } else {
+        next
+    }
+}
+
+enum LogFollowOutcome {
+    Completed,
+    Disconnected,
+    Interrupted,
+}
+
+async fn stream_logs_until_complete(
+    mut log_stream: tokio::sync::mpsc::Receiver<api::LogStreamEvent>,
+    mut run_complete: oneshot::Receiver<Run>,
+    enable_ctrl_c: bool,
+    run_link: &str,
+    last_line_num: &mut Option<i64>,
+) -> Result<LogFollowOutcome, api::LogStreamError> {
+    loop {
+        tokio::select! {
+            event = log_stream.recv() => match event {
+                Some(api::LogStreamEvent::EventLog(log)) => {
+                    emit_log_if_new(&log, last_line_num);
+                },
+                Some(api::LogStreamEvent::EventWarning(warning)) => {
+                    output::write(&format!("Warning: {}\n", warning.data.content));
+                }
+                None => return Ok(LogFollowOutcome::Disconnected),
+            },
+            res = &mut run_complete => {
+                match res {
+                    Ok(_) => {
+                        drain_remaining_logs(log_stream, last_line_num).await;
+                        return Ok(LogFollowOutcome::Completed);
+                    }
+                    // If monitoring failed, keep following and let the caller retry.
+                    Err(_) => return Ok(LogFollowOutcome::Disconnected),
+                }
+            },
+            _ = tokio::signal::ctrl_c(), if enable_ctrl_c => {
+                output::write("Received Ctrl+C, stopping log streaming...\n");
+                output::write("Note: The run will continue in Tower cloud\n");
+                output::write(&format!("      See more: {}\n", run_link));
+                return Ok(LogFollowOutcome::Interrupted);
+            },
+        }
+    }
+}
+
+async fn drain_remaining_logs(
+    mut log_stream: tokio::sync::mpsc::Receiver<api::LogStreamEvent>,
+    last_line_num: &mut Option<i64>,
+) {
+    let _ = tokio::time::timeout(LOG_DRAIN_DURATION, async {
+        while let Some(event) = log_stream.recv().await {
+            match event {
+                api::LogStreamEvent::EventLog(log) => {
+                    emit_log_if_new(&log, last_line_num);
+                }
+                api::LogStreamEvent::EventWarning(warning) => {
+                    output::write(&format!("Warning: {}\n", warning.data.content));
+                }
+            }
+        }
+    })
+    .await;
+}
+
+fn emit_log_if_new(log: &RunLogLine, last_line_num: &mut Option<i64>) {
+    if should_emit_line(last_line_num, log.line_num) {
+        output::remote_log_event(log);
+    }
+}
+
+fn should_emit_line(last_line_num: &mut Option<i64>, line_num: i64) -> bool {
+    if last_line_num.map_or(true, |last| line_num > last) {
+        *last_line_num = Some(line_num);
+        true
+    } else {
+        false
+    }
+}
+
+fn is_fatal_stream_error(err: &api::LogStreamError) -> bool {
+    match err {
+        api::LogStreamError::Reqwest(reqwest_err) => reqwest_err
+            .status()
+            .map(|status| status.is_client_error() && status.as_u16() != 429)
+            .unwrap_or(false),
+        api::LogStreamError::Unknown => false,
+    }
+}
+
+fn monitor_run_completion(
+    config: &Config,
+    app_name: &str,
+    seq: i64,
+    mut cancel: oneshot::Receiver<()>,
+) -> oneshot::Receiver<Run> {
+    let (tx, rx) = oneshot::channel();
+    let config_clone = config.clone();
+    let app_name = app_name.to_string();
+
+    tokio::spawn(async move {
+        let mut failures = 0;
+        loop {
+            tokio::select! {
+                _ = &mut cancel => return,
+                result = api::describe_run(&config_clone, &app_name, seq) => match result {
+                    Ok(res) => {
+                        failures = 0;
+                        if is_run_finished(&res.run) {
+                            let _ = tx.send(res.run);
+                            return;
+                        }
+                    }
+                    Err(_) => {
+                        failures += 1;
+                        if failures >= 5 {
+                            output::error(
+                                "Failed to monitor run completion after repeated errors",
+                            );
+                            return;
+                        }
+                    }
+                },
+            }
+            sleep(Duration::from_millis(500)).await;
+        }
+    });
+
+    rx
+}
+
+fn is_run_finished(run: &Run) -> bool {
+    match run.status {
+        // Be explicit about terminal states so new non-terminal statuses
+        // don't cause us to stop following logs too early.
+        tower_api::models::run::Status::Crashed
+        | tower_api::models::run::Status::Errored
+        | tower_api::models::run::Status::Exited
+        | tower_api::models::run::Status::Cancelled => true,
+        _ => false,
+    }
+}
+
+fn is_run_started(run: &Run) -> bool {
+    match run.status {
+        tower_api::models::run::Status::Scheduled | tower_api::models::run::Status::Pending => {
+            false
+        }
+        _ => true,
+    }
+}
+
+fn should_notify_run_wait(already_notified: bool, elapsed: Duration) -> bool {
+    !already_notified && elapsed >= RUN_START_MESSAGE_DELAY
+}
+
+#[cfg(test)]
+mod tests {
+    use super::is_run_finished;
+    use super::{
+        apps_cmd, is_run_started, next_backoff, should_emit_line, should_notify_run_wait,
+        stream_logs_until_complete, LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX,
+    };
+    use tokio::sync::{mpsc, oneshot};
+    use tokio::time::Duration;
+    use tower_api::models::run::Status;
+    use tower_api::models::Run;
+
+    #[test]
+    fn test_follow_flag_parsing() {
+        let matches = apps_cmd()
+            .try_get_matches_from(["apps", "logs", "--follow", "hello-world#11"])
+            .unwrap();
+        let (cmd, sub_matches) = matches.subcommand().unwrap();
+
+        assert_eq!(cmd, "logs");
+        assert_eq!(sub_matches.get_one::<bool>("follow"), Some(&true));
+        assert_eq!(
+            sub_matches.subcommand().map(|(name, _)| name),
+            Some("hello-world#11")
+        );
+    }
+
+    #[test]
+    fn test_terminal_statuses_explicit() {
+        let non_terminal = [Status::Scheduled, Status::Pending, Status::Running];
+        for status in non_terminal {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(!is_run_finished(&run));
+        }
+
+        let terminal = [
+            Status::Crashed,
+            Status::Errored,
+            Status::Exited,
+            Status::Cancelled,
+        ];
+        for status in terminal {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(is_run_finished(&run));
+        }
+    }
+
+    #[test]
+    fn test_status_variants_exhaustive() {
+        let status = Status::Scheduled;
+        match status {
+            Status::Scheduled => {}
+            Status::Pending => {}
+            Status::Running => {}
+            Status::Crashed => {}
+            Status::Errored => {}
+            Status::Exited => {}
+            Status::Cancelled => {}
+        }
+    }
+
+    #[test]
+    fn test_run_started_statuses() {
+        let not_started = [Status::Scheduled, Status::Pending];
+        for status in not_started {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(!is_run_started(&run));
+        }
+
+        let started = [
+            Status::Running,
+            Status::Crashed,
+            Status::Errored,
+            Status::Exited,
+            Status::Cancelled,
+        ];
+        for status in started {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(is_run_started(&run));
+        }
+    }
+
+    #[test]
+    fn test_run_wait_notification_logic() {
+        assert!(!should_notify_run_wait(
+            true,
+            super::RUN_START_MESSAGE_DELAY
+        ));
+        assert!(!should_notify_run_wait(
+            false,
+            super::RUN_START_MESSAGE_DELAY - Duration::from_millis(1)
+        ));
+        assert!(should_notify_run_wait(
+            false,
+            super::RUN_START_MESSAGE_DELAY
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_stream_completion_on_run_finish() {
+        let (tx, rx) = mpsc::channel(1);
+        let (done_tx, done_rx) = oneshot::channel();
+        let mut last_line_num = None;
+
+        let done_task = tokio::spawn(async move {
+            let _ = done_tx.send(Run::default());
+            tokio::time::sleep(Duration::from_millis(10)).await;
+            drop(tx);
+        });
+
+        let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await;
+        done_task.await.unwrap();
+
+        assert!(matches!(res, Ok(LogFollowOutcome::Completed)));
+    }
+
+    #[tokio::test]
+    async fn test_stream_disconnection_on_close() {
+        let (tx, rx) = mpsc::channel(1);
+        drop(tx);
+        let (_done_tx, done_rx) = oneshot::channel::<Run>();
+        let mut last_line_num = None;
+
+        let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await;
+
+        assert!(matches!(res, Ok(LogFollowOutcome::Disconnected)));
+    }
+
+    #[test]
+    fn test_backoff_growth_and_cap() {
+        let mut backoff = FOLLOW_BACKOFF_INITIAL;
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(1));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(2));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(4));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, FOLLOW_BACKOFF_MAX);
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, FOLLOW_BACKOFF_MAX);
+    }
+
+    #[test]
+    fn test_duplicate_line_filtering() {
+        let mut last_line_num = None;
+        assert!(should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(!should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(!should_emit_line(&mut last_line_num, 0));
+        assert_eq!(last_line_num, Some(1));
+        assert!(should_emit_line(&mut last_line_num, 2));
+        assert_eq!(last_line_num, Some(2));
+        assert!(should_emit_line(&mut last_line_num, 10));
+        assert_eq!(last_line_num, Some(10));
+    }
+
+    #[test]
+    fn test_out_of_order_log_handling() {
+        let mut last_line_num = None;
+        assert!(should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(should_emit_line(&mut last_line_num, 3));
+        assert_eq!(last_line_num, Some(3));
+        assert!(!should_emit_line(&mut last_line_num, 2));
+        assert_eq!(last_line_num, Some(3));
+        assert!(should_emit_line(&mut last_line_num, 4));
+        assert_eq!(last_line_num, Some(4));
+    }
+}
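The follow loop above hinges on two small pure helpers: next_backoff doubles the retry delay up to FOLLOW_BACKOFF_MAX, and should_emit_line suppresses lines that were already printed, so a reconnect that replays the stream from the start stays silent. The sketch below restates those semantics in isolation; it is a standalone illustration (the constant values mirror the patch, the rest is glue written for this note), not code from the patch itself:

use std::time::Duration;

const FOLLOW_BACKOFF_INITIAL: Duration = Duration::from_millis(500);
const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5);

// Double the delay after each failed reconnect, saturating at the cap.
fn next_backoff(current: Duration) -> Duration {
    current.checked_mul(2).unwrap_or(FOLLOW_BACKOFF_MAX).min(FOLLOW_BACKOFF_MAX)
}

// Print a line only if it is newer than anything already printed, so a
// reconnect that replays earlier lines produces no duplicate output.
fn should_emit_line(last: &mut Option<i64>, line_num: i64) -> bool {
    if last.map_or(true, |l| line_num > l) {
        *last = Some(line_num);
        true
    } else {
        false
    }
}

fn main() {
    assert_eq!(next_backoff(Duration::from_secs(4)), FOLLOW_BACKOFF_MAX);
    let mut last = None;
    assert!(should_emit_line(&mut last, 1));
    assert!(!should_emit_line(&mut last, 1)); // replayed after a reconnect
}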
diff --git a/crates/tower-cmd/src/lib.rs b/crates/tower-cmd/src/lib.rs
index 483f7c48..cd4fcbda 100644
--- a/crates/tower-cmd/src/lib.rs
+++ b/crates/tower-cmd/src/lib.rs
@@ -85,8 +85,10 @@ impl App {
         if let Some(latest) = Self::check_latest_version().await {
             let current = tower_version::current_version();
 
-            if current != latest {
-                output::write_update_message(&latest, &current);
+            if tower_version::is_older_version(current, &latest) {
+                output::write_update_available_message(&latest, current);
+            } else if tower_version::is_newer_version(current, &latest) {
+                output::write_dev_version_message(current, &latest);
             }
         }
diff --git a/crates/tower-cmd/src/mcp.rs b/crates/tower-cmd/src/mcp.rs
index a7a5974d..af0a5b27 100644
--- a/crates/tower-cmd/src/mcp.rs
+++ b/crates/tower-cmd/src/mcp.rs
@@ -597,7 +597,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Deploy your app to Tower cloud. Prerequisites: 1) Create Towerfile, 2) Create app with tower_apps_create. Optional working_directory parameter specifies which project directory to deploy from."
+        description = "Deploy to Tower cloud. Prerequisites: Towerfile, tower_apps_create. Optional: working_directory."
     )]
     async fn tower_deploy(
         &self,
@@ -612,7 +612,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Run your app locally using the local Towerfile and source files. Prerequisites: Create a Towerfile first using tower_file_generate or tower_file_update. Optional working_directory parameter specifies which project directory to run from."
+        description = "Run locally using Towerfile (preferred during development, has access to tower secrets). Prerequisites: Towerfile. Optional: working_directory."
     )]
     async fn tower_run_local(
         &self,
@@ -651,7 +651,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Run your app remotely on Tower cloud. Prerequisites: 1) Create Towerfile, 2) Create app with tower_apps_create, 3) Deploy with tower_deploy"
+        description = "Run on Tower cloud. Prerequisites: Towerfile, tower_apps_create, tower_deploy."
     )]
     async fn tower_run_remote(
         &self,
@@ -702,9 +702,7 @@ impl TowerService {
         }
     }
 
-    #[tool(
-        description = "Read and parse the current Towerfile configuration. Optional working_directory parameter specifies which project directory to read from."
-    )]
+    #[tool(description = "Read and parse Towerfile configuration. Optional: working_directory.")]
     async fn tower_file_read(
         &self,
         Parameters(request): Parameters,
@@ -717,7 +715,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Update Towerfile app configuration. Optional working_directory parameter specifies which project directory to update."
+        description = "Update Towerfile config (app name, script, description, source). Use this instead of editing TOML. Optional: working_directory."
    )]
     async fn tower_file_update(
         &self,
@@ -752,7 +750,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Add a new parameter to the Towerfile. Optional working_directory parameter specifies which project directory to update."
+        description = "Add parameter to Towerfile. Use this instead of editing TOML. Optional: working_directory."
     )]
     async fn tower_file_add_parameter(
         &self,
@@ -778,9 +776,7 @@ impl TowerService {
         }
     }
 
-    #[tool(
-        description = "Validate the current Towerfile configuration. Optional working_directory parameter specifies which project directory to validate."
-    )]
+    #[tool(description = "Validate Towerfile configuration. Optional: working_directory.")]
     async fn tower_file_validate(
         &self,
         Parameters(request): Parameters,
@@ -793,7 +789,7 @@ impl TowerService {
     }
 
     #[tool(
-        description = "Generate Towerfile from pyproject.toml. This is typically the first step in the workflow. Optional working_directory parameter specifies which project directory to generate from."
+        description = "Generate Towerfile from pyproject.toml (first step). Don't add build systems to pyproject.toml. Optional: working_directory."
     )]
     async fn tower_file_generate(
         &self,
@@ -827,48 +823,64 @@ impl TowerService {
         description = "Show the recommended workflow for developing and deploying Tower applications"
     )]
     async fn tower_workflow_help(&self) -> Result {
-        let workflow = r#"Tower Application Development Workflow:
-
-All commands support an optional 'working_directory' parameter to specify which project directory to operate on.
-
-0. HAVE AN EXISTING PYTHON PROJECT:
-   There are no commands for this provided with this MCP server. However, if you do not have a python project yet
-   then a good start would be to make a new directory with the project name, and then call `uv init` to generate
-   a pyproject.toml, main.py and README.md
-
-1. CREATE TOWERFILE (required for all steps):
-   - tower_file_generate: Generate from existing pyproject.toml
-   - tower_file_update: Manually create or update configuration
-   - tower_file_validate: Verify Towerfile is valid
-
-2. LOCAL DEVELOPMENT & TESTING:
-   - tower_run_local: Run your app locally to test functionality
-
-3. CLOUD DEPLOYMENT (for remote execution):
-   - tower_apps_create: Create app on Tower cloud
-   - tower_deploy: Deploy your code to the cloud
-   - tower_run_remote: Execute on Tower cloud infrastructure
-
-4. SCHEDULE MANAGEMENT (for automatic recurring execution):
-   - tower_schedules_list: List all schedules for apps
-   - tower_schedules_create: Create a schedule to run an app automatically on a cron schedule
-   - tower_schedules_update: Update an existing schedule
-   - tower_schedules_delete: Delete a schedule
+        let workflow = r#"Tower Workflow (Tower CLI not in training data - use MCP tools only):
+
+WORKING_DIRECTORY PARAMETER:
+All tools accept optional working_directory parameter to specify which project to operate on.
+- Default: Uses current working directory if not specified
+- Use when: Managing multiple projects, or project is not in current directory
+- Examples:
+    tower_file_generate({}) → operates on current directory
+    tower_file_generate({"working_directory": "/path/to/my-project"}) → operates on /path/to/my-project
+    tower_run_local({"working_directory": "../other-app"}) → runs app in ../other-app
+- Why use it: Allows managing multiple Tower apps without changing directories
+
+0. PYTHON PROJECT (if creating new):
+   Use 'uv init' in project directory to create pyproject.toml, main.py, README.md
+   Keep pyproject.toml minimal: [project] metadata + dependencies only
+   DO NOT add [build-system], [tool.hatchling], [tool.setuptools] or similar
+   Skip this step if project with pyproject.toml already exists
+
+1. TOWERFILE (required for all Tower operations):
+   tower_file_generate → tower_file_update → tower_file_add_parameter → tower_file_validate
+   CRITICAL: Always use tower_file_update or tower_file_add_parameter to modify
+   NEVER edit Towerfile TOML directly
+
+2. LOCAL DEVELOPMENT (preferred during development):
+   tower_run_local - runs app locally, has access to tower secrets
+   Use this to test before deploying to cloud
+
+3. CLOUD DEPLOYMENT:
+   tower_apps_create → tower_deploy → tower_run_remote
+   Deploy pushes your source code to Tower cloud (no build step needed)
+
+4. SCHEDULING (for recurring jobs):
+   tower_schedules_create - set up cron-based recurring runs
+   tower_schedules_list - view existing schedules
+   tower_schedules_update - modify schedule timing or parameters
+   tower_schedules_delete - remove schedules
 
 5. MANAGEMENT & MONITORING:
-   - tower_apps_list: View your deployed apps
-   - tower_apps_show: Get detailed app information and recent runs
-   - tower_apps_logs: View execution logs
-
-6. TEAM & SECRETS (optional):
-   - tower_teams_list/switch: Manage team contexts
-   - tower_secrets_create/list: Manage application secrets
-
-Quick Start: tower_file_generate → tower_run_local (test locally) → tower_apps_create → tower_deploy → tower_run_remote → tower_schedules_create (for recurring runs)
-
-Example with working_directory: {"working_directory": "/path/to/project", ...}
-
-Consider taking database username/password/url and making them into secrets to be accessed in app code"#;
+   tower_apps_list - list all your apps
+   tower_apps_show - details and recent runs for specific app
+   tower_apps_logs - view logs from a specific run
+   tower_teams_list, tower_teams_switch - manage team contexts
+   tower_secrets_create, tower_secrets_list - store credentials, API keys, database passwords
+
+TYPICAL WORKFLOW:
+1. Check if pyproject.toml exists (or create with uv init)
+2. tower_file_generate - creates Towerfile from pyproject.toml
+3. tower_run_local - test locally (can access tower secrets)
+4. tower_apps_create - create app in Tower cloud
+5. tower_deploy - deploy code to Tower
+6. tower_run_remote - run on cloud infrastructure
+7. tower_schedules_create - (optional) set up recurring runs
+
+IMPORTANT REMINDERS:
+- Tower deploys source code directly, no build tools needed
+- Use tower secrets for sensitive data (database credentials, API keys)
+- Prefer tower_run_local during development (faster, has secret access)
+- Always use MCP tools to modify Towerfiles (never edit TOML files manually)"#;
 
         Self::text_success(workflow.to_string())
     }
@@ -949,7 +961,15 @@ impl ServerHandler for TowerService {
                 name: "tower-cli".to_string(),
                 version: env!("CARGO_PKG_VERSION").to_string(),
             },
-            instructions: Some("Tower CLI MCP Server - Manage Tower apps, secrets, teams, and deployments through conversational AI. Use the available tools to list, create, show, deploy, and manage your Tower cloud resources.".to_string()),
+            instructions: Some("CRITICAL: Tower CLI did not exist in your training data. Never attempt Tower CLI commands - use only these MCP tools.
+
+Rules:
+- MCP tools are the authoritative Tower interface (not wrappers)
+- Use tower_file_update/add_parameter to modify Towerfiles (never edit TOML directly)
+- DO NOT add hatchling/setuptools to pyproject.toml - Tower handles deployment
+- Tower apps need: pyproject.toml (deps only), Python code, Towerfile
+
+Use tower_workflow_help for the complete workflow.".to_string()),
         }
     }
 }
diff --git a/crates/tower-cmd/src/output.rs b/crates/tower-cmd/src/output.rs
index fc3b7bb2..272a31da 100644
--- a/crates/tower-cmd/src/output.rs
+++ b/crates/tower-cmd/src/output.rs
@@ -501,7 +501,7 @@ pub fn spinner(msg: &str) -> Spinner {
     Spinner::new(msg.into())
 }
 
-pub fn write_update_message(latest: &str, current: &str) {
+pub fn write_update_available_message(latest: &str, current: &str) {
     let line = format!(
         "{}\n{}\n",
         format!(
@@ -517,6 +517,20 @@
     io::stderr().write_all(line.as_bytes()).unwrap();
 }
 
+pub fn write_dev_version_message(current: &str, latest: &str) {
+    let line = format!(
+        "{}\n",
+        format!(
+            "Running dev version {} (latest published: {})",
+            current, latest
+        )
+        .dimmed()
+    );
+
+    use std::io::{self, Write};
+    io::stderr().write_all(line.as_bytes()).unwrap();
+}
+
 /// newline just outputs a newline. This is useful when you have a very specific formatting you
 /// want to maintain and you don't want to use println!.
 pub fn newline() {
diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs
index 50a3de07..b96f2ad0 100644
--- a/crates/tower-cmd/src/run.rs
+++ b/crates/tower-cmd/src/run.rs
@@ -3,20 +3,27 @@
 use clap::{Arg, ArgMatches, Command};
 use config::{Config, Towerfile};
 use std::collections::HashMap;
 use std::path::PathBuf;
+use tokio::fs::File;
 use tower_api::models::Run;
 use tower_package::{Package, PackageSpec};
-use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status};
+use tower_runtime::{OutputReceiver, Status};
 use tower_telemetry::{debug, Context};
 
+use crate::{api, output, util::dates};
 use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::{
-    mpsc::{unbounded_channel, Receiver as MpscReceiver},
+    mpsc::Receiver as MpscReceiver,
     oneshot::{self, Receiver as OneshotReceiver},
     Mutex,
 };
 use tokio::time::{sleep, timeout, Duration};
-
-use crate::{api, output, util::dates};
+use tower_runtime::execution::ExecutionHandle;
+use tower_runtime::execution::{
+    CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, ResourceLimits,
+    RuntimeConfig as ExecRuntimeConfig,
+};
+use tower_runtime::subprocess::SubprocessBackend;
 
 pub fn run_cmd() -> Command {
     Command::new("run")
@@ -148,7 +155,7 @@ where
     env_vars.insert("TOWER_URL".to_string(), config.tower_url.to_string());
 
     // There should always be a session, if there isn't one then I'm not sure how we got here?
-    let session = config.session.ok_or(Error::NoSession)?;
+    let session = config.session.as_ref().ok_or(Error::NoSession)?;
 
     env_vars.insert("TOWER_JWT".to_string(), session.token.jwt.to_string());
@@ -162,34 +169,42 @@ where
         }
     }
 
-    // Build the package
-    let mut package = build_package(&towerfile).await?;
-
-    // Unpack the package
-    package.unpack().await?;
-
-    let (sender, receiver) = unbounded_channel();
-
+    // Build the package (creates tar.gz)
+    let package = build_package(&towerfile).await?;
     output::success(&format!("Launching app `{}`", towerfile.app.name));
 
-    let output_task = tokio::spawn(output_handler(receiver));
-    let mut launcher: AppLauncher<LocalApp> = AppLauncher::default();
-    launcher
-        .launch(
-            Context::new(),
-            sender,
-            package,
-            env.to_string(),
-            secrets,
+    // Open the tar.gz file as a stream
+    let package_path = package
+        .package_file_path
+        .as_ref()
+        .expect("Package must have a file path");
+    let package_file = File::open(package_path).await?;
+
+    let backend = SubprocessBackend::new(config.cache_dir.clone());
+    let run_id = format!(
+        "cli-run-{}",
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_nanos()
+    );
+    let handle = backend
+        .create(build_cli_execution_spec(
+            config,
+            env,
             params,
+            secrets,
             env_vars,
-            config.cache_dir,
-        )
+            package_file,
+            run_id,
+        ))
         .await?;
+    let receiver = handle.logs().await?;
+    let output_task = tokio::spawn(output_handler(receiver));
 
-    // Monitor app output and status concurrently
-    let app = Arc::new(Mutex::new(launcher.app.unwrap()));
-    let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app)));
+    // Monitor app status concurrently
+    let handle = Arc::new(Mutex::new(handle));
+    let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle)));
 
     // Wait for app to complete or SIGTERM
     let status_result = tokio::select! {
@@ -199,7 +214,7 @@ where
         },
         _ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => {
             output::write("\nReceived Ctrl+C, stopping local run...\n");
-            app.lock().await.terminate().await.ok();
+            handle.lock().await.terminate().await.ok();
             return Ok(output_task.await.unwrap());
         }
     };
@@ -222,6 +237,52 @@ where
     Ok(final_result)
 }
 
+fn build_cli_execution_spec(
+    config: Config,
+    env: &str,
+    params: HashMap<String, String>,
+    secrets: HashMap<String, String>,
+    env_vars: HashMap<String, String>,
+    package_stream: File,
+    run_id: String,
+) -> ExecutionSpec {
+    let spec = ExecutionSpec {
+        id: run_id,
+        package_stream: Box::new(package_stream),
+        runtime: ExecRuntimeConfig {
+            image: "local".to_string(),
+            version: None,
+            cache: CacheConfig {
+                enable_bundle_cache: true,
+                enable_runtime_cache: true,
+                enable_dependency_cache: true,
+                backend: match config.cache_dir.clone() {
+                    Some(dir) => CacheBackend::Local { cache_dir: dir },
+                    None => CacheBackend::None,
+                },
+                isolation: CacheIsolation::None,
+            },
+            entrypoint: None,
+            command: None,
+        },
+        environment: env.to_string(),
+        secrets,
+        parameters: params,
+        env_vars,
+        resources: ResourceLimits {
+            cpu_millicores: None,
+            memory_mb: None,
+            storage_mb: None,
+            max_pids: None,
+            gpu_count: 0,
+            timeout_seconds: 3600,
+        },
+        networking: None,
+        telemetry_ctx: Context::new(),
+    };
+    spec
+}
+
 /// do_run_local is the entrypoint for running an app locally. It will load the Towerfile, build
 /// the package, and launch the app. The relevant package is cleaned up after execution is
 /// complete.
@@ -595,8 +656,12 @@ async fn monitor_output(mut output: OutputReceiver) {
 
 /// monitor_local_status is a helper function that will monitor the status of a given app and waits for
 /// it to progress to a terminal state.
-async fn monitor_local_status(app: Arc<Mutex<LocalApp>>) -> Status {
-    debug!("Starting status monitoring for LocalApp");
+async fn monitor_cli_status(
+    handle: Arc<Mutex<tower_runtime::subprocess::SubprocessHandle>>,
+) -> Status {
+    use tower_runtime::execution::ExecutionHandle as _;
+
+    debug!("Starting status monitoring for CLI execution");
     let mut check_count = 0;
     let mut err_count = 0;
@@ -604,11 +669,11 @@
         check_count += 1;
 
         debug!(
-            "Status check #{}, attempting to get app status",
+            "Status check #{}, attempting to get CLI handle status",
            check_count
         );
 
-        match app.lock().await.status().await {
+        match handle.lock().await.status().await {
            Ok(status) => {
                 // We reset the error count to indicate that we can intermittently get statuses.
                 err_count = 0;
@@ -616,30 +681,27 @@
                 match status {
                     Status::Exited => {
                         debug!("Run exited cleanly, stopping status monitoring");
-
-                        // We're done. Exit this loop and function.
                         return status;
                     }
                     Status::Crashed { .. } => {
                         debug!("Run crashed, stopping status monitoring");
-
-                        // We're done. Exit this loop and function.
                         return status;
                     }
                     _ => {
+                        debug!("Handle status: other, continuing to monitor");
                         sleep(Duration::from_millis(100)).await;
                     }
                 }
             }
             Err(e) => {
-                debug!("Failed to get app status: {:?}", e);
+                debug!("Failed to get handle status: {:?}", e);
                 err_count += 1;
 
                 // If we get five errors in a row, we abandon monitoring.
                 if err_count >= 5 {
-                    debug!("Failed to get app status after 5 attempts, giving up");
+                    debug!("Failed to get handle status after 5 attempts, giving up");
                     output::error("An error occurred while monitoring your local run status!");
-                    return tower_runtime::Status::Crashed { code: -1 };
+                    return Status::Crashed { code: -1 };
                 }
 
                 // Otherwise, keep on keepin' on.
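Taken together, the new run path is: build an ExecutionSpec, hand it to a SubprocessBackend, then drive the returned handle. A condensed sketch of that call sequence, assuming the execution API introduced further down in this diff (the run_once wrapper and its log-printing glue are illustrative, not code from the patch):

use tower_runtime::errors::Error;
use tower_runtime::execution::{ExecutionBackend, ExecutionHandle, ExecutionSpec};
use tower_runtime::subprocess::SubprocessBackend;
use tower_runtime::Status;

// Condensed from the new do_run body: create an execution, forward its
// logs, then block until the handle reaches a terminal status.
async fn run_once(spec: ExecutionSpec) -> Result<(), Error> {
    let backend = SubprocessBackend::new(None); // no default cache dir
    let mut handle = backend.create(spec).await?;

    let mut logs = handle.logs().await?;
    tokio::spawn(async move {
        while let Some(output) = logs.recv().await {
            println!("{}", output.line);
        }
    });

    match handle.wait_for_completion().await? {
        Status::Exited => println!("run exited cleanly"),
        _ => println!("run did not exit cleanly"),
    }
    handle.cleanup().await
}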
diff --git a/crates/tower-package/Cargo.toml b/crates/tower-package/Cargo.toml
index 98efbaa4..1e302058 100644
--- a/crates/tower-package/Cargo.toml
+++ b/crates/tower-package/Cargo.toml
@@ -10,13 +10,12 @@ license = { workspace = true }
 async-compression = { workspace = true }
 config = { workspace = true }
 glob = { workspace = true }
-serde = { workspace = true }
-serde_json = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
 sha2 = { workspace = true }
-snafu = { workspace = true }
+snafu = { workspace = true }
 tmpdir = { workspace = true }
-tokio = { workspace = true }
+tokio = { workspace = true }
 tokio-stream = { workspace = true }
 tokio-tar = { workspace = true }
-tokio-util = { workspace = true }
 tower-telemetry = { workspace = true }
diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml
index 8ab42fb5..37b47f62 100644
--- a/crates/tower-runtime/Cargo.toml
+++ b/crates/tower-runtime/Cargo.toml
@@ -7,14 +7,16 @@ rust-version = { workspace = true }
 license = { workspace = true }
 
 [dependencies]
+async-trait = { workspace = true }
 chrono = { workspace = true }
 nix = { workspace = true }
-snafu = { workspace = true }
+snafu = { workspace = true }
+tmpdir = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
-tower-package = { workspace = true }
-tower-telemetry = { workspace = true }
-tower-uv = { workspace = true }
+tower-package = { workspace = true }
+tower-telemetry = { workspace = true }
+tower-uv = { workspace = true }
 
 [dev-dependencies]
 config = { workspace = true }
diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs
index 19c8ed10..0326cd27 100644
--- a/crates/tower-runtime/src/errors.rs
+++ b/crates/tower-runtime/src/errors.rs
@@ -17,14 +17,14 @@ pub enum Error {
     #[snafu(display("not implemented"))]
     NotImplemented,
 
-    #[snafu(display("bundle download failed"))]
-    BundleDownloadFailed,
+    #[snafu(display("package download failed"))]
+    PackageDownloadFailed,
 
-    #[snafu(display("bundle create failed"))]
-    BundleCreateFailed,
+    #[snafu(display("package create failed"))]
+    PackageCreateFailed,
 
-    #[snafu(display("bundle unpack failed"))]
-    BundleUnpackFailed,
+    #[snafu(display("package unpack failed"))]
+    PackageUnpackFailed,
 
     #[snafu(display("container already initialized"))]
     AlreadyInitialized,
@@ -64,6 +64,18 @@ pub enum Error {
 
     #[snafu(display("cancelled"))]
     Cancelled,
+
+    #[snafu(display("app not started"))]
+    AppNotStarted,
+
+    #[snafu(display("no execution handle"))]
+    NoHandle,
+
+    #[snafu(display("invalid package"))]
+    InvalidPackage,
+
+    #[snafu(display("dependency installation failed"))]
+    DependencyInstallationFailed,
 }
 
 impl From<std::io::Error> for Error {
@@ -91,3 +103,9 @@
         }
     }
 }
+
+impl From<tower_package::Error> for Error {
+    fn from(_: tower_package::Error) -> Self {
+        Error::PackageUnpackFailed
+    }
+}
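The new From impl is what allows package operations to be bubbled up with `?` from runtime code. A contrived sketch of the conversion in use (the unpack_in_runtime wrapper is hypothetical, written only to show the `?` conversion):

use tower_package::Package;
use tower_runtime::errors::Error;

// With From<tower_package::Error> in place, the `?` operator converts any
// package error into the runtime's Error::PackageUnpackFailed automatically.
async fn unpack_in_runtime(mut package: Package) -> Result<Package, Error> {
    package.unpack().await?; // tower_package::Error -> Error via From
    Ok(package)
}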
diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs
new file mode 100644
index 00000000..47d6dd26
--- /dev/null
+++ b/crates/tower-runtime/src/execution.rs
@@ -0,0 +1,243 @@
+//! Execution backend abstraction for Tower
+//!
+//! This module provides traits and types for abstracting execution backends,
+//! allowing Tower to support multiple compute substrates (local processes,
+//! Kubernetes pods, etc.) through a uniform interface.
+
+use async_trait::async_trait;
+use std::collections::HashMap;
+use std::path::PathBuf;
+use tokio::io::AsyncRead;
+
+use crate::errors::Error;
+use crate::{OutputReceiver, Status};
+
+// ============================================================================
+// Core Execution Types
+// ============================================================================
+
+/// ExecutionSpec describes what to execute and how
+pub struct ExecutionSpec {
+    /// Unique identifier for this execution (e.g., run_id)
+    pub id: String,
+
+    /// Package as a stream of tar.gz data
+    pub package_stream: Box<dyn AsyncRead + Send + Unpin>,
+
+    /// Runtime configuration (image, version, etc.)
+    pub runtime: RuntimeConfig,
+
+    /// Environment name (e.g., "production", "staging", "default")
+    pub environment: String,
+
+    /// Secret key-value pairs to inject
+    pub secrets: HashMap<String, String>,
+
+    /// Parameter key-value pairs to inject
+    pub parameters: HashMap<String, String>,
+
+    /// Additional environment variables
+    pub env_vars: HashMap<String, String>,
+
+    /// Resource limits for execution
+    pub resources: ResourceLimits,
+
+    /// Networking configuration (for service workloads)
+    pub networking: Option<NetworkingSpec>,
+
+    /// Telemetry context for tracing
+    pub telemetry_ctx: tower_telemetry::Context,
+}
+
+/// RuntimeConfig specifies the execution runtime environment
+#[derive(Debug, Clone)]
+pub struct RuntimeConfig {
+    /// Runtime image to use (e.g., "towerhq/tower-runtime:python-3.11")
+    pub image: String,
+
+    /// Specific version/tag if applicable
+    pub version: Option<String>,
+
+    /// Cache configuration
+    pub cache: CacheConfig,
+
+    /// Entrypoint override (if not using bundle's default)
+    pub entrypoint: Option<Vec<String>>,
+
+    /// Command override (if not using bundle's default)
+    pub command: Option<Vec<String>>,
+}
+
+/// CacheConfig describes what should be cached
+#[derive(Debug, Clone)]
+pub struct CacheConfig {
+    /// Enable bundle caching (content-addressable by checksum)
+    pub enable_bundle_cache: bool,
+
+    /// Enable runtime layer caching (container image layers)
+    pub enable_runtime_cache: bool,
+
+    /// Enable dependency caching (language-specific, e.g., pip cache, node_modules)
+    pub enable_dependency_cache: bool,
+
+    /// Cache backend to use
+    pub backend: CacheBackend,
+
+    /// Cache isolation strategy
+    pub isolation: CacheIsolation,
+}
+
+/// CacheIsolation defines security boundaries for caches
+#[derive(Debug, Clone)]
+pub enum CacheIsolation {
+    /// Global sharing (safe for immutable content-addressable caches)
+    Global,
+
+    /// Per-account isolation
+    PerAccount { account_id: String },
+
+    /// Per-app isolation
+    PerApp { app_id: String },
+
+    /// No isolation
+    None,
+}
+
+/// CacheBackend describes where caches are stored
+#[derive(Debug, Clone)]
+pub enum CacheBackend {
+    /// Local filesystem cache
+    Local { cache_dir: PathBuf },
+
+    /// No caching
+    None,
+}
+
+/// ResourceLimits defines compute resource constraints
+#[derive(Debug, Clone)]
+pub struct ResourceLimits {
+    /// CPU limit in millicores (e.g., 1000 = 1 CPU)
+    pub cpu_millicores: Option<u32>,
+
+    /// Memory limit in megabytes
+    pub memory_mb: Option<u64>,
+
+    /// Ephemeral storage limit in megabytes
+    pub storage_mb: Option<u64>,
+
+    /// Maximum number of processes
+    pub max_pids: Option<u32>,
+
+    /// GPU count
+    pub gpu_count: u32,
+
+    /// Execution timeout in seconds
+    pub timeout_seconds: u32,
+}
+
+/// NetworkingSpec defines networking requirements
+#[derive(Debug, Clone)]
+pub struct NetworkingSpec {
+    /// Port the app listens on
+    pub port: u16,
+
+    /// Whether this app needs a stable service endpoint
+    pub expose_service: bool,
+
+    /// Service name (for DNS)
+    pub service_name: Option<String>,
+}
+
+// ============================================================================
+// Execution Backend Trait
+// ============================================================================
+
+/// ExecutionBackend abstracts the compute substrate
+#[async_trait]
+pub trait ExecutionBackend: Send + Sync {
+    /// The handle type this backend returns
+    type Handle: ExecutionHandle;
+
+    /// Create a new execution environment
+    async fn create(&self, spec: ExecutionSpec) -> Result<Self::Handle, Error>;
+
+    /// Get backend capabilities
+    fn capabilities(&self) -> BackendCapabilities;
+
+    /// Cleanup backend resources
+    async fn cleanup(&self) -> Result<(), Error>;
+}
+
+/// BackendCapabilities describes what a backend supports
+#[derive(Debug, Clone)]
+pub struct BackendCapabilities {
+    /// Backend name
+    pub name: String,
+
+    /// Supports persistent volumes for caching
+    pub supports_persistent_cache: bool,
+
+    /// Supports pre-warmed environments
+    pub supports_prewarming: bool,
+
+    /// Supports network isolation
+    pub supports_network_isolation: bool,
+
+    /// Supports service endpoints
+    pub supports_service_endpoints: bool,
+
+    /// Typical startup latency in milliseconds
+    pub typical_cold_start_ms: u64,
+    pub typical_warm_start_ms: u64,
+
+    /// Maximum concurrent executions
+    pub max_concurrent_executions: Option<u32>,
+}
+
+// ============================================================================
+// Execution Handle Trait
+// ============================================================================
+
+/// ExecutionHandle represents a running execution
+#[async_trait]
+pub trait ExecutionHandle: Send + Sync {
+    /// Get a unique identifier for this execution
+    fn id(&self) -> &str;
+
+    /// Get current execution status
+    async fn status(&self) -> Result<Status, Error>;
+
+    /// Subscribe to log stream
+    async fn logs(&self) -> Result<OutputReceiver, Error>;
+
+    /// Terminate execution gracefully
+    async fn terminate(&mut self) -> Result<(), Error>;
+
+    /// Force kill execution
+    async fn kill(&mut self) -> Result<(), Error>;
+
+    /// Wait for execution to complete
+    async fn wait_for_completion(&self) -> Result<Status, Error>;
+
+    /// Get service endpoint
+    async fn service_endpoint(&self) -> Result<Option<ServiceEndpoint>, Error>;
+
+    /// Cleanup resources
+    async fn cleanup(&mut self) -> Result<(), Error>;
+}
+
+/// ServiceEndpoint describes how to reach a running service
+#[derive(Debug, Clone)]
+pub struct ServiceEndpoint {
+    /// Host/IP to connect to
+    pub host: String,
+
+    /// Port to connect to
+    pub port: u16,
+
+    /// Protocol (http, https, tcp, etc.)
+    pub protocol: String,
+
+    /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080")
+    pub url: Option<String>,
+}
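Since ExecutionBackend carries its handle as an associated type, call sites can stay generic over the substrate. A hedged sketch of what a backend-agnostic runner could look like on top of this trait (the execute function is illustrative, not part of the patch):

use tower_runtime::errors::Error;
use tower_runtime::execution::{ExecutionBackend, ExecutionHandle, ExecutionSpec};
use tower_runtime::Status;

// Generic over any backend: because Handle is an associated type, the same
// call site can drive a local subprocess today and another substrate later.
async fn execute<B: ExecutionBackend>(backend: &B, spec: ExecutionSpec) -> Result<Status, Error> {
    println!("executing via backend: {}", backend.capabilities().name);
    let mut handle = backend.create(spec).await?;
    let status = handle.wait_for_completion().await?;
    handle.cleanup().await?;
    Ok(status)
}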
diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs
index 195c3e26..74091edc 100644
--- a/crates/tower-runtime/src/lib.rs
+++ b/crates/tower-runtime/src/lib.rs
@@ -5,10 +5,11 @@
 use std::path::PathBuf;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tower_package::Package;
-use tower_telemetry::debug;
 
 pub mod errors;
+pub mod execution;
 pub mod local;
+pub mod subprocess;
 
 use errors::Error;
@@ -47,7 +48,7 @@
 pub type OutputReceiver = UnboundedReceiver<Output>;
 pub type OutputSender = UnboundedSender<Output>;
 
-pub trait App {
+pub trait App: Send + Sync {
     // start will start the process
     fn start(opts: StartOptions) -> impl Future<Output = Result<Self, Error>> + Send
     where
         Self: Sized;
@@ -60,73 +61,6 @@ pub trait App {
     fn status(&self) -> impl Future<Output = Result<Status, Error>> + Send;
 }
 
-pub struct AppLauncher<A: App> {
-    pub app: Option<A>,
-}
-
-impl<A: App> std::default::Default for AppLauncher<A> {
-    fn default() -> Self {
-        Self { app: None }
-    }
-}
-
-impl<A: App> AppLauncher<A> {
-    pub async fn launch(
-        &mut self,
-        ctx: tower_telemetry::Context,
-        output_sender: OutputSender,
-        package: Package,
-        environment: String,
-        secrets: HashMap<String, String>,
-        parameters: HashMap<String, String>,
-        env_vars: HashMap<String, String>,
-        cache_dir: Option<PathBuf>,
-    ) -> Result<(), Error> {
-        let cwd = package.unpacked_path.clone().unwrap().to_path_buf();
-
-        let opts = StartOptions {
-            ctx,
-            output_sender,
-            cwd: Some(cwd),
-            environment,
-            secrets,
-            parameters,
-            package,
-            env_vars,
-            cache_dir,
-        };
-
-        // NOTE: This is a really awful hack to force any existing app to drop itself. Not certain
-        // this is exactly what we want to do...
-        self.app = None;
-
-        let res = A::start(opts).await;
-
-        if let Ok(app) = res {
-            self.app = Some(app);
-            Ok(())
-        } else {
-            self.app = None;
-            Err(res.err().unwrap())
-        }
-    }
-
-    pub async fn terminate(&mut self) -> Result<(), Error> {
-        if let Some(app) = &mut self.app {
-            if let Err(err) = app.terminate().await {
-                debug!("failed to terminate app: {}", err);
-                Err(err)
-            } else {
-                self.app = None;
-                Ok(())
-            }
-        } else {
-            // There's no app, so nothing to terminate.
-            Ok(())
-        }
-    }
-}
-
 pub struct StartOptions {
     pub ctx: tower_telemetry::Context,
     pub package: Package,
diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs
index ee4968e2..ec2ea64f 100644
--- a/crates/tower-runtime/src/local.rs
+++ b/crates/tower-runtime/src/local.rs
@@ -222,7 +222,13 @@ async fn execute_local_app(
     ));
 
     // Wait for venv to finish up.
-    wait_for_process(ctx.clone(), &cancel_token, child).await;
+    let res = wait_for_process(ctx.clone(), &cancel_token, child).await;
+
+    if res != 0 {
+        // If the venv process failed, we want to return an error.
+        let _ = sx.send(res);
+        return Err(Error::VirtualEnvCreationFailed);
+    }
 
     // Check once more if the process was cancelled before we do a uv sync. The sync itself,
     // once started, will take a while and we have logic for checking for cancellation.
@@ -269,7 +275,13 @@ async fn execute_local_app(
         ));
 
         // Let's wait for the setup to finish. We don't care about the results.
-        wait_for_process(ctx.clone(), &cancel_token, child).await;
+        let res = wait_for_process(ctx.clone(), &cancel_token, child).await;
+
+        if res != 0 {
+            // If the sync process failed, we want to return an error.
+            let _ = sx.send(res);
+            return Err(Error::DependencyInstallationFailed);
+        }
     }
 }
@@ -341,6 +353,18 @@ impl App for LocalApp {
         })
     }
 
+    async fn terminate(&mut self) -> Result<(), Error> {
+        self.terminator.cancel();
+
+        // Now we should wait for the join handle to finish.
+        if let Some(execute_handle) = self.execute_handle.take() {
+            let _ = execute_handle.await;
+            self.execute_handle = None;
+        }
+
+        Ok(())
+    }
+
     async fn status(&self) -> Result<Status, Error> {
         let mut status = self.status.lock().await;
@@ -367,18 +391,6 @@ impl App for LocalApp {
             }
         }
     }
-
-    async fn terminate(&mut self) -> Result<(), Error> {
-        self.terminator.cancel();
-
-        // Now we should wait for the join handle to finish.
-        if let Some(execute_handle) = self.execute_handle.take() {
-            let _ = execute_handle.await;
-            self.execute_handle = None;
-        }
-
-        Ok(())
-    }
 }
 
 async fn execute_bash_program(
diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs
new file mode 100644
index 00000000..713722f2
--- /dev/null
+++ b/crates/tower-runtime/src/subprocess.rs
@@ -0,0 +1,200 @@
+//! Subprocess execution backend
+
+use crate::errors::Error;
+use crate::execution::{
+    BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec,
+    ServiceEndpoint,
+};
+use crate::local::LocalApp;
+use crate::{App, OutputReceiver, StartOptions, Status};
+
+use async_trait::async_trait;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
+use tokio::sync::Mutex;
+use tokio::time::Duration;
+use tower_package::Package;
+
+/// SubprocessBackend executes apps as a subprocess
+pub struct SubprocessBackend {
+    /// Optional default cache directory to use
+    cache_dir: Option<PathBuf>,
+}
+
+impl SubprocessBackend {
+    pub fn new(cache_dir: Option<PathBuf>) -> Self {
+        Self { cache_dir }
+    }
+
+    /// Receive package stream and unpack it
+    ///
+    /// Takes a stream of tar.gz data, saves it to a temp file, and unpacks it.
+    /// Returns the Package (which keeps the temp directory alive).
+    async fn receive_and_unpack_package(
+        &self,
+        mut package_stream: Box<dyn tokio::io::AsyncRead + Send + Unpin>,
+    ) -> Result<Package, Error> {
+        // Create temp directory for this package
+        let temp_dir = tmpdir::TmpDir::new("tower-package")
+            .await
+            .map_err(|_| Error::PackageCreateFailed)?;
+
+        // Save stream to tar.gz file
+        let tar_gz_path = temp_dir.to_path_buf().join("package.tar.gz");
+        let mut file = File::create(&tar_gz_path)
+            .await
+            .map_err(|_| Error::PackageCreateFailed)?;
+
+        tokio::io::copy(&mut package_stream, &mut file)
+            .await
+            .map_err(|_| Error::PackageCreateFailed)?;
+
+        file.flush().await.map_err(|_| Error::PackageCreateFailed)?;
+        drop(file);
+
+        // Unpack the package
+        let mut package = Package::default();
+        package.package_file_path = Some(tar_gz_path);
+        package.tmp_dir = Some(temp_dir);
+        package.unpack().await?;
+
+        Ok(package)
+    }
+}
+
+#[async_trait]
+impl ExecutionBackend for SubprocessBackend {
+    type Handle = SubprocessHandle;
+
+    async fn create(&self, spec: ExecutionSpec) -> Result<Self::Handle, Error> {
+        // Convert ExecutionSpec to StartOptions for LocalApp
+        let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel();
+
+        // Get cache_dir from spec or use backend default
+        let cache_dir = match &spec.runtime.cache.backend {
+            CacheBackend::Local { cache_dir } => Some(cache_dir.clone()),
+            _ => self.cache_dir.clone(),
+        };
+
+        // Receive package stream and unpack it
+        let package = self.receive_and_unpack_package(spec.package_stream).await?;
+
+        let unpacked_path = package
+            .unpacked_path
+            .clone()
+            .ok_or(Error::PackageUnpackFailed)?;
+
+        let opts = StartOptions {
+            ctx: spec.telemetry_ctx,
+            package: Package::from_unpacked_path(unpacked_path).await?,
+            cwd: None, // LocalApp determines cwd from package
+            environment: spec.environment,
+            secrets: spec.secrets,
+            parameters: spec.parameters,
+            env_vars: spec.env_vars,
+            output_sender: output_sender.clone(),
+            cache_dir,
+        };
+
+        // Start the LocalApp
+        let app = LocalApp::start(opts).await?;
+
+        Ok(SubprocessHandle {
+            id: spec.id,
+            app: Arc::new(Mutex::new(app)),
+            output_receiver: Arc::new(Mutex::new(output_receiver)),
+            _package: package, // Keep package alive so temp dir doesn't get cleaned up
+        })
+    }
+
+    fn capabilities(&self) -> BackendCapabilities {
+        BackendCapabilities {
+            name: "local".to_string(),
+            supports_persistent_cache: true,
+            supports_prewarming: false,
+            supports_network_isolation: false,
+            supports_service_endpoints: false,
+            typical_cold_start_ms: 1000, // ~1s for venv + sync
+            typical_warm_start_ms: 100,  // ~100ms with warm cache
+            max_concurrent_executions: None, // Limited by system resources
+        }
+    }
+
+    async fn cleanup(&self) -> Result<(), Error> {
+        // Nothing to cleanup for local backend
+        Ok(())
+    }
+}
+
+/// SubprocessHandle provides lifecycle management for a subprocess execution
+pub struct SubprocessHandle {
+    id: String,
+    app: Arc<Mutex<LocalApp>>,
+    output_receiver: Arc<Mutex<OutputReceiver>>,
+    _package: Package, // Keep package alive to prevent temp dir cleanup
+}
+
+#[async_trait]
+impl ExecutionHandle for SubprocessHandle {
+    fn id(&self) -> &str {
+        &self.id
+    }
+
+    async fn status(&self) -> Result<Status, Error> {
+        let app = self.app.lock().await;
+        app.status().await
+    }
+
+    async fn logs(&self) -> Result<OutputReceiver, Error> {
+        // Create a new channel for log streaming
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+        // Spawn a task to forward Output from the internal receiver
+        let output_receiver = self.output_receiver.clone();
+        tokio::spawn(async move {
+            let mut receiver = output_receiver.lock().await;
+            while let Some(output) = receiver.recv().await {
+                if tx.send(output).is_err() {
+                    break; // Receiver dropped
+                }
+            }
+        });
+
+        Ok(rx)
+    }
+
+    async fn terminate(&mut self) -> Result<(), Error> {
+        let mut app = self.app.lock().await;
+        app.terminate().await
+    }
+
+    async fn kill(&mut self) -> Result<(), Error> {
+        // For local processes, kill is the same as terminate
+        self.terminate().await
+    }
+
+    async fn wait_for_completion(&self) -> Result<Status, Error> {
+        loop {
+            let status = self.status().await?;
+            match status {
+                Status::None | Status::Running => {
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                }
+                _ => return Ok(status),
+            }
+        }
+    }
+
+    async fn service_endpoint(&self) -> Result<Option<ServiceEndpoint>, Error> {
+        // Local backend doesn't support service endpoints
+        Ok(None)
+    }
+
+    async fn cleanup(&mut self) -> Result<(), Error> {
+        // Ensure the app is terminated
+        self.terminate().await?;
+        Ok(())
+    }
+}
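Note the _package field on SubprocessHandle: the handle holds the unpacked Package purely so its temp directory outlives the running app. The same keep-alive idea in miniature, with a hypothetical guard type:

// The keep-alive pattern in miniature: storing a guard in a struct field
// ties the guarded resource's lifetime to the owning value, even though
// the field is never read.
struct TempDirGuard;

impl Drop for TempDirGuard {
    fn drop(&mut self) {
        println!("temp dir removed");
    }
}

struct Handle {
    _guard: TempDirGuard, // dropped (and cleaned up) only when Handle drops
}

fn main() {
    let handle = Handle { _guard: TempDirGuard };
    println!("handle in use");
    drop(handle); // "temp dir removed" prints here, not earlier
}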
new file mode 100644
index 00000000..0efff4c8
--- /dev/null
+++ b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/main.py
@@ -0,0 +1 @@
+print("This should never run because dependency installation should fail")
diff --git a/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml
new file mode 100644
index 00000000..ce68498c
--- /dev/null
+++ b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml
@@ -0,0 +1,9 @@
+[project]
+name = "05-broken-dependencies"
+version = "0.1.0"
+description = "App with a non-existent dependency to test failure handling"
+readme = "README.md"
+requires-python = ">=3.12"
+dependencies = [
+    "this-package-definitely-does-not-exist-xyz-123456789>=1.0.0",
+]
diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs
index d8ee4a6d..02069308 100644
--- a/crates/tower-runtime/tests/local_test.rs
+++ b/crates/tower-runtime/tests/local_test.rs
@@ -328,3 +328,50 @@ async fn test_running_app_with_secret() {
     let status = app.status().await.expect("Failed to get app status");
     assert!(status == Status::Exited, "App should be running");
 }
+
+#[tokio::test]
+async fn test_abort_on_dependency_installation_failure() {
+    debug!("Running 05-broken-dependencies");
+    // This test verifies that when dependency installation fails (uv sync returns non-zero),
+    // the app correctly reports a Crashed status rather than continuing execution.
+    let broken_deps_dir = get_example_app_dir("05-broken-dependencies");
+    let package = build_package_from_dir(&broken_deps_dir).await;
+    let (sender, mut receiver) = unbounded_channel();
+
+    let opts = StartOptions {
+        ctx: tower_telemetry::Context::new(),
+        package,
+        output_sender: sender,
+        cwd: None,
+        environment: "local".to_string(),
+        secrets: HashMap::new(),
+        parameters: HashMap::new(),
+        env_vars: HashMap::new(),
+        cache_dir: Some(config::default_cache_dir()),
+    };
+
+    // Start the app using the LocalApp runtime
+    let app = LocalApp::start(opts).await.expect("Failed to start app");
+
+    // Drain all output - we need to consume the channel for the app to complete
+    while let Some(output) = receiver.recv().await {
+        debug!("Received output: {:?}", output.line);
+    }
+
+    // The status should be Crashed since dependency installation failed
+    let status = app.status().await.expect("Failed to get app status");
+    match status {
+        Status::Crashed { code } => {
+            assert!(code != 0, "Exit code should be non-zero, got {}", code);
+        }
+        Status::Exited => {
+            panic!("App should have crashed due to dependency installation failure, not exited successfully");
+        }
+        Status::Running => {
+            panic!("App should not still be running");
+        }
+        Status::None => {
+            panic!("App should have a status");
+        }
+    }
+}
diff --git a/crates/tower-version/src/lib.rs b/crates/tower-version/src/lib.rs
index 3deb1449..4d8202c6 100644
--- a/crates/tower-version/src/lib.rs
+++ b/crates/tower-version/src/lib.rs
@@ -42,3 +42,39 @@ pub async fn check_latest_version() -> Result<Option<String>> {
     }
     Ok(None)
 }
+
+fn parse_version(v: &str) -> Option<(u32, u32, u32)> {
+    let parts: Vec<_> = v.split('.').filter_map(|p| p.parse::<u32>().ok()).collect();
+    (parts.len() == 3).then(|| (parts[0], parts[1], parts[2]))
+}
+
+pub fn is_older_version(current: &str, latest: &str) -> bool {
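+    // Versions compare as (major, minor, patch) tuples; if either side fails
+    // to parse as MAJOR.MINOR.PATCH, this returns false.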
+    matches!((parse_version(current), parse_version(latest)), (Some(c), Some(l)) if c < l)
+}
+
+pub fn is_newer_version(current: &str, latest: &str) -> bool {
+    matches!((parse_version(current), parse_version(latest)), (Some(c), Some(l)) if c > l)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_current_version_equal() {
+        assert!(!is_older_version("0.3.39", "0.3.39"));
+        assert!(!is_newer_version("0.3.39", "0.3.39"));
+    }
+
+    #[test]
+    fn test_older_version() {
+        assert!(is_older_version("0.3.38", "0.3.39"));
+        assert!(!is_newer_version("0.3.38", "0.3.39"));
+    }
+
+    #[test]
+    fn test_newer_version() {
+        assert!(!is_older_version("0.3.40", "0.3.39"));
+        assert!(is_newer_version("0.3.40", "0.3.39"));
+    }
+}
diff --git a/pyproject.toml b/pyproject.toml
index 3d1998b5..b0e89ac6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "maturin"
 
 [project]
 name = "tower"
-version = "0.3.42"
+version = "0.3.43"
 description = "Tower CLI and runtime environment for Tower."
 authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }]
 readme = "README.md"
diff --git a/tests/integration/features/cli_runs.feature b/tests/integration/features/cli_runs.feature
index 5afa3e40..ede7b436 100644
--- a/tests/integration/features/cli_runs.feature
+++ b/tests/integration/features/cli_runs.feature
@@ -38,3 +38,18 @@ Feature: CLI Run Commands
     And I run "tower run" via CLI
     Then the output should show "First log before run completes"
     And the output should show "Second log after run completes"
+
+  Scenario: CLI apps logs follow should stream logs and drain after completion
+    Given I have a simple hello world application named "app-logs-after-completion"
+    When I run "tower deploy --create" via CLI
+    And I run "tower run --detached" via CLI and capture run number
+    And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number
+    Then the output should show "First log before run completes"
+    And the output should show "Second log after run completes"
+
+  Scenario: CLI apps logs follow should display warnings
+    Given I have a simple hello world application named "app-logs-warning"
+    When I run "tower deploy --create" via CLI
+    And I run "tower run --detached" via CLI and capture run number
+    And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number
+    Then the output should show "Warning: Rate limit approaching"
diff --git a/tests/integration/features/steps/cli_steps.py b/tests/integration/features/steps/cli_steps.py
index 24a510ec..a491e491 100644
--- a/tests/integration/features/steps/cli_steps.py
+++ b/tests/integration/features/steps/cli_steps.py
@@ -6,6 +6,7 @@ import shutil
 import json
 import shlex
+import re
 from datetime import datetime
 from pathlib import Path
 from behave import given, when, then
@@ -55,6 +56,37 @@ def step_run_cli_command(context, command):
         raise
 
 
+@step('I run "{command}" via CLI using created app name')
+def step_run_cli_command_with_app_name(context, command):
+    """Run a Tower CLI command with the generated app name injected."""
+    if not hasattr(context, "app_name"):
+        raise AssertionError("Expected context.app_name to be set by app setup step")
+    formatted = command.format(app_name=context.app_name)
+    step_run_cli_command(context, formatted)
+
+
+@step('I run "{command}" via CLI and capture run number')
+def step_run_cli_command_capture_run_number(context, command):
+    """Run a Tower CLI command and capture the run number from its output."""
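+    # Run the command first, then pull the run number out of the
+    # ANSI-stripped output (the CLI may colorize "Run #N").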
+    step_run_cli_command(context, command)
+    output = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", context.cli_output)
+    match = re.search(r"Run #(?P<number>\d+)", output)
+    if not match:
+        raise AssertionError(f"Expected run number in output, got: {output}")
+    context.run_number = match.group("number")
+
+
+@step('I run "{command}" via CLI using created app name and run number')
+def step_run_cli_command_with_app_name_and_run(context, command):
+    """Run a Tower CLI command with the generated app name and run number injected."""
+    if not hasattr(context, "app_name"):
+        raise AssertionError("Expected context.app_name to be set by app setup step")
+    if not hasattr(context, "run_number"):
+        raise AssertionError("Expected context.run_number to be set by run step")
+    formatted = command.format(app_name=context.app_name, run_number=context.run_number)
+    step_run_cli_command(context, formatted)
+
+
 @step("timestamps should be yellow colored")
 def step_timestamps_should_be_yellow(context):
     """Verify timestamps are colored yellow (ANSI code 33)"""
diff --git a/tests/integration/features/steps/mcp_steps.py b/tests/integration/features/steps/mcp_steps.py
index 9e8090c2..1fa07c3d 100644
--- a/tests/integration/features/steps/mcp_steps.py
+++ b/tests/integration/features/steps/mcp_steps.py
@@ -127,6 +127,7 @@ def create_towerfile(
     """Create a Towerfile for testing - pure function with no side effects beyond file creation"""
     app_name = unique_app_name(context, app_name, force_new=True)
+    context.app_name = app_name
 
     template_dir = Path(__file__).parents[2] / "templates"
 
@@ -880,7 +881,7 @@ def step_then_receive_workflow_help_sse(context):
     ), f"Expected success response, got: {context.mcp_response}"
     assert "content" in context.mcp_response
     content = context.mcp_response["content"][0].text
-    assert "Tower Application Development Workflow" in content
+    assert "Tower Workflow" in content
 
 
 @then("I should receive workflow help content via HTTP")
@@ -897,7 +898,7 @@ def step_then_receive_workflow_help_stdio(context):
     result = context.mcp_response["result"]
     assert "content" in result
     content = result["content"][0]["text"]
-    assert "Tower Application Development Workflow" in content
+    assert "Tower Workflow" in content
 
 
 @given('I have a simple hello world application named "{app_name}"')
diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py
index 7f59d8ed..7ff598ee 100755
--- a/tests/integration/run_tests.py
+++ b/tests/integration/run_tests.py
@@ -18,6 +18,19 @@ def log(msg):
     print(f"\033[36m[test-runner]\033[0m {msg}")
 
 
+def reset_session_fixture(test_home):
+    """Reset the session.json fixture to its committed state before tests.
+
+    The CLI may modify session.json during MCP operations (like team switching),
+    so we restore it to the canonical committed version before each test run.
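+    Assumes the fixture file is tracked by git, so `git checkout` can
+    restore the committed copy.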
+ """ + session_file = test_home / ".config" / "tower" / "session.json" + subprocess.run( + ["git", "checkout", str(session_file)], + capture_output=True, + ) + + def check_mock_server_health(url): """Check if the mock server is running and responding.""" try: @@ -129,6 +142,8 @@ def main(): return 1 finally: + reset_session_fixture(test_home) + if mock_process: log("Stopping mock server...") mock_process.terminate() diff --git a/tests/mock-api-server/main.py b/tests/mock-api-server/main.py index cf949e70..e597d0b4 100644 --- a/tests/mock-api-server/main.py +++ b/tests/mock-api-server/main.py @@ -272,7 +272,11 @@ async def describe_run(name: str, seq: int): # For logs-after-completion test apps, complete quickly to test log draining # Use 1 second so CLI has time to start streaming before completion - completion_threshold = 1.0 if "logs-after-completion" in name else 5.0 + completion_threshold = ( + 1.0 + if "logs-after-completion" in name or "logs-warning" in name + else 5.0 + ) if elapsed > completion_threshold: run_data["status"] = "exited" @@ -491,6 +495,11 @@ def make_log_event(seq: int, line_num: int, content: str, timestamp: str): return f"event: log\ndata: {json.dumps(make_log_data(seq, line_num, content, timestamp))}\n\n" +def make_warning_event(content: str, timestamp: str): + data = {"data": {"content": content, "reported_at": timestamp}, "event": "warning"} + return f"event: warning\ndata: {json.dumps(data)}\n\n" + + @app.get("/v1/apps/{name}/runs/{seq}/logs") async def describe_run_logs(name: str, seq: int): """Mock endpoint for getting run logs.""" @@ -519,6 +528,14 @@ async def generate_logs_after_completion_test_stream(seq: int): ) +async def generate_warning_log_stream(seq: int): + """Stream a warning and a couple of logs, then finish.""" + yield make_warning_event("Rate limit approaching", "2025-08-22T12:00:00Z") + yield make_log_event(seq, 1, "Warning stream log 1", "2025-08-22T12:00:00Z") + await asyncio.sleep(1.2) + yield make_log_event(seq, 2, "Warning stream log 2", "2025-08-22T12:00:01Z") + + async def generate_normal_log_stream(seq: int): """Normal log stream for regular tests.""" for line_num, content, timestamp in NORMAL_LOG_ENTRIES: @@ -533,7 +550,9 @@ async def stream_run_logs(name: str, seq: int): if name not in mock_apps_db: raise HTTPException(status_code=404, detail=f"App '{name}' not found") - if "logs-after-completion" in name: + if "logs-warning" in name: + stream = generate_warning_log_stream(seq) + elif "logs-after-completion" in name: stream = generate_logs_after_completion_test_stream(seq) else: stream = generate_normal_log_stream(seq) diff --git a/uv.lock b/uv.lock index 620c0b67..51bf7d0e 100644 --- a/uv.lock +++ b/uv.lock @@ -2744,7 +2744,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.42" +version = "0.3.43" source = { editable = "." } dependencies = [ { name = "attrs" },