From 9fca16391e1ab36aa9bba6cf162e859ba7e4c796 Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Thu, 15 Jan 2026 16:41:28 +0100 Subject: [PATCH 1/5] feat (cli): add follow mode for apps logs --- crates/tower-cmd/src/apps.rs | 283 ++++++++++++++++++ tests/integration/features/cli_runs.feature | 8 + tests/integration/features/steps/cli_steps.py | 32 ++ tests/integration/features/steps/mcp_steps.py | 1 + 4 files changed, 324 insertions(+) diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index 668d5f79..148e9298 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -1,5 +1,7 @@ use clap::{value_parser, Arg, ArgMatches, Command}; use config::Config; +use tokio::sync::oneshot; +use tokio::time::{sleep, Duration}; use tower_api::models::Run; @@ -17,6 +19,13 @@ pub fn apps_cmd() -> Command { ) .subcommand( Command::new("logs") + .arg( + Arg::new("follow") + .short('f') + .long("follow") + .help("Follow logs in real time") + .action(clap::ArgAction::SetTrue), + ) .allow_external_subcommands(true) .about("Get the logs from a previous Tower app run"), ) @@ -48,6 +57,12 @@ pub fn apps_cmd() -> Command { pub async fn do_logs(config: Config, cmd: &ArgMatches) { let (name, seq) = extract_app_name_and_run("logs", cmd.subcommand()); + let follow = cmd.get_one::("follow").copied().unwrap_or(false); + + if follow { + follow_logs(config, name, seq).await; + return; + } if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { for line in resp.log_lines { @@ -211,3 +226,271 @@ fn extract_app_name(subcmd: &str, cmd: Option<(&str, &ArgMatches)>) -> String { ); output::die(&line); } + +async fn follow_logs(config: Config, name: String, seq: i64) { + let enable_ctrl_c = !output::get_output_mode().is_mcp(); + let mut backoff = Duration::from_millis(500); + + loop { + let run = match api::describe_run(&config, &name, seq).await { + Ok(res) => res.run, + Err(err) => output::tower_error_and_die(err, "Fetching run details failed"), + }; + + if is_run_finished(&run) { + if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { + for line in resp.log_lines { + output::remote_log_event(&line); + } + } + return; + } + + let run_complete = monitor_run_completion(&config, &name, seq); + match api::stream_run_logs(&config, &name, seq).await { + Ok(log_stream) => match stream_logs_until_complete( + log_stream, + run_complete, + enable_ctrl_c, + &run.dollar_link, + ) + .await + { + Ok(LogFollowOutcome::Completed) => return, + Ok(LogFollowOutcome::Interrupted) => return, + Ok(LogFollowOutcome::Disconnected) => {} + Err(_) => return, + }, + Err(err) => { + output::error(&format!("Failed to stream run logs: {:?}", err)); + } + } + + let latest = match api::describe_run(&config, &name, seq).await { + Ok(res) => res.run, + Err(err) => output::tower_error_and_die(err, "Fetching run details failed"), + }; + if is_run_finished(&latest) { + return; + } + + sleep(backoff).await; + backoff = next_backoff(backoff); + } +} + +fn next_backoff(current: Duration) -> Duration { + let max = Duration::from_secs(5); + let next = current.checked_mul(2).unwrap_or(max); + if next > max { + max + } else { + next + } +} + +enum LogFollowOutcome { + Completed, + Disconnected, + Interrupted, +} + +async fn stream_logs_until_complete( + mut log_stream: tokio::sync::mpsc::Receiver, + mut run_complete: oneshot::Receiver, + enable_ctrl_c: bool, + run_link: &str, +) -> Result { + loop { + tokio::select! { + event = log_stream.recv() => match event { + Some(api::LogStreamEvent::EventLog(log)) => { + output::remote_log_event(&log); + }, + None => return Ok(LogFollowOutcome::Disconnected), + _ => {}, + }, + res = &mut run_complete => { + match res { + Ok(_) => { + drain_remaining_logs(log_stream).await; + return Ok(LogFollowOutcome::Completed); + } + // If monitoring failed, keep following and let the caller retry. + Err(_) => return Ok(LogFollowOutcome::Disconnected), + } + }, + _ = tokio::signal::ctrl_c(), if enable_ctrl_c => { + output::write("Received Ctrl+C, stopping log streaming...\n"); + output::write("Note: The run will continue in Tower cloud\n"); + output::write(&format!(" See more: {}\n", run_link)); + return Ok(LogFollowOutcome::Interrupted); + }, + } + } +} + +async fn drain_remaining_logs(mut log_stream: tokio::sync::mpsc::Receiver) { + let drain_duration = Duration::from_secs(5); + let _ = tokio::time::timeout(drain_duration, async { + while let Some(event) = log_stream.recv().await { + if let api::LogStreamEvent::EventLog(log) = event { + output::remote_log_event(&log); + } + } + }) + .await; +} + +fn monitor_run_completion( + config: &Config, + app_name: &str, + seq: i64, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let config_clone = config.clone(); + let app_name = app_name.to_string(); + + tokio::spawn(async move { + let mut failures = 0u32; + loop { + match api::describe_run(&config_clone, &app_name, seq).await { + Ok(res) => { + if is_run_finished(&res.run) { + let _ = tx.send(res.run); + return; + } + } + Err(_) => { + failures += 1; + if failures >= 5 { + return; + } + } + } + sleep(Duration::from_millis(500)).await; + } + }); + + rx +} + +fn is_run_finished(run: &Run) -> bool { + match run.status { + // Be explicit about terminal states so new non-terminal statuses + // don't cause us to stop following logs too early. + tower_api::models::run::Status::Crashed + | tower_api::models::run::Status::Errored + | tower_api::models::run::Status::Exited + | tower_api::models::run::Status::Cancelled => true, + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::{apps_cmd, next_backoff, stream_logs_until_complete, LogFollowOutcome}; + use super::is_run_finished; + use tokio::sync::{mpsc, oneshot}; + use tokio::time::Duration; + use tower_api::models::run::Status; + use tower_api::models::Run; + + #[test] + fn logs_follow_flag_is_parsed() { + let matches = apps_cmd() + .try_get_matches_from(["apps", "logs", "--follow", "hello-world#11"]) + .unwrap(); + let (cmd, sub_matches) = matches.subcommand().unwrap(); + + assert_eq!(cmd, "logs"); + assert_eq!(sub_matches.get_one::("follow"), Some(&true)); + assert_eq!( + sub_matches.subcommand().map(|(name, _)| name), + Some("hello-world#11") + ); + } + + #[test] + fn run_status_terminality_is_explicit() { + let non_terminal = [Status::Scheduled, Status::Pending, Status::Running]; + for status in non_terminal { + let run = Run { + status, + ..Default::default() + }; + assert!(!is_run_finished(&run)); + } + + let terminal = [ + Status::Crashed, + Status::Errored, + Status::Exited, + Status::Cancelled, + ]; + for status in terminal { + let run = Run { + status, + ..Default::default() + }; + assert!(is_run_finished(&run)); + } + } + + #[test] + fn run_status_variants_are_exhaustive() { + let status = Status::Scheduled; + match status { + Status::Scheduled => {} + Status::Pending => {} + Status::Running => {} + Status::Crashed => {} + Status::Errored => {} + Status::Exited => {} + Status::Cancelled => {} + } + } + + #[tokio::test] + async fn stream_logs_completes_on_run_completion() { + let (tx, rx) = mpsc::channel(1); + let (done_tx, done_rx) = oneshot::channel(); + + let done_task = tokio::spawn(async move { + let _ = done_tx.send(Run::default()); + tokio::time::sleep(Duration::from_millis(10)).await; + drop(tx); + }); + + let res = stream_logs_until_complete(rx, done_rx, false, "link").await; + done_task.await.unwrap(); + + assert!(matches!(res, Ok(LogFollowOutcome::Completed))); + } + + #[tokio::test] + async fn stream_logs_returns_disconnected_on_closed_stream() { + let (tx, rx) = mpsc::channel(1); + drop(tx); + let (_done_tx, done_rx) = oneshot::channel::(); + + let res = stream_logs_until_complete(rx, done_rx, false, "link").await; + + assert!(matches!(res, Ok(LogFollowOutcome::Disconnected))); + } + + #[test] + fn backoff_grows_and_caps() { + let mut backoff = Duration::from_millis(500); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(1)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(2)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(4)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(5)); + backoff = next_backoff(backoff); + assert_eq!(backoff, Duration::from_secs(5)); + } +} diff --git a/tests/integration/features/cli_runs.feature b/tests/integration/features/cli_runs.feature index 5afa3e40..da8e0aa1 100644 --- a/tests/integration/features/cli_runs.feature +++ b/tests/integration/features/cli_runs.feature @@ -38,3 +38,11 @@ Feature: CLI Run Commands And I run "tower run" via CLI Then the output should show "First log before run completes" And the output should show "Second log after run completes" + + Scenario: CLI apps logs follow should stream logs and drain after completion + Given I have a simple hello world application named "app-logs-after-completion" + When I run "tower deploy --create" via CLI + And I run "tower run --detached" via CLI and capture run number + And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number + Then the output should show "First log before run completes" + And the output should show "Second log after run completes" diff --git a/tests/integration/features/steps/cli_steps.py b/tests/integration/features/steps/cli_steps.py index 24a510ec..a491e491 100644 --- a/tests/integration/features/steps/cli_steps.py +++ b/tests/integration/features/steps/cli_steps.py @@ -6,6 +6,7 @@ import shutil import json import shlex +import re from datetime import datetime from pathlib import Path from behave import given, when, then @@ -55,6 +56,37 @@ def step_run_cli_command(context, command): raise +@step('I run "{command}" via CLI using created app name') +def step_run_cli_command_with_app_name(context, command): + """Run a Tower CLI command with the generated app name injected.""" + if not hasattr(context, "app_name"): + raise AssertionError("Expected context.app_name to be set by app setup step") + formatted = command.format(app_name=context.app_name) + step_run_cli_command(context, formatted) + + +@step('I run "{command}" via CLI and capture run number') +def step_run_cli_command_capture_run_number(context, command): + """Run a Tower CLI command and capture the run number from its output.""" + step_run_cli_command(context, command) + output = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", context.cli_output) + match = re.search(r"Run #(?P\d+)", output) + if not match: + raise AssertionError(f"Expected run number in output, got: {output}") + context.run_number = match.group("number") + + +@step('I run "{command}" via CLI using created app name and run number') +def step_run_cli_command_with_app_name_and_run(context, command): + """Run a Tower CLI command with the generated app name and run number injected.""" + if not hasattr(context, "app_name"): + raise AssertionError("Expected context.app_name to be set by app setup step") + if not hasattr(context, "run_number"): + raise AssertionError("Expected context.run_number to be set by run step") + formatted = command.format(app_name=context.app_name, run_number=context.run_number) + step_run_cli_command(context, formatted) + + @step("timestamps should be yellow colored") def step_timestamps_should_be_yellow(context): """Verify timestamps are colored yellow (ANSI code 33)""" diff --git a/tests/integration/features/steps/mcp_steps.py b/tests/integration/features/steps/mcp_steps.py index 9e8090c2..f0b217f3 100644 --- a/tests/integration/features/steps/mcp_steps.py +++ b/tests/integration/features/steps/mcp_steps.py @@ -127,6 +127,7 @@ def create_towerfile( """Create a Towerfile for testing - pure function with no side effects beyond file creation""" app_name = unique_app_name(context, app_name, force_new=True) + context.app_name = app_name template_dir = Path(__file__).parents[2] / "templates" From 585f6d3a0f524a9b9bcd6ba1ff3fe041cfc3d107 Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Thu, 15 Jan 2026 16:52:59 +0100 Subject: [PATCH 2/5] fix(cli): improve follow retry handling --- crates/tower-cmd/src/apps.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index 148e9298..b8de6314 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -263,6 +263,9 @@ async fn follow_logs(config: Config, name: String, seq: i64) { }, Err(err) => { output::error(&format!("Failed to stream run logs: {:?}", err)); + sleep(backoff).await; + backoff = next_backoff(backoff); + continue; } } @@ -364,6 +367,9 @@ fn monitor_run_completion( Err(_) => { failures += 1; if failures >= 5 { + output::error( + "Failed to monitor run completion after repeated errors", + ); return; } } From 8cc27e2c650c230bcd1785cf53757d64cd27e1b9 Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Thu, 15 Jan 2026 17:01:27 +0100 Subject: [PATCH 3/5] refactor: reset follow backoff and improve log errors --- crates/tower-cmd/src/api.rs | 11 +++++++++ crates/tower-cmd/src/apps.rs | 47 ++++++++++++++++++++---------------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/crates/tower-cmd/src/api.rs b/crates/tower-cmd/src/api.rs index 8a49b4a3..ab977e11 100644 --- a/crates/tower-cmd/src/api.rs +++ b/crates/tower-cmd/src/api.rs @@ -367,6 +367,17 @@ pub enum LogStreamError { Unknown, } +impl std::fmt::Display for LogStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LogStreamError::Reqwest(err) => write!(f, "{err}"), + LogStreamError::Unknown => write!(f, "unknown log stream error"), + } + } +} + +impl std::error::Error for LogStreamError {} + impl From for LogStreamError { fn from(err: reqwest_eventsource::CannotCloneRequestError) -> Self { debug!("Failed to clone request {:?}", err); diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index b8de6314..64a4190a 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -227,9 +227,13 @@ fn extract_app_name(subcmd: &str, cmd: Option<(&str, &ArgMatches)>) -> String { output::die(&line); } +const FOLLOW_BACKOFF_INITIAL: Duration = Duration::from_millis(500); +const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5); +const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5); + async fn follow_logs(config: Config, name: String, seq: i64) { let enable_ctrl_c = !output::get_output_mode().is_mcp(); - let mut backoff = Duration::from_millis(500); + let mut backoff = FOLLOW_BACKOFF_INITIAL; loop { let run = match api::describe_run(&config, &name, seq).await { @@ -248,21 +252,24 @@ async fn follow_logs(config: Config, name: String, seq: i64) { let run_complete = monitor_run_completion(&config, &name, seq); match api::stream_run_logs(&config, &name, seq).await { - Ok(log_stream) => match stream_logs_until_complete( - log_stream, - run_complete, - enable_ctrl_c, - &run.dollar_link, - ) - .await - { - Ok(LogFollowOutcome::Completed) => return, - Ok(LogFollowOutcome::Interrupted) => return, - Ok(LogFollowOutcome::Disconnected) => {} - Err(_) => return, - }, + Ok(log_stream) => { + backoff = FOLLOW_BACKOFF_INITIAL; + match stream_logs_until_complete( + log_stream, + run_complete, + enable_ctrl_c, + &run.dollar_link, + ) + .await + { + Ok(LogFollowOutcome::Completed) => return, + Ok(LogFollowOutcome::Interrupted) => return, + Ok(LogFollowOutcome::Disconnected) => {} + Err(_) => return, + } + } Err(err) => { - output::error(&format!("Failed to stream run logs: {:?}", err)); + output::error(&format!("Failed to stream run logs: {}", err)); sleep(backoff).await; backoff = next_backoff(backoff); continue; @@ -283,10 +290,9 @@ async fn follow_logs(config: Config, name: String, seq: i64) { } fn next_backoff(current: Duration) -> Duration { - let max = Duration::from_secs(5); - let next = current.checked_mul(2).unwrap_or(max); - if next > max { - max + let next = current.checked_mul(2).unwrap_or(FOLLOW_BACKOFF_MAX); + if next > FOLLOW_BACKOFF_MAX { + FOLLOW_BACKOFF_MAX } else { next } @@ -334,8 +340,7 @@ async fn stream_logs_until_complete( } async fn drain_remaining_logs(mut log_stream: tokio::sync::mpsc::Receiver) { - let drain_duration = Duration::from_secs(5); - let _ = tokio::time::timeout(drain_duration, async { + let _ = tokio::time::timeout(LOG_DRAIN_DURATION, async { while let Some(event) = log_stream.recv().await { if let api::LogStreamEvent::EventLog(log) = event { output::remote_log_event(&log); From dc4752695ed8ce027958009d72a60ca9899887ed Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Thu, 15 Jan 2026 17:11:42 +0100 Subject: [PATCH 4/5] reset monitor failures and cancel follow watchers --- crates/tower-cmd/src/apps.rs | 75 +++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index 64a4190a..4c74d1b1 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -234,6 +234,7 @@ const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5); async fn follow_logs(config: Config, name: String, seq: i64) { let enable_ctrl_c = !output::get_output_mode().is_mcp(); let mut backoff = FOLLOW_BACKOFF_INITIAL; + let mut cancel_monitor: Option> = None; loop { let run = match api::describe_run(&config, &name, seq).await { @@ -250,9 +251,16 @@ async fn follow_logs(config: Config, name: String, seq: i64) { return; } - let run_complete = monitor_run_completion(&config, &name, seq); + // Cancel any prior watcher so we don't accumulate pollers after reconnects. + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + let (cancel_tx, cancel_rx) = oneshot::channel(); + cancel_monitor = Some(cancel_tx); + let run_complete = monitor_run_completion(&config, &name, seq, cancel_rx); match api::stream_run_logs(&config, &name, seq).await { Ok(log_stream) => { + // Reset after a successful connection so transient drops recover quickly. backoff = FOLLOW_BACKOFF_INITIAL; match stream_logs_until_complete( log_stream, @@ -262,10 +270,25 @@ async fn follow_logs(config: Config, name: String, seq: i64) { ) .await { - Ok(LogFollowOutcome::Completed) => return, - Ok(LogFollowOutcome::Interrupted) => return, + Ok(LogFollowOutcome::Completed) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } + Ok(LogFollowOutcome::Interrupted) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } Ok(LogFollowOutcome::Disconnected) => {} - Err(_) => return, + Err(_) => { + if let Some(cancel) = cancel_monitor.take() { + let _ = cancel.send(()); + } + return; + } } } Err(err) => { @@ -354,6 +377,7 @@ fn monitor_run_completion( config: &Config, app_name: &str, seq: i64, + mut cancel: oneshot::Receiver<()>, ) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); let config_clone = config.clone(); @@ -362,22 +386,26 @@ fn monitor_run_completion( tokio::spawn(async move { let mut failures = 0u32; loop { - match api::describe_run(&config_clone, &app_name, seq).await { - Ok(res) => { - if is_run_finished(&res.run) { - let _ = tx.send(res.run); - return; + tokio::select! { + _ = &mut cancel => return, + result = api::describe_run(&config_clone, &app_name, seq) => match result { + Ok(res) => { + failures = 0; + if is_run_finished(&res.run) { + let _ = tx.send(res.run); + return; + } } - } - Err(_) => { - failures += 1; - if failures >= 5 { - output::error( - "Failed to monitor run completion after repeated errors", - ); - return; + Err(_) => { + failures += 1; + if failures >= 5 { + output::error( + "Failed to monitor run completion after repeated errors", + ); + return; + } } - } + }, } sleep(Duration::from_millis(500)).await; } @@ -400,7 +428,10 @@ fn is_run_finished(run: &Run) -> bool { #[cfg(test)] mod tests { - use super::{apps_cmd, next_backoff, stream_logs_until_complete, LogFollowOutcome}; + use super::{ + apps_cmd, next_backoff, stream_logs_until_complete, LogFollowOutcome, + FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, + }; use super::is_run_finished; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; @@ -492,7 +523,7 @@ mod tests { #[test] fn backoff_grows_and_caps() { - let mut backoff = Duration::from_millis(500); + let mut backoff = FOLLOW_BACKOFF_INITIAL; backoff = next_backoff(backoff); assert_eq!(backoff, Duration::from_secs(1)); backoff = next_backoff(backoff); @@ -500,8 +531,8 @@ mod tests { backoff = next_backoff(backoff); assert_eq!(backoff, Duration::from_secs(4)); backoff = next_backoff(backoff); - assert_eq!(backoff, Duration::from_secs(5)); + assert_eq!(backoff, FOLLOW_BACKOFF_MAX); backoff = next_backoff(backoff); - assert_eq!(backoff, Duration::from_secs(5)); + assert_eq!(backoff, FOLLOW_BACKOFF_MAX); } } From 518721df77d5e18266c234ae2902cca082c2ac3b Mon Sep 17 00:00:00 2001 From: Burak Dede Date: Thu, 15 Jan 2026 19:45:58 +0100 Subject: [PATCH 5/5] harden apps logs follow and add regression tests for - deduplicate log lines across reconnects / might be an issue - add unit test for deduplication and out of order logs --- crates/tower-cmd/src/api.rs | 28 ++++- crates/tower-cmd/src/apps.rs | 110 ++++++++++++++++---- tests/integration/features/cli_runs.feature | 7 ++ tests/mock-api-server/main.py | 19 +++- 4 files changed, 139 insertions(+), 25 deletions(-) diff --git a/crates/tower-cmd/src/api.rs b/crates/tower-cmd/src/api.rs index ab977e11..772bfb21 100644 --- a/crates/tower-cmd/src/api.rs +++ b/crates/tower-cmd/src/api.rs @@ -410,10 +410,30 @@ async fn drain_run_logs_stream(mut source: EventSource, tx: mpsc::Sender { let event_warning = serde_json::from_str(&message.data); - if let Ok(event) = event_warning { - tx.send(LogStreamEvent::EventWarning(event)).await.ok(); - } else { - debug!("Failed to parse warning message: {:?}", message.data); + match event_warning { + Ok(event) => { + tx.send(LogStreamEvent::EventWarning(event)).await.ok(); + } + Err(err) => { + let warning_data = serde_json::from_str(&message.data); + match warning_data { + Ok(data) => { + let event = tower_api::models::EventWarning { + data, + event: tower_api::models::event_warning::Event::Warning, + id: None, + retry: None, + }; + tx.send(LogStreamEvent::EventWarning(event)).await.ok(); + } + Err(_) => { + debug!( + "Failed to parse warning message: {:?}. Error: {}", + message.data, err + ); + } + } + } } } _ => { diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index 4c74d1b1..fa759528 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -3,7 +3,7 @@ use config::Config; use tokio::sync::oneshot; use tokio::time::{sleep, Duration}; -use tower_api::models::Run; +use tower_api::models::{Run, RunLogLine}; use crate::{api, output}; @@ -235,6 +235,7 @@ async fn follow_logs(config: Config, name: String, seq: i64) { let enable_ctrl_c = !output::get_output_mode().is_mcp(); let mut backoff = FOLLOW_BACKOFF_INITIAL; let mut cancel_monitor: Option> = None; + let mut last_line_num: Option = None; loop { let run = match api::describe_run(&config, &name, seq).await { @@ -245,7 +246,7 @@ async fn follow_logs(config: Config, name: String, seq: i64) { if is_run_finished(&run) { if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await { for line in resp.log_lines { - output::remote_log_event(&line); + emit_log_if_new(&line, &mut last_line_num); } } return; @@ -267,6 +268,7 @@ async fn follow_logs(config: Config, name: String, seq: i64) { run_complete, enable_ctrl_c, &run.dollar_link, + &mut last_line_num, ) .await { @@ -292,6 +294,10 @@ async fn follow_logs(config: Config, name: String, seq: i64) { } } Err(err) => { + if is_fatal_stream_error(&err) { + output::error(&format!("Failed to stream run logs: {}", err)); + return; + } output::error(&format!("Failed to stream run logs: {}", err)); sleep(backoff).await; backoff = next_backoff(backoff); @@ -332,20 +338,23 @@ async fn stream_logs_until_complete( mut run_complete: oneshot::Receiver, enable_ctrl_c: bool, run_link: &str, + last_line_num: &mut Option, ) -> Result { loop { tokio::select! { event = log_stream.recv() => match event { Some(api::LogStreamEvent::EventLog(log)) => { - output::remote_log_event(&log); + emit_log_if_new(&log, last_line_num); }, + Some(api::LogStreamEvent::EventWarning(warning)) => { + output::write(&format!("Warning: {}\n", warning.data.content)); + } None => return Ok(LogFollowOutcome::Disconnected), - _ => {}, }, res = &mut run_complete => { match res { Ok(_) => { - drain_remaining_logs(log_stream).await; + drain_remaining_logs(log_stream, last_line_num).await; return Ok(LogFollowOutcome::Completed); } // If monitoring failed, keep following and let the caller retry. @@ -362,17 +371,50 @@ async fn stream_logs_until_complete( } } -async fn drain_remaining_logs(mut log_stream: tokio::sync::mpsc::Receiver) { +async fn drain_remaining_logs( + mut log_stream: tokio::sync::mpsc::Receiver, + last_line_num: &mut Option, +) { let _ = tokio::time::timeout(LOG_DRAIN_DURATION, async { while let Some(event) = log_stream.recv().await { - if let api::LogStreamEvent::EventLog(log) = event { - output::remote_log_event(&log); + match event { + api::LogStreamEvent::EventLog(log) => { + emit_log_if_new(&log, last_line_num); + } + api::LogStreamEvent::EventWarning(warning) => { + output::write(&format!("Warning: {}\n", warning.data.content)); + } } } }) .await; } +fn emit_log_if_new(log: &RunLogLine, last_line_num: &mut Option) { + if should_emit_line(last_line_num, log.line_num) { + output::remote_log_event(log); + } +} + +fn should_emit_line(last_line_num: &mut Option, line_num: i64) -> bool { + if last_line_num.map_or(true, |last| line_num > last) { + *last_line_num = Some(line_num); + true + } else { + false + } +} + +fn is_fatal_stream_error(err: &api::LogStreamError) -> bool { + match err { + api::LogStreamError::Reqwest(reqwest_err) => reqwest_err + .status() + .map(|status| status.is_client_error() && status.as_u16() != 429) + .unwrap_or(false), + api::LogStreamError::Unknown => false, + } +} + fn monitor_run_completion( config: &Config, app_name: &str, @@ -384,7 +426,7 @@ fn monitor_run_completion( let app_name = app_name.to_string(); tokio::spawn(async move { - let mut failures = 0u32; + let mut failures = 0; loop { tokio::select! { _ = &mut cancel => return, @@ -429,8 +471,8 @@ fn is_run_finished(run: &Run) -> bool { #[cfg(test)] mod tests { use super::{ - apps_cmd, next_backoff, stream_logs_until_complete, LogFollowOutcome, - FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, + apps_cmd, next_backoff, should_emit_line, stream_logs_until_complete, + LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, }; use super::is_run_finished; use tokio::sync::{mpsc, oneshot}; @@ -439,7 +481,7 @@ mod tests { use tower_api::models::Run; #[test] - fn logs_follow_flag_is_parsed() { + fn test_follow_flag_parsing() { let matches = apps_cmd() .try_get_matches_from(["apps", "logs", "--follow", "hello-world#11"]) .unwrap(); @@ -454,7 +496,7 @@ mod tests { } #[test] - fn run_status_terminality_is_explicit() { + fn test_terminal_statuses_explicit() { let non_terminal = [Status::Scheduled, Status::Pending, Status::Running]; for status in non_terminal { let run = Run { @@ -480,7 +522,7 @@ mod tests { } #[test] - fn run_status_variants_are_exhaustive() { + fn test_status_variants_exhaustive() { let status = Status::Scheduled; match status { Status::Scheduled => {} @@ -494,9 +536,10 @@ mod tests { } #[tokio::test] - async fn stream_logs_completes_on_run_completion() { + async fn test_stream_completion_on_run_finish() { let (tx, rx) = mpsc::channel(1); let (done_tx, done_rx) = oneshot::channel(); + let mut last_line_num = None; let done_task = tokio::spawn(async move { let _ = done_tx.send(Run::default()); @@ -504,25 +547,26 @@ mod tests { drop(tx); }); - let res = stream_logs_until_complete(rx, done_rx, false, "link").await; + let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await; done_task.await.unwrap(); assert!(matches!(res, Ok(LogFollowOutcome::Completed))); } #[tokio::test] - async fn stream_logs_returns_disconnected_on_closed_stream() { + async fn test_stream_disconnection_on_close() { let (tx, rx) = mpsc::channel(1); drop(tx); let (_done_tx, done_rx) = oneshot::channel::(); + let mut last_line_num = None; - let res = stream_logs_until_complete(rx, done_rx, false, "link").await; + let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await; assert!(matches!(res, Ok(LogFollowOutcome::Disconnected))); } #[test] - fn backoff_grows_and_caps() { + fn test_backoff_growth_and_cap() { let mut backoff = FOLLOW_BACKOFF_INITIAL; backoff = next_backoff(backoff); assert_eq!(backoff, Duration::from_secs(1)); @@ -535,4 +579,32 @@ mod tests { backoff = next_backoff(backoff); assert_eq!(backoff, FOLLOW_BACKOFF_MAX); } + + #[test] + fn test_duplicate_line_filtering() { + let mut last_line_num = None; + assert!(should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(!should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(!should_emit_line(&mut last_line_num, 0)); + assert_eq!(last_line_num, Some(1)); + assert!(should_emit_line(&mut last_line_num, 2)); + assert_eq!(last_line_num, Some(2)); + assert!(should_emit_line(&mut last_line_num, 10)); + assert_eq!(last_line_num, Some(10)); + } + + #[test] + fn test_out_of_order_log_handling() { + let mut last_line_num = None; + assert!(should_emit_line(&mut last_line_num, 1)); + assert_eq!(last_line_num, Some(1)); + assert!(should_emit_line(&mut last_line_num, 3)); + assert_eq!(last_line_num, Some(3)); + assert!(!should_emit_line(&mut last_line_num, 2)); + assert_eq!(last_line_num, Some(3)); + assert!(should_emit_line(&mut last_line_num, 4)); + assert_eq!(last_line_num, Some(4)); + } } diff --git a/tests/integration/features/cli_runs.feature b/tests/integration/features/cli_runs.feature index da8e0aa1..ede7b436 100644 --- a/tests/integration/features/cli_runs.feature +++ b/tests/integration/features/cli_runs.feature @@ -46,3 +46,10 @@ Feature: CLI Run Commands And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number Then the output should show "First log before run completes" And the output should show "Second log after run completes" + + Scenario: CLI apps logs follow should display warnings + Given I have a simple hello world application named "app-logs-warning" + When I run "tower deploy --create" via CLI + And I run "tower run --detached" via CLI and capture run number + And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number + Then the output should show "Warning: Rate limit approaching" diff --git a/tests/mock-api-server/main.py b/tests/mock-api-server/main.py index cf949e70..31da247d 100644 --- a/tests/mock-api-server/main.py +++ b/tests/mock-api-server/main.py @@ -272,7 +272,9 @@ async def describe_run(name: str, seq: int): # For logs-after-completion test apps, complete quickly to test log draining # Use 1 second so CLI has time to start streaming before completion - completion_threshold = 1.0 if "logs-after-completion" in name else 5.0 + completion_threshold = ( + 1.0 if "logs-after-completion" in name or "logs-warning" in name else 5.0 + ) if elapsed > completion_threshold: run_data["status"] = "exited" @@ -490,6 +492,10 @@ def make_log_data(seq: int, line_num: int, content: str, timestamp: str): def make_log_event(seq: int, line_num: int, content: str, timestamp: str): return f"event: log\ndata: {json.dumps(make_log_data(seq, line_num, content, timestamp))}\n\n" +def make_warning_event(content: str, timestamp: str): + data = {"data": {"content": content, "reported_at": timestamp}, "event": "warning"} + return f"event: warning\ndata: {json.dumps(data)}\n\n" + @app.get("/v1/apps/{name}/runs/{seq}/logs") async def describe_run_logs(name: str, seq: int): @@ -518,6 +524,13 @@ async def generate_logs_after_completion_test_stream(seq: int): seq, 2, "Second log after run completes", "2025-08-22T12:00:01Z" ) +async def generate_warning_log_stream(seq: int): + """Stream a warning and a couple of logs, then finish.""" + yield make_warning_event("Rate limit approaching", "2025-08-22T12:00:00Z") + yield make_log_event(seq, 1, "Warning stream log 1", "2025-08-22T12:00:00Z") + await asyncio.sleep(1.2) + yield make_log_event(seq, 2, "Warning stream log 2", "2025-08-22T12:00:01Z") + async def generate_normal_log_stream(seq: int): """Normal log stream for regular tests.""" @@ -533,7 +546,9 @@ async def stream_run_logs(name: str, seq: int): if name not in mock_apps_db: raise HTTPException(status_code=404, detail=f"App '{name}' not found") - if "logs-after-completion" in name: + if "logs-warning" in name: + stream = generate_warning_log_stream(seq) + elif "logs-after-completion" in name: stream = generate_logs_after_completion_test_stream(seq) else: stream = generate_normal_log_stream(seq)