From fd79bafe4aeeaf8ee32898c4211d13d7d03f5d45 Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Sun, 18 Jan 2026 11:48:36 +0100 Subject: [PATCH 1/7] Add new `--follow` mode for `tower apps logs` with resilient streaming and tests (#171) * feat (cli): add follow mode for apps logs * fix(cli): improve follow retry handling * refactor: reset follow backoff and improve log errors * reset monitor failures and cancel follow watchers * harden apps logs follow and add regression tests for - deduplicate log lines across reconnects / might be an issue - add unit test for deduplication and out of order logs * fix linter issues * wait for run start before log streaming --- crates/tower-cmd/src/api.rs | 39 +- crates/tower-cmd/src/apps.rs | 479 +++++++++++++++++- tests/integration/features/cli_runs.feature | 15 + tests/integration/features/steps/cli_steps.py | 32 ++ tests/integration/features/steps/mcp_steps.py | 1 + tests/mock-api-server/main.py | 23 +- 6 files changed, 582 insertions(+), 7 deletions(-) diff --git a/crates/tower-cmd/src/api.rs b/crates/tower-cmd/src/api.rs index 8a49b4a3..772bfb21 100644 --- a/crates/tower-cmd/src/api.rs +++ b/crates/tower-cmd/src/api.rs @@ -367,6 +367,17 @@ pub enum LogStreamError { Unknown, } +impl std::fmt::Display for LogStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LogStreamError::Reqwest(err) => write!(f, "{err}"), + LogStreamError::Unknown => write!(f, "unknown log stream error"), + } + } +} + +impl std::error::Error for LogStreamError {} + impl From for LogStreamError { fn from(err: reqwest_eventsource::CannotCloneRequestError) -> Self { debug!("Failed to clone request {:?}", err); @@ -399,10 +410,30 @@ async fn drain_run_logs_stream(mut source: EventSource, tx: mpsc::Sender { let event_warning = serde_json::from_str(&message.data); - if let Ok(event) = event_warning { - tx.send(LogStreamEvent::EventWarning(event)).await.ok(); - } else { - debug!("Failed to parse warning message: {:?}", message.data); + match event_warning { + Ok(event) => { + tx.send(LogStreamEvent::EventWarning(event)).await.ok(); + } + Err(err) => { + let warning_data = serde_json::from_str(&message.data); + match warning_data { + Ok(data) => { + let event = tower_api::models::EventWarning { + data, + event: tower_api::models::event_warning::Event::Warning, + id: None, + retry: None, + }; + tx.send(LogStreamEvent::EventWarning(event)).await.ok(); + } + Err(_) => { + debug!( + "Failed to parse warning message: {:?}. Error: {}", + message.data, err + ); + } + } + } } } _ => { diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index db86e857..a95a6251 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -1,7 +1,9 @@ use clap::{value_parser, Arg, ArgMatches, Command}; use config::Config; +use tokio::sync::oneshot; +use tokio::time::{sleep, Duration, Instant}; -use tower_api::models::Run; +use tower_api::models::{Run, RunLogLine}; use crate::{api, output}; @@ -19,6 +21,13 @@ pub fn apps_cmd() -> Command { ) .subcommand( Command::new("logs") + .arg( + Arg::new("follow") + .short('f') + .long("follow") + .help("Follow logs in real time") + .action(clap::ArgAction::SetTrue), + ) .allow_external_subcommands(true) .override_usage("tower apps logs [OPTIONS] #") .after_help("Example: tower apps logs hello-world#11") @@ -54,6 +63,12 @@ pub fn apps_cmd() -> Command { pub async fn do_logs(config: Config, cmd: &ArgMatches) { let (name, seq) = extract_app_name_and_run("logs", cmd.subcommand()); + let follow = cmd.get_one::("follow").copied().unwrap_or(false); + + if follow { + follow_logs(config, name, seq).await; + return; + } if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { for line in resp.log_lines { @@ -217,3 +232,465 @@ fn extract_app_name(subcmd: &str, cmd: Option<(&str, &ArgMatches)>) -> String { ); output::die(&line); } + +const FOLLOW_BACKOFF_INITIAL: Duration = Duration::from_millis(500); +const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5); +const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5); +const RUN_START_POLL_INTERVAL: Duration = Duration::from_millis(500); +const RUN_START_MESSAGE_DELAY: Duration = Duration::from_secs(3); + +async fn follow_logs(config: Config, name: String, seq: i64) { + let enable_ctrl_c = !output::get_output_mode().is_mcp(); + let mut backoff = FOLLOW_BACKOFF_INITIAL; + let mut cancel_monitor: Option> = None; + let mut last_line_num: Option = None; + + loop { + let mut run = match api::describe_run(&config, &name, seq).await { + Ok(res) => res.run, + Err(err) => output::tower_error_and_die(err, "Fetching run details failed"), + }; + + if is_run_finished(&run) { + if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { + for line in resp.log_lines { + emit_log_if_new(&line, &mut last_line_num); + } + } + return; + } + + if !is_run_started(&run) { + let wait_started = Instant::now(); + let mut notified = false; + loop { + sleep(RUN_START_POLL_INTERVAL).await; + // Avoid blank output on slow starts while keeping fast starts quiet. + if should_notify_run_wait(notified, wait_started.elapsed()) { + output::write("Waiting for run to start...\n"); + notified = true; + } + run = match api::describe_run(&config, &name, seq).await { + Ok(res) => res.run, + Err(err) => output::tower_error_and_die(err, "Fetching run details failed"), + }; + if is_run_finished(&run) { + if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { + for line in resp.log_lines { + emit_log_if_new(&line, &mut last_line_num); + } + } + return; + } + if is_run_started(&run) { + break; + } + } + } + + // Cancel any prior watcher so we don't accumulate pollers after reconnects. + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + let (cancel_tx, cancel_rx) = oneshot::channel(); + cancel_monitor = Some(cancel_tx); + let run_complete = monitor_run_completion(&config, &name, seq, cancel_rx); + match api::stream_run_logs(&config, &name, seq).await { + Ok(log_stream) => { + // Reset after a successful connection so transient drops recover quickly. + backoff = FOLLOW_BACKOFF_INITIAL; + match stream_logs_until_complete( + log_stream, + run_complete, + enable_ctrl_c, + &run.dollar_link, + &mut last_line_num, + ) + .await + { + Ok(LogFollowOutcome::Completed) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } + Ok(LogFollowOutcome::Interrupted) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } + Ok(LogFollowOutcome::Disconnected) => {} + Err(_) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } + } + } + Err(err) => { + if is_fatal_stream_error(&err) { + output::error(&format!("Failed to stream run logs: {}", err)); + return; + } + output::error(&format!("Failed to stream run logs: {}", err)); + sleep(backoff).await; + backoff = next_backoff(backoff); + continue; + } + } + + let latest = match api::describe_run(&config, &name, seq).await { + Ok(res) => res.run, + Err(err) => output::tower_error_and_die(err, "Fetching run details failed"), + }; + if is_run_finished(&latest) { + return; + } + + sleep(backoff).await; + backoff = next_backoff(backoff); + } +} + +fn next_backoff(current: Duration) -> Duration { + let next = current.checked_mul(2).unwrap_or(FOLLOW_BACKOFF_MAX); + if next > FOLLOW_BACKOFF_MAX { + FOLLOW_BACKOFF_MAX + } else { + next + } +} + +enum LogFollowOutcome { + Completed, + Disconnected, + Interrupted, +} + +async fn stream_logs_until_complete( + mut log_stream: tokio::sync::mpsc::Receiver, + mut run_complete: oneshot::Receiver, + enable_ctrl_c: bool, + run_link: &str, + last_line_num: &mut Option, +) -> Result { + loop { + tokio::select! { + event = log_stream.recv() => match event { + Some(api::LogStreamEvent::EventLog(log)) => { + emit_log_if_new(&log, last_line_num); + }, + Some(api::LogStreamEvent::EventWarning(warning)) => { + output::write(&format!("Warning: {}\n", warning.data.content)); + } + None => return Ok(LogFollowOutcome::Disconnected), + }, + res = &mut run_complete => { + match res { + Ok(_) => { + drain_remaining_logs(log_stream, last_line_num).await; + return Ok(LogFollowOutcome::Completed); + } + // If monitoring failed, keep following and let the caller retry. + Err(_) => return Ok(LogFollowOutcome::Disconnected), + } + }, + _ = tokio::signal::ctrl_c(), if enable_ctrl_c => { + output::write("Received Ctrl+C, stopping log streaming...\n"); + output::write("Note: The run will continue in Tower cloud\n"); + output::write(&format!(" See more: {}\n", run_link)); + return Ok(LogFollowOutcome::Interrupted); + }, + } + } +} + +async fn drain_remaining_logs( + mut log_stream: tokio::sync::mpsc::Receiver, + last_line_num: &mut Option, +) { + let _ = tokio::time::timeout(LOG_DRAIN_DURATION, async { + while let Some(event) = log_stream.recv().await { + match event { + api::LogStreamEvent::EventLog(log) => { + emit_log_if_new(&log, last_line_num); + } + api::LogStreamEvent::EventWarning(warning) => { + output::write(&format!("Warning: {}\n", warning.data.content)); + } + } + } + }) + .await; +} + +fn emit_log_if_new(log: &RunLogLine, last_line_num: &mut Option) { + if should_emit_line(last_line_num, log.line_num) { + output::remote_log_event(log); + } +} + +fn should_emit_line(last_line_num: &mut Option, line_num: i64) -> bool { + if last_line_num.map_or(true, |last| line_num > last) { + *last_line_num = Some(line_num); + true + } else { + false + } +} + +fn is_fatal_stream_error(err: &api::LogStreamError) -> bool { + match err { + api::LogStreamError::Reqwest(reqwest_err) => reqwest_err + .status() + .map(|status| status.is_client_error() && status.as_u16() != 429) + .unwrap_or(false), + api::LogStreamError::Unknown => false, + } +} + +fn monitor_run_completion( + config: &Config, + app_name: &str, + seq: i64, + mut cancel: oneshot::Receiver<()>, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let config_clone = config.clone(); + let app_name = app_name.to_string(); + + tokio::spawn(async move { + let mut failures = 0; + loop { + tokio::select! { + _ = &mut cancel => return, + result = api::describe_run(&config_clone, &app_name, seq) => match result { + Ok(res) => { + failures = 0; + if is_run_finished(&res.run) { + let _ = tx.send(res.run); + return; + } + } + Err(_) => { + failures += 1; + if failures >= 5 { + output::error( + "Failed to monitor run completion after repeated errors", + ); + return; + } + } + }, + } + sleep(Duration::from_millis(500)).await; + } + }); + + rx +} + +fn is_run_finished(run: &Run) -> bool { + match run.status { + // Be explicit about terminal states so new non-terminal statuses + // don't cause us to stop following logs too early. + tower_api::models::run::Status::Crashed + | tower_api::models::run::Status::Errored + | tower_api::models::run::Status::Exited + | tower_api::models::run::Status::Cancelled => true, + _ => false, + } +} + +fn is_run_started(run: &Run) -> bool { + match run.status { + tower_api::models::run::Status::Scheduled | tower_api::models::run::Status::Pending => { + false + } + _ => true, + } +} + +fn should_notify_run_wait(already_notified: bool, elapsed: Duration) -> bool { + !already_notified && elapsed >= RUN_START_MESSAGE_DELAY +} + +#[cfg(test)] +mod tests { + use super::{ + apps_cmd, is_run_started, next_backoff, should_emit_line, should_notify_run_wait, + stream_logs_until_complete, LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, + }; + use super::is_run_finished; + use tokio::sync::{mpsc, oneshot}; + use tokio::time::Duration; + use tower_api::models::run::Status; + use tower_api::models::Run; + + #[test] + fn test_follow_flag_parsing() { + let matches = apps_cmd() + .try_get_matches_from(["apps", "logs", "--follow", "hello-world#11"]) + .unwrap(); + let (cmd, sub_matches) = matches.subcommand().unwrap(); + + assert_eq!(cmd, "logs"); + assert_eq!(sub_matches.get_one::("follow"), Some(&true)); + assert_eq!( + sub_matches.subcommand().map(|(name, _)| name), + Some("hello-world#11") + ); + } + + #[test] + fn test_terminal_statuses_explicit() { + let non_terminal = [Status::Scheduled, Status::Pending, Status::Running]; + for status in non_terminal { + let run = Run { + status, + ..Default::default() + }; + assert!(!is_run_finished(&run)); + } + + let terminal = [ + Status::Crashed, + Status::Errored, + Status::Exited, + Status::Cancelled, + ]; + for status in terminal { + let run = Run { + status, + ..Default::default() + }; + assert!(is_run_finished(&run)); + } + } + + #[test] + fn test_status_variants_exhaustive() { + let status = Status::Scheduled; + match status { + Status::Scheduled => {} + Status::Pending => {} + Status::Running => {} + Status::Crashed => {} + Status::Errored => {} + Status::Exited => {} + Status::Cancelled => {} + } + } + + #[test] + fn test_run_started_statuses() { + let not_started = [Status::Scheduled, Status::Pending]; + for status in not_started { + let run = Run { + status, + ..Default::default() + }; + assert!(!is_run_started(&run)); + } + + let started = [ + Status::Running, + Status::Crashed, + Status::Errored, + Status::Exited, + Status::Cancelled, + ]; + for status in started { + let run = Run { + status, + ..Default::default() + }; + assert!(is_run_started(&run)); + } + } + + #[test] + fn test_run_wait_notification_logic() { + assert!(!should_notify_run_wait(true, super::RUN_START_MESSAGE_DELAY)); + assert!(!should_notify_run_wait( + false, + super::RUN_START_MESSAGE_DELAY - Duration::from_millis(1) + )); + assert!(should_notify_run_wait(false, super::RUN_START_MESSAGE_DELAY)); + } + + #[tokio::test] + async fn test_stream_completion_on_run_finish() { + let (tx, rx) = mpsc::channel(1); + let (done_tx, done_rx) = oneshot::channel(); + let mut last_line_num = None; + + let done_task = tokio::spawn(async move { + let _ = done_tx.send(Run::default()); + tokio::time::sleep(Duration::from_millis(10)).await; + drop(tx); + }); + + let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await; + done_task.await.unwrap(); + + assert!(matches!(res, Ok(LogFollowOutcome::Completed))); + } + + #[tokio::test] + async fn test_stream_disconnection_on_close() { + let (tx, rx) = mpsc::channel(1); + drop(tx); + let (_done_tx, done_rx) = oneshot::channel::(); + let mut last_line_num = None; + + let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await; + + assert!(matches!(res, Ok(LogFollowOutcome::Disconnected))); + } + + #[test] + fn test_backoff_growth_and_cap() { + let mut backoff = FOLLOW_BACKOFF_INITIAL; + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(1)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(2)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(4)); + backoff = next_backoff(backoff); + assert_eq!(backoff, FOLLOW_BACKOFF_MAX); + backoff = next_backoff(backoff); + assert_eq!(backoff, FOLLOW_BACKOFF_MAX); + } + + #[test] + fn test_duplicate_line_filtering() { + let mut last_line_num = None; + assert!(should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(!should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(!should_emit_line(&mut last_line_num, 0)); + assert_eq!(last_line_num, Some(1)); + assert!(should_emit_line(&mut last_line_num, 2)); + assert_eq!(last_line_num, Some(2)); + assert!(should_emit_line(&mut last_line_num, 10)); + assert_eq!(last_line_num, Some(10)); + } + + #[test] + fn test_out_of_order_log_handling() { + let mut last_line_num = None; + assert!(should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(should_emit_line(&mut last_line_num, 3)); + assert_eq!(last_line_num, Some(3)); + assert!(!should_emit_line(&mut last_line_num, 2)); + assert_eq!(last_line_num, Some(3)); + assert!(should_emit_line(&mut last_line_num, 4)); + assert_eq!(last_line_num, Some(4)); + } +} diff --git a/tests/integration/features/cli_runs.feature b/tests/integration/features/cli_runs.feature index 5afa3e40..ede7b436 100644 --- a/tests/integration/features/cli_runs.feature +++ b/tests/integration/features/cli_runs.feature @@ -38,3 +38,18 @@ Feature: CLI Run Commands And I run "tower run" via CLI Then the output should show "First log before run completes" And the output should show "Second log after run completes" + + Scenario: CLI apps logs follow should stream logs and drain after completion + Given I have a simple hello world application named "app-logs-after-completion" + When I run "tower deploy --create" via CLI + And I run "tower run --detached" via CLI and capture run number + And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number + Then the output should show "First log before run completes" + And the output should show "Second log after run completes" + + Scenario: CLI apps logs follow should display warnings + Given I have a simple hello world application named "app-logs-warning" + When I run "tower deploy --create" via CLI + And I run "tower run --detached" via CLI and capture run number + And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number + Then the output should show "Warning: Rate limit approaching" diff --git a/tests/integration/features/steps/cli_steps.py b/tests/integration/features/steps/cli_steps.py index 24a510ec..a491e491 100644 --- a/tests/integration/features/steps/cli_steps.py +++ b/tests/integration/features/steps/cli_steps.py @@ -6,6 +6,7 @@ import shutil import json import shlex +import re from datetime import datetime from pathlib import Path from behave import given, when, then @@ -55,6 +56,37 @@ def step_run_cli_command(context, command): raise +@step('I run "{command}" via CLI using created app name') +def step_run_cli_command_with_app_name(context, command): + """Run a Tower CLI command with the generated app name injected.""" + if not hasattr(context, "app_name"): + raise AssertionError("Expected context.app_name to be set by app setup step") + formatted = command.format(app_name=context.app_name) + step_run_cli_command(context, formatted) + + +@step('I run "{command}" via CLI and capture run number') +def step_run_cli_command_capture_run_number(context, command): + """Run a Tower CLI command and capture the run number from its output.""" + step_run_cli_command(context, command) + output = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", context.cli_output) + match = re.search(r"Run #(?P\d+)", output) + if not match: + raise AssertionError(f"Expected run number in output, got: {output}") + context.run_number = match.group("number") + + +@step('I run "{command}" via CLI using created app name and run number') +def step_run_cli_command_with_app_name_and_run(context, command): + """Run a Tower CLI command with the generated app name and run number injected.""" + if not hasattr(context, "app_name"): + raise AssertionError("Expected context.app_name to be set by app setup step") + if not hasattr(context, "run_number"): + raise AssertionError("Expected context.run_number to be set by run step") + formatted = command.format(app_name=context.app_name, run_number=context.run_number) + step_run_cli_command(context, formatted) + + @step("timestamps should be yellow colored") def step_timestamps_should_be_yellow(context): """Verify timestamps are colored yellow (ANSI code 33)""" diff --git a/tests/integration/features/steps/mcp_steps.py b/tests/integration/features/steps/mcp_steps.py index 9e8090c2..f0b217f3 100644 --- a/tests/integration/features/steps/mcp_steps.py +++ b/tests/integration/features/steps/mcp_steps.py @@ -127,6 +127,7 @@ def create_towerfile( """Create a Towerfile for testing - pure function with no side effects beyond file creation""" app_name = unique_app_name(context, app_name, force_new=True) + context.app_name = app_name template_dir = Path(__file__).parents[2] / "templates" diff --git a/tests/mock-api-server/main.py b/tests/mock-api-server/main.py index cf949e70..e597d0b4 100644 --- a/tests/mock-api-server/main.py +++ b/tests/mock-api-server/main.py @@ -272,7 +272,11 @@ async def describe_run(name: str, seq: int): # For logs-after-completion test apps, complete quickly to test log draining # Use 1 second so CLI has time to start streaming before completion - completion_threshold = 1.0 if "logs-after-completion" in name else 5.0 + completion_threshold = ( + 1.0 + if "logs-after-completion" in name or "logs-warning" in name + else 5.0 + ) if elapsed > completion_threshold: run_data["status"] = "exited" @@ -491,6 +495,11 @@ def make_log_event(seq: int, line_num: int, content: str, timestamp: str): return f"event: log\ndata: {json.dumps(make_log_data(seq, line_num, content, timestamp))}\n\n" +def make_warning_event(content: str, timestamp: str): + data = {"data": {"content": content, "reported_at": timestamp}, "event": "warning"} + return f"event: warning\ndata: {json.dumps(data)}\n\n" + + @app.get("/v1/apps/{name}/runs/{seq}/logs") async def describe_run_logs(name: str, seq: int): """Mock endpoint for getting run logs.""" @@ -519,6 +528,14 @@ async def generate_logs_after_completion_test_stream(seq: int): ) +async def generate_warning_log_stream(seq: int): + """Stream a warning and a couple of logs, then finish.""" + yield make_warning_event("Rate limit approaching", "2025-08-22T12:00:00Z") + yield make_log_event(seq, 1, "Warning stream log 1", "2025-08-22T12:00:00Z") + await asyncio.sleep(1.2) + yield make_log_event(seq, 2, "Warning stream log 2", "2025-08-22T12:00:01Z") + + async def generate_normal_log_stream(seq: int): """Normal log stream for regular tests.""" for line_num, content, timestamp in NORMAL_LOG_ENTRIES: @@ -533,7 +550,9 @@ async def stream_run_logs(name: str, seq: int): if name not in mock_apps_db: raise HTTPException(status_code=404, detail=f"App '{name}' not found") - if "logs-after-completion" in name: + if "logs-warning" in name: + stream = generate_warning_log_stream(seq) + elif "logs-after-completion" in name: stream = generate_logs_after_completion_test_stream(seq) else: stream = generate_normal_log_stream(seq) From 45e9c4128b8a14f1ca219816269fd12eef2cf3ef Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Mon, 19 Jan 2026 17:36:24 +0100 Subject: [PATCH 2/7] feat: improve LLM instructions for MCP (#174) LLMs wouldn't always use the MCP server, and even when they do, they don't always do the right thing (e.g. they create an app with hatchling, even though it's not needed). Here I've touched up the instructions that get inserted into the context when using the MCP server to improve 1-shot performance by LLM agents using the tower MCP --- crates/tower-cmd/src/mcp.rs | 122 +++++++++++------- tests/integration/features/steps/mcp_steps.py | 4 +- 2 files changed, 75 insertions(+), 51 deletions(-) diff --git a/crates/tower-cmd/src/mcp.rs b/crates/tower-cmd/src/mcp.rs index a7a5974d..eeabc1d9 100644 --- a/crates/tower-cmd/src/mcp.rs +++ b/crates/tower-cmd/src/mcp.rs @@ -597,7 +597,7 @@ impl TowerService { } #[tool( - description = "Deploy your app to Tower cloud. Prerequisites: 1) Create Towerfile, 2) Create app with tower_apps_create. Optional working_directory parameter specifies which project directory to deploy from." + description = "Deploy to Tower cloud. Prerequisites: Towerfile, tower_apps_create. Optional: working_directory." )] async fn tower_deploy( &self, @@ -612,7 +612,7 @@ impl TowerService { } #[tool( - description = "Run your app locally using the local Towerfile and source files. Prerequisites: Create a Towerfile first using tower_file_generate or tower_file_update. Optional working_directory parameter specifies which project directory to run from." + description = "Run locally using Towerfile (preferred during development, has access to tower secrets). Prerequisites: Towerfile. Optional: working_directory." )] async fn tower_run_local( &self, @@ -651,7 +651,7 @@ impl TowerService { } #[tool( - description = "Run your app remotely on Tower cloud. Prerequisites: 1) Create Towerfile, 2) Create app with tower_apps_create, 3) Deploy with tower_deploy" + description = "Run on Tower cloud. Prerequisites: Towerfile, tower_apps_create, tower_deploy." )] async fn tower_run_remote( &self, @@ -703,7 +703,7 @@ impl TowerService { } #[tool( - description = "Read and parse the current Towerfile configuration. Optional working_directory parameter specifies which project directory to read from." + description = "Read and parse Towerfile configuration. Optional: working_directory." )] async fn tower_file_read( &self, @@ -717,7 +717,7 @@ impl TowerService { } #[tool( - description = "Update Towerfile app configuration. Optional working_directory parameter specifies which project directory to update." + description = "Update Towerfile config (app name, script, description, source). Use this instead of editing TOML. Optional: working_directory." )] async fn tower_file_update( &self, @@ -752,7 +752,7 @@ impl TowerService { } #[tool( - description = "Add a new parameter to the Towerfile. Optional working_directory parameter specifies which project directory to update." + description = "Add parameter to Towerfile. Use this instead of editing TOML. Optional: working_directory." )] async fn tower_file_add_parameter( &self, @@ -779,7 +779,7 @@ impl TowerService { } #[tool( - description = "Validate the current Towerfile configuration. Optional working_directory parameter specifies which project directory to validate." + description = "Validate Towerfile configuration. Optional: working_directory." )] async fn tower_file_validate( &self, @@ -793,7 +793,7 @@ impl TowerService { } #[tool( - description = "Generate Towerfile from pyproject.toml. This is typically the first step in the workflow. Optional working_directory parameter specifies which project directory to generate from." + description = "Generate Towerfile from pyproject.toml (first step). Don't add build systems to pyproject.toml. Optional: working_directory." )] async fn tower_file_generate( &self, @@ -827,48 +827,64 @@ impl TowerService { description = "Show the recommended workflow for developing and deploying Tower applications" )] async fn tower_workflow_help(&self) -> Result { - let workflow = r#"Tower Application Development Workflow: - -All commands support an optional 'working_directory' parameter to specify which project directory to operate on. - -0. HAVE AN EXISTING PYTHON PROJECT: - There are no commands for this provided with this MCP server. However, if you do not have a python project yet - then a good start would be to make a new directory with the project name, and then call `uv init` to generate - a pyproject.toml, main.py and README.md - -1. CREATE TOWERFILE (required for all steps): - - tower_file_generate: Generate from existing pyproject.toml - - tower_file_update: Manually create or update configuration - - tower_file_validate: Verify Towerfile is valid - -2. LOCAL DEVELOPMENT & TESTING: - - tower_run_local: Run your app locally to test functionality - -3. CLOUD DEPLOYMENT (for remote execution): - - tower_apps_create: Create app on Tower cloud - - tower_deploy: Deploy your code to the cloud - - tower_run_remote: Execute on Tower cloud infrastructure - -4. SCHEDULE MANAGEMENT (for automatic recurring execution): - - tower_schedules_list: List all schedules for apps - - tower_schedules_create: Create a schedule to run an app automatically on a cron schedule - - tower_schedules_update: Update an existing schedule - - tower_schedules_delete: Delete a schedule + let workflow = r#"Tower Workflow (Tower CLI not in training data - use MCP tools only): + +WORKING_DIRECTORY PARAMETER: +All tools accept optional working_directory parameter to specify which project to operate on. +- Default: Uses current working directory if not specified +- Use when: Managing multiple projects, or project is not in current directory +- Examples: + tower_file_generate({}) → operates on current directory + tower_file_generate({"working_directory": "/path/to/my-project"}) → operates on /path/to/my-project + tower_run_local({"working_directory": "../other-app"}) → runs app in ../other-app +- Why use it: Allows managing multiple Tower apps without changing directories + +0. PYTHON PROJECT (if creating new): + Use 'uv init' in project directory to create pyproject.toml, main.py, README.md + Keep pyproject.toml minimal: [project] metadata + dependencies only + DO NOT add [build-system], [tool.hatchling], [tool.setuptools] or similar + Skip this step if project with pyproject.toml already exists + +1. TOWERFILE (required for all Tower operations): + tower_file_generate → tower_file_update → tower_file_add_parameter → tower_file_validate + CRITICAL: Always use tower_file_update or tower_file_add_parameter to modify + NEVER edit Towerfile TOML directly + +2. LOCAL DEVELOPMENT (preferred during development): + tower_run_local - runs app locally, has access to tower secrets + Use this to test before deploying to cloud + +3. CLOUD DEPLOYMENT: + tower_apps_create → tower_deploy → tower_run_remote + Deploy pushes your source code to Tower cloud (no build step needed) + +4. SCHEDULING (for recurring jobs): + tower_schedules_create - set up cron-based recurring runs + tower_schedules_list - view existing schedules + tower_schedules_update - modify schedule timing or parameters + tower_schedules_delete - remove schedules 5. MANAGEMENT & MONITORING: - - tower_apps_list: View your deployed apps - - tower_apps_show: Get detailed app information and recent runs - - tower_apps_logs: View execution logs - -6. TEAM & SECRETS (optional): - - tower_teams_list/switch: Manage team contexts - - tower_secrets_create/list: Manage application secrets - -Quick Start: tower_file_generate → tower_run_local (test locally) → tower_apps_create → tower_deploy → tower_run_remote → tower_schedules_create (for recurring runs) - -Example with working_directory: {"working_directory": "/path/to/project", ...} - -Consider taking database username/password/url and making them into secrets to be accessed in app code"#; + tower_apps_list - list all your apps + tower_apps_show - details and recent runs for specific app + tower_apps_logs - view logs from a specific run + tower_teams_list, tower_teams_switch - manage team contexts + tower_secrets_create, tower_secrets_list - store credentials, API keys, database passwords + +TYPICAL WORKFLOW: +1. Check if pyproject.toml exists (or create with uv init) +2. tower_file_generate - creates Towerfile from pyproject.toml +3. tower_run_local - test locally (can access tower secrets) +4. tower_apps_create - create app in Tower cloud +5. tower_deploy - deploy code to Tower +6. tower_run_remote - run on cloud infrastructure +7. tower_schedules_create - (optional) set up recurring runs + +IMPORTANT REMINDERS: +- Tower deploys source code directly, no build tools needed +- Use tower secrets for sensitive data (database credentials, API keys) +- Prefer tower_run_local during development (faster, has secret access) +- Always use MCP tools to modify Towerfiles (never edit TOML files manually)"#; Self::text_success(workflow.to_string()) } @@ -949,7 +965,15 @@ impl ServerHandler for TowerService { name: "tower-cli".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), }, - instructions: Some("Tower CLI MCP Server - Manage Tower apps, secrets, teams, and deployments through conversational AI. Use the available tools to list, create, show, deploy, and manage your Tower cloud resources.".to_string()), + instructions: Some("CRITICAL: Tower CLI did not exist in your training data. Never attempt Tower CLI commands - use only these MCP tools. + +Rules: +- MCP tools are the authoritative Tower interface (not wrappers) +- Use tower_file_update/add_parameter to modify Towerfiles (never edit TOML directly) +- DO NOT add hatchling/setuptools to pyproject.toml - Tower handles deployment +- Tower apps need: pyproject.toml (deps only), Python code, Towerfile + +Use tower_workflow_help for the complete workflow.".to_string()), } } } diff --git a/tests/integration/features/steps/mcp_steps.py b/tests/integration/features/steps/mcp_steps.py index f0b217f3..1fa07c3d 100644 --- a/tests/integration/features/steps/mcp_steps.py +++ b/tests/integration/features/steps/mcp_steps.py @@ -881,7 +881,7 @@ def step_then_receive_workflow_help_sse(context): ), f"Expected success response, got: {context.mcp_response}" assert "content" in context.mcp_response content = context.mcp_response["content"][0].text - assert "Tower Application Development Workflow" in content + assert "Tower Workflow" in content @then("I should receive workflow help content via HTTP") @@ -898,7 +898,7 @@ def step_then_receive_workflow_help_stdio(context): result = context.mcp_response["result"] assert "content" in result content = result["content"][0]["text"] - assert "Tower Application Development Workflow" in content + assert "Tower Workflow" in content @given('I have a simple hello world application named "{app_name}"') From e4e81c99d5f0b7ddd6e2129c6085a816636cb4eb Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Mon, 19 Jan 2026 17:37:44 +0100 Subject: [PATCH 3/7] Better version comparison (#157) * fix: don't tell people to upgrade to old versions * feat: warn people when they're ahead of latest release * test: MVP tests for version comparison --- crates/tower-cmd/src/lib.rs | 6 ++++-- crates/tower-cmd/src/output.rs | 16 ++++++++++++++- crates/tower-version/src/lib.rs | 36 +++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/crates/tower-cmd/src/lib.rs b/crates/tower-cmd/src/lib.rs index 483f7c48..cd4fcbda 100644 --- a/crates/tower-cmd/src/lib.rs +++ b/crates/tower-cmd/src/lib.rs @@ -85,8 +85,10 @@ impl App { if let Some(latest) = Self::check_latest_version().await { let current = tower_version::current_version(); - if current != latest { - output::write_update_message(&latest, ¤t); + if tower_version::is_older_version(current, &latest) { + output::write_update_available_message(&latest, current); + } else if tower_version::is_newer_version(current, &latest) { + output::write_dev_version_message(current, &latest); } } diff --git a/crates/tower-cmd/src/output.rs b/crates/tower-cmd/src/output.rs index fc3b7bb2..272a31da 100644 --- a/crates/tower-cmd/src/output.rs +++ b/crates/tower-cmd/src/output.rs @@ -501,7 +501,7 @@ pub fn spinner(msg: &str) -> Spinner { Spinner::new(msg.into()) } -pub fn write_update_message(latest: &str, current: &str) { +pub fn write_update_available_message(latest: &str, current: &str) { let line = format!( "{}\n{}\n", format!( @@ -517,6 +517,20 @@ pub fn write_update_message(latest: &str, current: &str) { io::stderr().write_all(line.as_bytes()).unwrap(); } +pub fn write_dev_version_message(current: &str, latest: &str) { + let line = format!( + "{}\n", + format!( + "Running dev version {} (latest published: {})", + current, latest + ) + .dimmed() + ); + + use std::io::{self, Write}; + io::stderr().write_all(line.as_bytes()).unwrap(); +} + /// newline just outputs a newline. This is useful when you have a very specific formatting you /// want to maintain and you don't want to use println!. pub fn newline() { diff --git a/crates/tower-version/src/lib.rs b/crates/tower-version/src/lib.rs index 3deb1449..4d8202c6 100644 --- a/crates/tower-version/src/lib.rs +++ b/crates/tower-version/src/lib.rs @@ -42,3 +42,39 @@ pub async fn check_latest_version() -> Result> { } Ok(None) } + +fn parse_version(v: &str) -> Option<(u32, u32, u32)> { + let parts: Vec<_> = v.split('.').filter_map(|p| p.parse::().ok()).collect(); + (parts.len() == 3).then(|| (parts[0], parts[1], parts[2])) +} + +pub fn is_older_version(current: &str, latest: &str) -> bool { + matches!((parse_version(current), parse_version(latest)), (Some(c), Some(l)) if c < l) +} + +pub fn is_newer_version(current: &str, latest: &str) -> bool { + matches!((parse_version(current), parse_version(latest)), (Some(c), Some(l)) if c > l) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_current_version_equal() { + assert!(!is_older_version("0.3.39", "0.3.39")); + assert!(!is_newer_version("0.3.39", "0.3.39")); + } + + #[test] + fn test_older_version() { + assert!(is_older_version("0.3.38", "0.3.39")); + assert!(!is_newer_version("0.3.38", "0.3.39")); + } + + #[test] + fn test_newer_version() { + assert!(!is_older_version("0.3.40", "0.3.39")); + assert!(is_newer_version("0.3.40", "0.3.39")); + } +} From 59d84aad3c815cd0715afab216031a342c066a45 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 20 Jan 2026 13:54:56 +0000 Subject: [PATCH 4/7] Abort execution when installation fails (#177) * When dependency installation fails, we abort execution * Add a test for broken dependencies aborting execution --- crates/tower-runtime/src/errors.rs | 3 ++ crates/tower-runtime/src/local.rs | 16 ++++++- .../05-broken-dependencies/README.md | 0 .../05-broken-dependencies/Towerfile | 3 ++ .../05-broken-dependencies/main.py | 1 + .../05-broken-dependencies/pyproject.toml | 9 ++++ crates/tower-runtime/tests/local_test.rs | 47 +++++++++++++++++++ 7 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 crates/tower-runtime/tests/example-apps/05-broken-dependencies/README.md create mode 100644 crates/tower-runtime/tests/example-apps/05-broken-dependencies/Towerfile create mode 100644 crates/tower-runtime/tests/example-apps/05-broken-dependencies/main.py create mode 100644 crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 19c8ed10..eae04338 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -64,6 +64,9 @@ pub enum Error { #[snafu(display("cancelled"))] Cancelled, + + #[snafu(display("dependency installation failed"))] + DependencyInstallationFailed, } impl From for Error { diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index ee4968e2..a6296517 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -222,7 +222,13 @@ async fn execute_local_app( )); // Wait for venv to finish up. - wait_for_process(ctx.clone(), &cancel_token, child).await; + let res = wait_for_process(ctx.clone(), &cancel_token, child).await; + + if res != 0 { + // If the venv process failed, we want to return an error. + let _ = sx.send(res); + return Err(Error::VirtualEnvCreationFailed); + } // Check once more if the process was cancelled before we do a uv sync. The sync itself, // once started, will take a while and we have logic for checking for cancellation. @@ -269,7 +275,13 @@ async fn execute_local_app( )); // Let's wait for the setup to finish. We don't care about the results. - wait_for_process(ctx.clone(), &cancel_token, child).await; + let res = wait_for_process(ctx.clone(), &cancel_token, child).await; + + if res != 0 { + // If the sync process failed, we want to return an error. + let _ = sx.send(res); + return Err(Error::DependencyInstallationFailed); + } } } diff --git a/crates/tower-runtime/tests/example-apps/05-broken-dependencies/README.md b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/README.md new file mode 100644 index 00000000..e69de29b diff --git a/crates/tower-runtime/tests/example-apps/05-broken-dependencies/Towerfile b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/Towerfile new file mode 100644 index 00000000..0f5b958c --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/Towerfile @@ -0,0 +1,3 @@ +[app] +name = "05-broken-dependencies" +script = "./main.py" diff --git a/crates/tower-runtime/tests/example-apps/05-broken-dependencies/main.py b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/main.py new file mode 100644 index 00000000..0efff4c8 --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/main.py @@ -0,0 +1 @@ +print("This should never run because dependency installation should fail") diff --git a/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml new file mode 100644 index 00000000..ce68498c --- /dev/null +++ b/crates/tower-runtime/tests/example-apps/05-broken-dependencies/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "05-broken-dependencies" +version = "0.1.0" +description = "App with a non-existent dependency to test failure handling" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "this-package-definitely-does-not-exist-xyz-123456789>=1.0.0", +] diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index d8ee4a6d..02069308 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -328,3 +328,50 @@ async fn test_running_app_with_secret() { let status = app.status().await.expect("Failed to get app status"); assert!(status == Status::Exited, "App should be running"); } + +#[tokio::test] +async fn test_abort_on_dependency_installation_failure() { + debug!("Running 05-broken-dependencies"); + // This test verifies that when dependency installation fails (uv sync returns non-zero), + // the app correctly reports a Crashed status rather than continuing execution. + let broken_deps_dir = get_example_app_dir("05-broken-dependencies"); + let package = build_package_from_dir(&broken_deps_dir).await; + let (sender, mut receiver) = unbounded_channel(); + + let opts = StartOptions { + ctx: tower_telemetry::Context::new(), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + cache_dir: Some(config::default_cache_dir()), + }; + + // Start the app using the LocalApp runtime + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + // Drain all output - we need to consume the channel for the app to complete + while let Some(output) = receiver.recv().await { + debug!("Received output: {:?}", output.line); + } + + // The status should be Crashed since dependency installation failed + let status = app.status().await.expect("Failed to get app status"); + match status { + Status::Crashed { code } => { + assert!(code != 0, "Exit code should be non-zero, got {}", code); + } + Status::Exited => { + panic!("App should have crashed due to dependency installation failure, not exited successfully"); + } + Status::Running => { + panic!("App should not still be running"); + } + Status::None => { + panic!("App should have a status"); + } + } +} From d25dfe9be09be835fd03917138bad290d824aa4e Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 20 Jan 2026 13:56:22 +0000 Subject: [PATCH 5/7] Bump version to v0.3.43 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 2 +- pyproject.toml | 2 +- uv.lock | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 786e2744..675e212f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -480,7 +480,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.42" +version = "0.3.43" dependencies = [ "base64", "chrono", @@ -587,7 +587,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.42" +version = "0.3.43" dependencies = [ "aes-gcm", "base64", @@ -3215,7 +3215,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.42" +version = "0.3.43" dependencies = [ "pem", "rsa", @@ -3485,7 +3485,7 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "tower" -version = "0.3.42" +version = "0.3.43" dependencies = [ "tokio", "tower-api", @@ -3510,7 +3510,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.42" +version = "0.3.43" dependencies = [ "reqwest", "serde", @@ -3522,7 +3522,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.42" +version = "0.3.43" dependencies = [ "axum", "bytes", @@ -3592,7 +3592,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.42" +version = "0.3.43" dependencies = [ "async-compression", "config", @@ -3611,7 +3611,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.42" +version = "0.3.43" dependencies = [ "chrono", "config", @@ -3632,7 +3632,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.42" +version = "0.3.43" dependencies = [ "tracing", "tracing-appender", @@ -3641,7 +3641,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.42" +version = "0.3.43" dependencies = [ "async-compression", "async_zip", @@ -3655,7 +3655,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.42" +version = "0.3.43" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index aa5c264f..da4f236a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.42" +version = "0.3.43" description = "Tower is the best way to host Python data apps in production" rust-version = "1.81" authors = ["Brad Heller "] diff --git a/pyproject.toml b/pyproject.toml index 3d1998b5..b0e89ac6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.42" +version = "0.3.43" description = "Tower CLI and runtime environment for Tower." authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }] readme = "README.md" diff --git a/uv.lock b/uv.lock index 620c0b67..51bf7d0e 100644 --- a/uv.lock +++ b/uv.lock @@ -2744,7 +2744,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.42" +version = "0.3.43" source = { editable = "." } dependencies = [ { name = "attrs" }, From 2fe8ba72e43e111b66acd3b1a7fdd6663d889440 Mon Sep 17 00:00:00 2001 From: Vim <121349594+sammuti@users.noreply.github.com> Date: Tue, 20 Jan 2026 08:21:58 -0700 Subject: [PATCH 6/7] [TOW-1299] App Isolation traits (#159) * impl first iter * Working k8s backend * move k8s backend to tower-runner * Minor * Remove cache abstractions * Refactor run.rs correctly * Refactor back the k8s runtime * minor * Make AppLauncher generic over ExecutionBackend instead of app * Renaming local -> subprocess * CliBackend * remove AppLauncher * minor * Update deps * dep updates * BundleRef -> PackageRef * minor * Remove cli backend * Move out k8s backend * revert session.json * minro * Add ExecutionSpec.package_stream to pass the package * minor * rebase --- Cargo.lock | 36 +++- Cargo.toml | 1 + crates/config/src/session.rs | 4 +- crates/tower-cmd/src/apps.rs | 12 +- crates/tower-cmd/src/mcp.rs | 8 +- crates/tower-cmd/src/run.rs | 140 ++++++++++---- crates/tower-package/Cargo.toml | 9 +- crates/tower-runtime/Cargo.toml | 10 +- crates/tower-runtime/src/errors.rs | 27 ++- crates/tower-runtime/src/execution.rs | 243 +++++++++++++++++++++++++ crates/tower-runtime/src/lib.rs | 72 +------- crates/tower-runtime/src/local.rs | 24 +-- crates/tower-runtime/src/subprocess.rs | 200 ++++++++++++++++++++ 13 files changed, 634 insertions(+), 152 deletions(-) create mode 100644 crates/tower-runtime/src/execution.rs create mode 100644 crates/tower-runtime/src/subprocess.rs diff --git a/Cargo.lock b/Cargo.lock index 675e212f..5ea25d7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "async_zip" version = "0.0.16" @@ -2789,18 +2800,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3605,7 +3626,6 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tar", - "tokio-util", "tower-telemetry", ] @@ -3613,10 +3633,12 @@ dependencies = [ name = "tower-runtime" version = "0.3.43" dependencies = [ + "async-trait", "chrono", "config", "nix 0.30.1", "snafu", + "tmpdir", "tokio", "tokio-util", "tower-package", @@ -3862,9 +3884,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index da4f236a..a6fc1635 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli" aes-gcm = "0.10" anyhow = "1.0.95" async-compression = { version = "0.4", features = ["tokio", "gzip"] } +async-trait = "0.1.89" async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] } axum = "0.8.4" base64 = "0.22" diff --git a/crates/config/src/session.rs b/crates/config/src/session.rs index 53830ccf..f7a22b07 100644 --- a/crates/config/src/session.rs +++ b/crates/config/src/session.rs @@ -21,7 +21,9 @@ fn extract_aid_from_jwt(jwt: &str) -> Option { let payload = parts[1]; let decoded = URL_SAFE_NO_PAD.decode(payload).ok()?; let json: serde_json::Value = serde_json::from_slice(&decoded).ok()?; - json.get("https://tower.dev/aid")?.as_str().map(String::from) + json.get("https://tower.dev/aid")? + .as_str() + .map(String::from) } const DEFAULT_TOWER_URL: &str = "https://api.tower.dev"; diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index a95a6251..31652eea 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -519,11 +519,11 @@ fn should_notify_run_wait(already_notified: bool, elapsed: Duration) -> bool { #[cfg(test)] mod tests { + use super::is_run_finished; use super::{ apps_cmd, is_run_started, next_backoff, should_emit_line, should_notify_run_wait, stream_logs_until_complete, LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, }; - use super::is_run_finished; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tower_api::models::run::Status; @@ -613,12 +613,18 @@ mod tests { #[test] fn test_run_wait_notification_logic() { - assert!(!should_notify_run_wait(true, super::RUN_START_MESSAGE_DELAY)); + assert!(!should_notify_run_wait( + true, + super::RUN_START_MESSAGE_DELAY + )); assert!(!should_notify_run_wait( false, super::RUN_START_MESSAGE_DELAY - Duration::from_millis(1) )); - assert!(should_notify_run_wait(false, super::RUN_START_MESSAGE_DELAY)); + assert!(should_notify_run_wait( + false, + super::RUN_START_MESSAGE_DELAY + )); } #[tokio::test] diff --git a/crates/tower-cmd/src/mcp.rs b/crates/tower-cmd/src/mcp.rs index eeabc1d9..af0a5b27 100644 --- a/crates/tower-cmd/src/mcp.rs +++ b/crates/tower-cmd/src/mcp.rs @@ -702,9 +702,7 @@ impl TowerService { } } - #[tool( - description = "Read and parse Towerfile configuration. Optional: working_directory." - )] + #[tool(description = "Read and parse Towerfile configuration. Optional: working_directory.")] async fn tower_file_read( &self, Parameters(request): Parameters, @@ -778,9 +776,7 @@ impl TowerService { } } - #[tool( - description = "Validate Towerfile configuration. Optional: working_directory." - )] + #[tool(description = "Validate Towerfile configuration. Optional: working_directory.")] async fn tower_file_validate( &self, Parameters(request): Parameters, diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 50a3de07..b96f2ad0 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -3,20 +3,27 @@ use clap::{Arg, ArgMatches, Command}; use config::{Config, Towerfile}; use std::collections::HashMap; use std::path::PathBuf; +use tokio::fs::File; use tower_api::models::Run; use tower_package::{Package, PackageSpec}; -use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status}; +use tower_runtime::{OutputReceiver, Status}; use tower_telemetry::{debug, Context}; +use crate::{api, output, util::dates}; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{ - mpsc::{unbounded_channel, Receiver as MpscReceiver}, + mpsc::Receiver as MpscReceiver, oneshot::{self, Receiver as OneshotReceiver}, Mutex, }; use tokio::time::{sleep, timeout, Duration}; - -use crate::{api, output, util::dates}; +use tower_runtime::execution::ExecutionHandle; +use tower_runtime::execution::{ + CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, ResourceLimits, + RuntimeConfig as ExecRuntimeConfig, +}; +use tower_runtime::subprocess::SubprocessBackend; pub fn run_cmd() -> Command { Command::new("run") @@ -148,7 +155,7 @@ where env_vars.insert("TOWER_URL".to_string(), config.tower_url.to_string()); // There should always be a session, if there isn't one then I'm not sure how we got here? - let session = config.session.ok_or(Error::NoSession)?; + let session = config.session.as_ref().ok_or(Error::NoSession)?; env_vars.insert("TOWER_JWT".to_string(), session.token.jwt.to_string()); @@ -162,34 +169,42 @@ where } } - // Build the package - let mut package = build_package(&towerfile).await?; - - // Unpack the package - package.unpack().await?; - - let (sender, receiver) = unbounded_channel(); - + // Build the package (creates tar.gz) + let package = build_package(&towerfile).await?; output::success(&format!("Launching app `{}`", towerfile.app.name)); - let output_task = tokio::spawn(output_handler(receiver)); - let mut launcher: AppLauncher = AppLauncher::default(); - launcher - .launch( - Context::new(), - sender, - package, - env.to_string(), - secrets, + // Open the tar.gz file as a stream + let package_path = package + .package_file_path + .as_ref() + .expect("Package must have a file path"); + let package_file = File::open(package_path).await?; + + let backend = SubprocessBackend::new(config.cache_dir.clone()); + let run_id = format!( + "cli-run-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let handle = backend + .create(build_cli_execution_spec( + config, + env, params, + secrets, env_vars, - config.cache_dir, - ) + package_file, + run_id, + )) .await?; + let receiver = handle.logs().await?; + let output_task = tokio::spawn(output_handler(receiver)); - // Monitor app output and status concurrently - let app = Arc::new(Mutex::new(launcher.app.unwrap())); - let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app))); + // Monitor app status concurrently + let handle = Arc::new(Mutex::new(handle)); + let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle))); // Wait for app to complete or SIGTERM let status_result = tokio::select! { @@ -199,7 +214,7 @@ where }, _ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => { output::write("\nReceived Ctrl+C, stopping local run...\n"); - app.lock().await.terminate().await.ok(); + handle.lock().await.terminate().await.ok(); return Ok(output_task.await.unwrap()); } }; @@ -222,6 +237,52 @@ where Ok(final_result) } +fn build_cli_execution_spec( + config: Config, + env: &str, + params: HashMap, + secrets: HashMap, + env_vars: HashMap, + package_stream: File, + run_id: String, +) -> ExecutionSpec { + let spec = ExecutionSpec { + id: run_id, + package_stream: Box::new(package_stream), + runtime: ExecRuntimeConfig { + image: "local".to_string(), + version: None, + cache: CacheConfig { + enable_bundle_cache: true, + enable_runtime_cache: true, + enable_dependency_cache: true, + backend: match config.cache_dir.clone() { + Some(dir) => CacheBackend::Local { cache_dir: dir }, + None => CacheBackend::None, + }, + isolation: CacheIsolation::None, + }, + entrypoint: None, + command: None, + }, + environment: env.to_string(), + secrets, + parameters: params, + env_vars, + resources: ResourceLimits { + cpu_millicores: None, + memory_mb: None, + storage_mb: None, + max_pids: None, + gpu_count: 0, + timeout_seconds: 3600, + }, + networking: None, + telemetry_ctx: Context::new(), + }; + spec +} + /// do_run_local is the entrypoint for running an app locally. It will load the Towerfile, build /// the package, and launch the app. The relevant package is cleaned up after execution is /// complete. @@ -595,8 +656,12 @@ async fn monitor_output(mut output: OutputReceiver) { /// monitor_local_status is a helper function that will monitor the status of a given app and waits for /// it to progress to a terminal state. -async fn monitor_local_status(app: Arc>) -> Status { - debug!("Starting status monitoring for LocalApp"); +async fn monitor_cli_status( + handle: Arc>, +) -> Status { + use tower_runtime::execution::ExecutionHandle as _; + + debug!("Starting status monitoring for CLI execution"); let mut check_count = 0; let mut err_count = 0; @@ -604,11 +669,11 @@ async fn monitor_local_status(app: Arc>) -> Status { check_count += 1; debug!( - "Status check #{}, attempting to get app status", + "Status check #{}, attempting to get CLI handle status", check_count ); - match app.lock().await.status().await { + match handle.lock().await.status().await { Ok(status) => { // We reset the error count to indicate that we can intermittently get statuses. err_count = 0; @@ -616,30 +681,27 @@ async fn monitor_local_status(app: Arc>) -> Status { match status { Status::Exited => { debug!("Run exited cleanly, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } Status::Crashed { .. } => { debug!("Run crashed, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } _ => { + debug!("Handle status: other, continuing to monitor"); sleep(Duration::from_millis(100)).await; } } } Err(e) => { - debug!("Failed to get app status: {:?}", e); + debug!("Failed to get handle status: {:?}", e); err_count += 1; // If we get five errors in a row, we abandon monitoring. if err_count >= 5 { - debug!("Failed to get app status after 5 attempts, giving up"); + debug!("Failed to get handle status after 5 attempts, giving up"); output::error("An error occured while monitoring your local run status!"); - return tower_runtime::Status::Crashed { code: -1 }; + return Status::Crashed { code: -1 }; } // Otherwise, keep on keepin' on. diff --git a/crates/tower-package/Cargo.toml b/crates/tower-package/Cargo.toml index 98efbaa4..1e302058 100644 --- a/crates/tower-package/Cargo.toml +++ b/crates/tower-package/Cargo.toml @@ -10,13 +10,12 @@ license = { workspace = true } async-compression = { workspace = true } config = { workspace = true } glob = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } sha2 = { workspace = true } -snafu = { workspace = true } +snafu = { workspace = true } tmpdir = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true } tokio-stream = { workspace = true } tokio-tar = { workspace = true } -tokio-util = { workspace = true } tower-telemetry = { workspace = true } diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 8ab42fb5..37b47f62 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -7,14 +7,16 @@ rust-version = { workspace = true } license = { workspace = true } [dependencies] +async-trait = { workspace = true } chrono = { workspace = true } nix = { workspace = true } -snafu = { workspace = true } +snafu = { workspace = true } +tmpdir = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -tower-package = { workspace = true } -tower-telemetry = { workspace = true } -tower-uv = { workspace = true } +tower-package = { workspace = true } +tower-telemetry = { workspace = true } +tower-uv = { workspace = true } [dev-dependencies] config = { workspace = true } diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index eae04338..0326cd27 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -17,14 +17,14 @@ pub enum Error { #[snafu(display("not implemented"))] NotImplemented, - #[snafu(display("bundle download failed"))] - BundleDownloadFailed, + #[snafu(display("package download failed"))] + PackageDownloadFailed, - #[snafu(display("bundle create failed"))] - BundleCreateFailed, + #[snafu(display("package create failed"))] + PackageCreateFailed, - #[snafu(display("bundle unpack failed"))] - BundleUnpackFailed, + #[snafu(display("package unpack failed"))] + PackageUnpackFailed, #[snafu(display("container already initialized"))] AlreadyInitialized, @@ -65,6 +65,15 @@ pub enum Error { #[snafu(display("cancelled"))] Cancelled, + #[snafu(display("app not started"))] + AppNotStarted, + + #[snafu(display("no execution handle"))] + NoHandle, + + #[snafu(display("invalid package"))] + InvalidPackage, + #[snafu(display("dependency installation failed"))] DependencyInstallationFailed, } @@ -94,3 +103,9 @@ impl From for Error { } } } + +impl From for Error { + fn from(_: tower_package::Error) -> Self { + Error::PackageUnpackFailed + } +} diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs new file mode 100644 index 00000000..47d6dd26 --- /dev/null +++ b/crates/tower-runtime/src/execution.rs @@ -0,0 +1,243 @@ +//! Execution backend abstraction for Tower +//! +//! This module provides traits and types for abstracting execution backends, +//! allowing Tower to support multiple compute substrates (local processes, +//! Kubernetes pods, etc.) through a uniform interface. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio::io::AsyncRead; + +use crate::errors::Error; +use crate::{OutputReceiver, Status}; + +// ============================================================================ +// Core Execution Types +// ============================================================================ + +/// ExecutionSpec describes what to execute and how +pub struct ExecutionSpec { + /// Unique identifier for this execution (e.g., run_id) + pub id: String, + + /// Package as a stream of tar.gz data + pub package_stream: Box, + + /// Runtime configuration (image, version, etc.) + pub runtime: RuntimeConfig, + + /// Environment name (e.g., "production", "staging", "default") + pub environment: String, + + /// Secret key-value pairs to inject + pub secrets: HashMap, + + /// Parameter key-value pairs to inject + pub parameters: HashMap, + + /// Additional environment variables + pub env_vars: HashMap, + + /// Resource limits for execution + pub resources: ResourceLimits, + + /// Networking configuration (for service workloads) + pub networking: Option, + + /// Telemetry context for tracing + pub telemetry_ctx: tower_telemetry::Context, +} + +/// RuntimeConfig specifies the execution runtime environment +#[derive(Debug, Clone)] +pub struct RuntimeConfig { + /// Runtime image to use (e.g., "towerhq/tower-runtime:python-3.11") + pub image: String, + + /// Specific version/tag if applicable + pub version: Option, + + /// Cache configuration + pub cache: CacheConfig, + + /// Entrypoint override (if not using bundle's default) + pub entrypoint: Option>, + + /// Command override (if not using bundle's default) + pub command: Option>, +} + +/// CacheConfig describes what should be cached +#[derive(Debug, Clone)] +pub struct CacheConfig { + /// Enable bundle caching (content-addressable by checksum) + pub enable_bundle_cache: bool, + + /// Enable runtime layer caching (container image layers) + pub enable_runtime_cache: bool, + + /// Enable dependency caching (language-specific, e.g., pip cache, node_modules) + pub enable_dependency_cache: bool, + + /// Cache backend to use + pub backend: CacheBackend, + + /// Cache isolation strategy + pub isolation: CacheIsolation, +} + +/// CacheIsolation defines security boundaries for caches +#[derive(Debug, Clone)] +pub enum CacheIsolation { + /// Global sharing (safe for immutable content-addressable caches) + Global, + + /// Per-account isolation + PerAccount { account_id: String }, + + /// Per-app isolation + PerApp { app_id: String }, + + /// No isolation + None, +} + +/// CacheBackend describes where caches are stored +#[derive(Debug, Clone)] +pub enum CacheBackend { + /// Local filesystem cache + Local { cache_dir: PathBuf }, + + /// No caching + None, +} + +/// ResourceLimits defines compute resource constraints +#[derive(Debug, Clone)] +pub struct ResourceLimits { + /// CPU limit in millicores (e.g., 1000 = 1 CPU) + pub cpu_millicores: Option, + + /// Memory limit in megabytes + pub memory_mb: Option, + + /// Ephemeral storage limit in megabytes + pub storage_mb: Option, + + /// Maximum number of processes + pub max_pids: Option, + + /// GPU count + pub gpu_count: u32, + + /// Execution timeout in seconds + pub timeout_seconds: u32, +} + +/// NetworkingSpec defines networking requirements +#[derive(Debug, Clone)] +pub struct NetworkingSpec { + /// Port the app listens on + pub port: u16, + + /// Whether this app needs a stable service endpoint + pub expose_service: bool, + + /// Service name (for DNS) + pub service_name: Option, +} + +// ============================================================================ +// Execution Backend Trait +// ============================================================================ + +/// ExecutionBackend abstracts the compute substrate +#[async_trait] +pub trait ExecutionBackend: Send + Sync { + /// The handle type this backend returns + type Handle: ExecutionHandle; + + /// Create a new execution environment + async fn create(&self, spec: ExecutionSpec) -> Result; + + /// Get backend capabilities + fn capabilities(&self) -> BackendCapabilities; + + /// Cleanup backend resources + async fn cleanup(&self) -> Result<(), Error>; +} + +/// BackendCapabilities describes what a backend supports +#[derive(Debug, Clone)] +pub struct BackendCapabilities { + /// Backend name + pub name: String, + + /// Supports persistent volumes for caching + pub supports_persistent_cache: bool, + + /// Supports pre-warmed environments + pub supports_prewarming: bool, + + /// Supports network isolation + pub supports_network_isolation: bool, + + /// Supports service endpoints + pub supports_service_endpoints: bool, + + /// Typical startup latency in milliseconds + pub typical_cold_start_ms: u64, + pub typical_warm_start_ms: u64, + + /// Maximum concurrent executions + pub max_concurrent_executions: Option, +} + +// ============================================================================ +// Execution Handle Trait +// ============================================================================ + +/// ExecutionHandle represents a running execution +#[async_trait] +pub trait ExecutionHandle: Send + Sync { + /// Get a unique identifier for this execution + fn id(&self) -> &str; + + /// Get current execution status + async fn status(&self) -> Result; + + /// Subscribe to log stream + async fn logs(&self) -> Result; + + /// Terminate execution gracefully + async fn terminate(&mut self) -> Result<(), Error>; + + /// Force kill execution + async fn kill(&mut self) -> Result<(), Error>; + + /// Wait for execution to complete + async fn wait_for_completion(&self) -> Result; + + /// Get service endpoint + async fn service_endpoint(&self) -> Result, Error>; + + /// Cleanup resources + async fn cleanup(&mut self) -> Result<(), Error>; +} + +/// ServiceEndpoint describes how to reach a running service +#[derive(Debug, Clone)] +pub struct ServiceEndpoint { + /// Host/IP to connect to + pub host: String, + + /// Port to connect to + pub port: u16, + + /// Protocol (http, https, tcp, etc.) + pub protocol: String, + + /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080") + pub url: Option, +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 195c3e26..74091edc 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -5,10 +5,11 @@ use std::path::PathBuf; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; -use tower_telemetry::debug; pub mod errors; +pub mod execution; pub mod local; +pub mod subprocess; use errors::Error; @@ -47,7 +48,7 @@ pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; -pub trait App { +pub trait App: Send + Sync { // start will start the process fn start(opts: StartOptions) -> impl Future> + Send where @@ -60,73 +61,6 @@ pub trait App { fn status(&self) -> impl Future> + Send; } -pub struct AppLauncher { - pub app: Option, -} - -impl std::default::Default for AppLauncher { - fn default() -> Self { - Self { app: None } - } -} - -impl AppLauncher { - pub async fn launch( - &mut self, - ctx: tower_telemetry::Context, - output_sender: OutputSender, - package: Package, - environment: String, - secrets: HashMap, - parameters: HashMap, - env_vars: HashMap, - cache_dir: Option, - ) -> Result<(), Error> { - let cwd = package.unpacked_path.clone().unwrap().to_path_buf(); - - let opts = StartOptions { - ctx, - output_sender, - cwd: Some(cwd), - environment, - secrets, - parameters, - package, - env_vars, - cache_dir, - }; - - // NOTE: This is a really awful hack to force any existing app to drop itself. Not certain - // this is exactly what we want to do... - self.app = None; - - let res = A::start(opts).await; - - if let Ok(app) = res { - self.app = Some(app); - Ok(()) - } else { - self.app = None; - Err(res.err().unwrap()) - } - } - - pub async fn terminate(&mut self) -> Result<(), Error> { - if let Some(app) = &mut self.app { - if let Err(err) = app.terminate().await { - debug!("failed to terminate app: {}", err); - Err(err) - } else { - self.app = None; - Ok(()) - } - } else { - // There's no app, so nothing to terminate. - Ok(()) - } - } -} - pub struct StartOptions { pub ctx: tower_telemetry::Context, pub package: Package, diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index a6296517..ec2ea64f 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -353,6 +353,18 @@ impl App for LocalApp { }) } + async fn terminate(&mut self) -> Result<(), Error> { + self.terminator.cancel(); + + // Now we should wait for the join handle to finish. + if let Some(execute_handle) = self.execute_handle.take() { + let _ = execute_handle.await; + self.execute_handle = None; + } + + Ok(()) + } + async fn status(&self) -> Result { let mut status = self.status.lock().await; @@ -379,18 +391,6 @@ impl App for LocalApp { } } } - - async fn terminate(&mut self) -> Result<(), Error> { - self.terminator.cancel(); - - // Now we should wait for the join handle to finish. - if let Some(execute_handle) = self.execute_handle.take() { - let _ = execute_handle.await; - self.execute_handle = None; - } - - Ok(()) - } } async fn execute_bash_program( diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs new file mode 100644 index 00000000..713722f2 --- /dev/null +++ b/crates/tower-runtime/src/subprocess.rs @@ -0,0 +1,200 @@ +//! Subprocess execution backend + +use crate::errors::Error; +use crate::execution::{ + BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, + ServiceEndpoint, +}; +use crate::local::LocalApp; +use crate::{App, OutputReceiver, StartOptions, Status}; + +use async_trait::async_trait; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; +use tokio::time::Duration; +use tower_package::Package; + +/// SubprocessBackend executes apps as a subprocess +pub struct SubprocessBackend { + /// Optional default cache directory to use + cache_dir: Option, +} + +impl SubprocessBackend { + pub fn new(cache_dir: Option) -> Self { + Self { cache_dir } + } + + /// Receive package stream and unpack it + /// + /// Takes a stream of tar.gz data, saves it to a temp file, and unpacks it + /// Returns the Package (which keeps the temp directory alive) + async fn receive_and_unpack_package( + &self, + mut package_stream: Box, + ) -> Result { + // Create temp directory for this package + let temp_dir = tmpdir::TmpDir::new("tower-package") + .await + .map_err(|_| Error::PackageCreateFailed)?; + + // Save stream to tar.gz file + let tar_gz_path = temp_dir.to_path_buf().join("package.tar.gz"); + let mut file = File::create(&tar_gz_path) + .await + .map_err(|_| Error::PackageCreateFailed)?; + + tokio::io::copy(&mut package_stream, &mut file) + .await + .map_err(|_| Error::PackageCreateFailed)?; + + file.flush().await.map_err(|_| Error::PackageCreateFailed)?; + drop(file); + + // Unpack the package + let mut package = Package::default(); + package.package_file_path = Some(tar_gz_path); + package.tmp_dir = Some(temp_dir); + package.unpack().await?; + + Ok(package) + } +} + +#[async_trait] +impl ExecutionBackend for SubprocessBackend { + type Handle = SubprocessHandle; + + async fn create(&self, spec: ExecutionSpec) -> Result { + // Convert ExecutionSpec to StartOptions for LocalApp + let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel(); + + // Get cache_dir from spec or use backend default + let cache_dir = match &spec.runtime.cache.backend { + CacheBackend::Local { cache_dir } => Some(cache_dir.clone()), + _ => self.cache_dir.clone(), + }; + + // Receive package stream and unpack it + let package = self.receive_and_unpack_package(spec.package_stream).await?; + + let unpacked_path = package + .unpacked_path + .clone() + .ok_or(Error::PackageUnpackFailed)?; + + let opts = StartOptions { + ctx: spec.telemetry_ctx, + package: Package::from_unpacked_path(unpacked_path).await?, + cwd: None, // LocalApp determines cwd from package + environment: spec.environment, + secrets: spec.secrets, + parameters: spec.parameters, + env_vars: spec.env_vars, + output_sender: output_sender.clone(), + cache_dir, + }; + + // Start the LocalApp + let app = LocalApp::start(opts).await?; + + Ok(SubprocessHandle { + id: spec.id, + app: Arc::new(Mutex::new(app)), + output_receiver: Arc::new(Mutex::new(output_receiver)), + _package: package, // Keep package alive so temp dir doesn't get cleaned up + }) + } + + fn capabilities(&self) -> BackendCapabilities { + BackendCapabilities { + name: "local".to_string(), + supports_persistent_cache: true, + supports_prewarming: false, + supports_network_isolation: false, + supports_service_endpoints: false, + typical_cold_start_ms: 1000, // ~1s for venv + sync + typical_warm_start_ms: 100, // ~100ms with warm cache + max_concurrent_executions: None, // Limited by system resources + } + } + + async fn cleanup(&self) -> Result<(), Error> { + // Nothing to cleanup for local backend + Ok(()) + } +} + +/// SubprocessHandle provides lifecycle management for a subprocess execution +pub struct SubprocessHandle { + id: String, + app: Arc>, + output_receiver: Arc>, + _package: Package, // Keep package alive to prevent temp dir cleanup +} + +#[async_trait] +impl ExecutionHandle for SubprocessHandle { + fn id(&self) -> &str { + &self.id + } + + async fn status(&self) -> Result { + let app = self.app.lock().await; + app.status().await + } + + async fn logs(&self) -> Result { + // Create a new channel for log streaming + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn a task to forward Output from the internal receiver + let output_receiver = self.output_receiver.clone(); + tokio::spawn(async move { + let mut receiver = output_receiver.lock().await; + while let Some(output) = receiver.recv().await { + if tx.send(output).is_err() { + break; // Receiver dropped + } + } + }); + + Ok(rx) + } + + async fn terminate(&mut self) -> Result<(), Error> { + let mut app = self.app.lock().await; + app.terminate().await + } + + async fn kill(&mut self) -> Result<(), Error> { + // For local processes, kill is the same as terminate + self.terminate().await + } + + async fn wait_for_completion(&self) -> Result { + loop { + let status = self.status().await?; + match status { + Status::None | Status::Running => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + _ => return Ok(status), + } + } + } + + async fn service_endpoint(&self) -> Result, Error> { + // Local backend doesn't support service endpoints + Ok(None) + } + + async fn cleanup(&mut self) -> Result<(), Error> { + // Ensure the app is terminated + self.terminate().await?; + Ok(()) + } +} From bdd6184fd486ffc14e4ec55c40ca82b84feea995 Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Tue, 20 Jan 2026 17:35:05 +0100 Subject: [PATCH 7/7] fix: stop overwriting session mock during tests (#175) --- tests/integration/run_tests.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py index 7f59d8ed..7ff598ee 100755 --- a/tests/integration/run_tests.py +++ b/tests/integration/run_tests.py @@ -18,6 +18,19 @@ def log(msg): print(f"\033[36m[test-runner]\033[0m {msg}") +def reset_session_fixture(test_home): + """Reset the session.json fixture to its committed state before tests. + + The CLI may modify session.json during MCP operations (like team switching), + so we restore it to the canonical committed version before each test run. + """ + session_file = test_home / ".config" / "tower" / "session.json" + subprocess.run( + ["git", "checkout", str(session_file)], + capture_output=True, + ) + + def check_mock_server_health(url): """Check if the mock server is running and responding.""" try: @@ -129,6 +142,8 @@ def main(): return 1 finally: + reset_session_fixture(test_home) + if mock_process: log("Stopping mock server...") mock_process.terminate()