diff --git a/crates/tower-cmd/src/api.rs b/crates/tower-cmd/src/api.rs
index 8a49b4a3..772bfb21 100644
--- a/crates/tower-cmd/src/api.rs
+++ b/crates/tower-cmd/src/api.rs
@@ -367,6 +367,17 @@ pub enum LogStreamError {
     Unknown,
 }
 
+impl std::fmt::Display for LogStreamError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            LogStreamError::Reqwest(err) => write!(f, "{err}"),
+            LogStreamError::Unknown => write!(f, "unknown log stream error"),
+        }
+    }
+}
+
+impl std::error::Error for LogStreamError {}
+
 impl From<reqwest_eventsource::CannotCloneRequestError> for LogStreamError {
     fn from(err: reqwest_eventsource::CannotCloneRequestError) -> Self {
         debug!("Failed to clone request {:?}", err);
@@ -399,10 +410,30 @@ async fn drain_run_logs_stream(mut source: EventSource, tx: mpsc::Sender<LogStreamEvent>) {
                 let event_warning = serde_json::from_str(&message.data);
-                if let Ok(event) = event_warning {
-                    tx.send(LogStreamEvent::EventWarning(event)).await.ok();
-                } else {
-                    debug!("Failed to parse warning message: {:?}", message.data);
+                match event_warning {
+                    Ok(event) => {
+                        tx.send(LogStreamEvent::EventWarning(event)).await.ok();
+                    }
+                    Err(err) => {
+                        let warning_data = serde_json::from_str(&message.data);
+                        match warning_data {
+                            Ok(data) => {
+                                let event = tower_api::models::EventWarning {
+                                    data,
+                                    event: tower_api::models::event_warning::Event::Warning,
+                                    id: None,
+                                    retry: None,
+                                };
+                                tx.send(LogStreamEvent::EventWarning(event)).await.ok();
+                            }
+                            Err(_) => {
+                                debug!(
+                                    "Failed to parse warning message: {:?}. Error: {}",
+                                    message.data, err
+                                );
+                            }
+                        }
+                    }
                 }
             }
             _ => {
diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs
index 668d5f79..fa759528 100644
--- a/crates/tower-cmd/src/apps.rs
+++ b/crates/tower-cmd/src/apps.rs
@@ -1,7 +1,9 @@
 use clap::{value_parser, Arg, ArgMatches, Command};
 use config::Config;
+use tokio::sync::oneshot;
+use tokio::time::{sleep, Duration};
 
-use tower_api::models::Run;
+use tower_api::models::{Run, RunLogLine};
 
 use crate::{api, output};
 
@@ -17,6 +19,13 @@ pub fn apps_cmd() -> Command {
         )
         .subcommand(
             Command::new("logs")
+                .arg(
+                    Arg::new("follow")
+                        .short('f')
+                        .long("follow")
+                        .help("Follow logs in real time")
+                        .action(clap::ArgAction::SetTrue),
+                )
                 .allow_external_subcommands(true)
                 .about("Get the logs from a previous Tower app run"),
         )
@@ -48,6 +57,12 @@ pub fn apps_cmd() -> Command {
 
 pub async fn do_logs(config: Config, cmd: &ArgMatches) {
     let (name, seq) = extract_app_name_and_run("logs", cmd.subcommand());
+    let follow = cmd.get_one::<bool>("follow").copied().unwrap_or(false);
+
+    if follow {
+        follow_logs(config, name, seq).await;
+        return;
+    }
 
     if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await {
         for line in resp.log_lines {
@@ -211,3 +226,385 @@ fn extract_app_name(subcmd: &str, cmd: Option<(&str, &ArgMatches)>) -> String {
     );
     output::die(&line);
 }
+
+const FOLLOW_BACKOFF_INITIAL: Duration = Duration::from_millis(500);
+const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5);
+const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5);
+
+async fn follow_logs(config: Config, name: String, seq: i64) {
+    let enable_ctrl_c = !output::get_output_mode().is_mcp();
+    let mut backoff = FOLLOW_BACKOFF_INITIAL;
+    let mut cancel_monitor: Option<oneshot::Sender<()>> = None;
+    let mut last_line_num: Option<i64> = None;
+
+    loop {
+        let run = match api::describe_run(&config, &name, seq).await {
+            Ok(res) => res.run,
+            Err(err) => output::tower_error_and_die(err, "Fetching run details failed"),
+        };
+
+        if is_run_finished(&run) {
+            if let Ok(resp) = api::describe_run_logs(&config, &name, seq).await {
+                for line in resp.log_lines {
+                    emit_log_if_new(&line, &mut last_line_num);
+                }
+            }
+            return;
+        }
+
+        // Cancel any prior watcher so we don't accumulate pollers after reconnects.
+        if let Some(cancel) = cancel_monitor.take() {
+            let _ = cancel.send(());
+        }
+        let (cancel_tx, cancel_rx) = oneshot::channel();
+        cancel_monitor = Some(cancel_tx);
+        let run_complete = monitor_run_completion(&config, &name, seq, cancel_rx);
+        match api::stream_run_logs(&config, &name, seq).await {
+            Ok(log_stream) => {
+                // Reset after a successful connection so transient drops recover quickly.
+                backoff = FOLLOW_BACKOFF_INITIAL;
+                match stream_logs_until_complete(
+                    log_stream,
+                    run_complete,
+                    enable_ctrl_c,
+                    &run.dollar_link,
+                    &mut last_line_num,
+                )
+                .await
+                {
+                    Ok(LogFollowOutcome::Completed) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                    Ok(LogFollowOutcome::Interrupted) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                    Ok(LogFollowOutcome::Disconnected) => {}
+                    Err(_) => {
+                        if let Some(cancel) = cancel_monitor.take() {
+                            let _ = cancel.send(());
+                        }
+                        return;
+                    }
+                }
+            }
+            Err(err) => {
+                if is_fatal_stream_error(&err) {
+                    output::error(&format!("Failed to stream run logs: {}", err));
+                    return;
+                }
+                output::error(&format!("Failed to stream run logs: {}", err));
+                sleep(backoff).await;
+                backoff = next_backoff(backoff);
+                continue;
+            }
+        }
+
+        let latest = match api::describe_run(&config, &name, seq).await {
+            Ok(res) => res.run,
+            Err(err) => output::tower_error_and_die(err, "Fetching run details failed"),
+        };
+        if is_run_finished(&latest) {
+            return;
+        }
+
+        sleep(backoff).await;
+        backoff = next_backoff(backoff);
+    }
+}
+
+fn next_backoff(current: Duration) -> Duration {
+    let next = current.checked_mul(2).unwrap_or(FOLLOW_BACKOFF_MAX);
+    if next > FOLLOW_BACKOFF_MAX {
+        FOLLOW_BACKOFF_MAX
+    } else {
+        next
+    }
+}
+
+enum LogFollowOutcome {
+    Completed,
+    Disconnected,
+    Interrupted,
+}
+
+async fn stream_logs_until_complete(
+    mut log_stream: tokio::sync::mpsc::Receiver<api::LogStreamEvent>,
+    mut run_complete: oneshot::Receiver<Run>,
+    enable_ctrl_c: bool,
+    run_link: &str,
+    last_line_num: &mut Option<i64>,
+) -> Result<LogFollowOutcome, api::LogStreamError> {
+    loop {
+        tokio::select! {
+            event = log_stream.recv() => match event {
+                Some(api::LogStreamEvent::EventLog(log)) => {
+                    emit_log_if_new(&log, last_line_num);
+                },
+                Some(api::LogStreamEvent::EventWarning(warning)) => {
+                    output::write(&format!("Warning: {}\n", warning.data.content));
+                }
+                None => return Ok(LogFollowOutcome::Disconnected),
+            },
+            res = &mut run_complete => {
+                match res {
+                    Ok(_) => {
+                        drain_remaining_logs(log_stream, last_line_num).await;
+                        return Ok(LogFollowOutcome::Completed);
+                    }
+                    // If monitoring failed, keep following and let the caller retry.
+                    Err(_) => return Ok(LogFollowOutcome::Disconnected),
+                }
+            },
+            _ = tokio::signal::ctrl_c(), if enable_ctrl_c => {
+                output::write("Received Ctrl+C, stopping log streaming...\n");
+                output::write("Note: The run will continue in Tower cloud\n");
+                output::write(&format!(" See more: {}\n", run_link));
+                return Ok(LogFollowOutcome::Interrupted);
+            },
+        }
+    }
+}
+
+async fn drain_remaining_logs(
+    mut log_stream: tokio::sync::mpsc::Receiver<api::LogStreamEvent>,
+    last_line_num: &mut Option<i64>,
+) {
+    let _ = tokio::time::timeout(LOG_DRAIN_DURATION, async {
+        while let Some(event) = log_stream.recv().await {
+            match event {
+                api::LogStreamEvent::EventLog(log) => {
+                    emit_log_if_new(&log, last_line_num);
+                }
+                api::LogStreamEvent::EventWarning(warning) => {
+                    output::write(&format!("Warning: {}\n", warning.data.content));
+                }
+            }
+        }
+    })
+    .await;
+}
+
+fn emit_log_if_new(log: &RunLogLine, last_line_num: &mut Option<i64>) {
+    if should_emit_line(last_line_num, log.line_num) {
+        output::remote_log_event(log);
+    }
+}
+
+fn should_emit_line(last_line_num: &mut Option<i64>, line_num: i64) -> bool {
+    if last_line_num.map_or(true, |last| line_num > last) {
+        *last_line_num = Some(line_num);
+        true
+    } else {
+        false
+    }
+}
+
+fn is_fatal_stream_error(err: &api::LogStreamError) -> bool {
+    match err {
+        api::LogStreamError::Reqwest(reqwest_err) => reqwest_err
+            .status()
+            .map(|status| status.is_client_error() && status.as_u16() != 429)
+            .unwrap_or(false),
+        api::LogStreamError::Unknown => false,
+    }
+}
+
+fn monitor_run_completion(
+    config: &Config,
+    app_name: &str,
+    seq: i64,
+    mut cancel: oneshot::Receiver<()>,
+) -> oneshot::Receiver<Run> {
+    let (tx, rx) = oneshot::channel();
+    let config_clone = config.clone();
+    let app_name = app_name.to_string();
+
+    tokio::spawn(async move {
+        let mut failures = 0;
+        loop {
+            tokio::select! {
+                _ = &mut cancel => return,
+                result = api::describe_run(&config_clone, &app_name, seq) => match result {
+                    Ok(res) => {
+                        failures = 0;
+                        if is_run_finished(&res.run) {
+                            let _ = tx.send(res.run);
+                            return;
+                        }
+                    }
+                    Err(_) => {
+                        failures += 1;
+                        if failures >= 5 {
+                            output::error(
+                                "Failed to monitor run completion after repeated errors",
+                            );
+                            return;
+                        }
+                    }
+                },
+            }
+            sleep(Duration::from_millis(500)).await;
+        }
+    });
+
+    rx
+}
+
+fn is_run_finished(run: &Run) -> bool {
+    match run.status {
+        // Be explicit about terminal states so new non-terminal statuses
+        // don't cause us to stop following logs too early.
+        tower_api::models::run::Status::Crashed
+        | tower_api::models::run::Status::Errored
+        | tower_api::models::run::Status::Exited
+        | tower_api::models::run::Status::Cancelled => true,
+        _ => false,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{
+        apps_cmd, next_backoff, should_emit_line, stream_logs_until_complete,
+        LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX,
+    };
+    use super::is_run_finished;
+    use tokio::sync::{mpsc, oneshot};
+    use tokio::time::Duration;
+    use tower_api::models::run::Status;
+    use tower_api::models::Run;
+
+    #[test]
+    fn test_follow_flag_parsing() {
+        let matches = apps_cmd()
+            .try_get_matches_from(["apps", "logs", "--follow", "hello-world#11"])
+            .unwrap();
+        let (cmd, sub_matches) = matches.subcommand().unwrap();
+
+        assert_eq!(cmd, "logs");
+        assert_eq!(sub_matches.get_one::<bool>("follow"), Some(&true));
+        assert_eq!(
+            sub_matches.subcommand().map(|(name, _)| name),
+            Some("hello-world#11")
+        );
+    }
+
+    #[test]
+    fn test_terminal_statuses_explicit() {
+        let non_terminal = [Status::Scheduled, Status::Pending, Status::Running];
+        for status in non_terminal {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(!is_run_finished(&run));
+        }
+
+        let terminal = [
+            Status::Crashed,
+            Status::Errored,
+            Status::Exited,
+            Status::Cancelled,
+        ];
+        for status in terminal {
+            let run = Run {
+                status,
+                ..Default::default()
+            };
+            assert!(is_run_finished(&run));
+        }
+    }
+
+    #[test]
+    fn test_status_variants_exhaustive() {
+        let status = Status::Scheduled;
+        match status {
+            Status::Scheduled => {}
+            Status::Pending => {}
+            Status::Running => {}
+            Status::Crashed => {}
+            Status::Errored => {}
+            Status::Exited => {}
+            Status::Cancelled => {}
+        }
+    }
+
+    #[tokio::test]
+    async fn test_stream_completion_on_run_finish() {
+        let (tx, rx) = mpsc::channel(1);
+        let (done_tx, done_rx) = oneshot::channel();
+        let mut last_line_num = None;
+
+        let done_task = tokio::spawn(async move {
+            let _ = done_tx.send(Run::default());
+            tokio::time::sleep(Duration::from_millis(10)).await;
+            drop(tx);
+        });
+
+        let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await;
+        done_task.await.unwrap();
+
+        assert!(matches!(res, Ok(LogFollowOutcome::Completed)));
+    }
+
+    #[tokio::test]
+    async fn test_stream_disconnection_on_close() {
+        let (tx, rx) = mpsc::channel(1);
+        drop(tx);
+        let (_done_tx, done_rx) = oneshot::channel::<Run>();
+        let mut last_line_num = None;
+
+        let res = stream_logs_until_complete(rx, done_rx, false, "link", &mut last_line_num).await;
+
+        assert!(matches!(res, Ok(LogFollowOutcome::Disconnected)));
+    }
+
+    #[test]
+    fn test_backoff_growth_and_cap() {
+        let mut backoff = FOLLOW_BACKOFF_INITIAL;
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(1));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(2));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, Duration::from_secs(4));
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, FOLLOW_BACKOFF_MAX);
+        backoff = next_backoff(backoff);
+        assert_eq!(backoff, FOLLOW_BACKOFF_MAX);
+    }
+
+    #[test]
+    fn test_duplicate_line_filtering() {
+        let mut last_line_num = None;
+        assert!(should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(!should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(!should_emit_line(&mut last_line_num, 0));
+        assert_eq!(last_line_num, Some(1));
+        assert!(should_emit_line(&mut last_line_num, 2));
+        assert_eq!(last_line_num, Some(2));
+        assert!(should_emit_line(&mut last_line_num, 10));
+        assert_eq!(last_line_num, Some(10));
+    }
+
+    #[test]
+    fn test_out_of_order_log_handling() {
+        let mut last_line_num = None;
+        assert!(should_emit_line(&mut last_line_num, 1));
+        assert_eq!(last_line_num, Some(1));
+        assert!(should_emit_line(&mut last_line_num, 3));
+        assert_eq!(last_line_num, Some(3));
+        assert!(!should_emit_line(&mut last_line_num, 2));
+        assert_eq!(last_line_num, Some(3));
+        assert!(should_emit_line(&mut last_line_num, 4));
+        assert_eq!(last_line_num, Some(4));
+    }
+}
diff --git a/tests/integration/features/cli_runs.feature b/tests/integration/features/cli_runs.feature
index 5afa3e40..ede7b436 100644
--- a/tests/integration/features/cli_runs.feature
+++ b/tests/integration/features/cli_runs.feature
@@ -38,3 +38,18 @@ Feature: CLI Run Commands
     And I run "tower run" via CLI
     Then the output should show "First log before run completes"
     And the output should show "Second log after run completes"
+
+  Scenario: CLI apps logs follow should stream logs and drain after completion
+    Given I have a simple hello world application named "app-logs-after-completion"
+    When I run "tower deploy --create" via CLI
+    And I run "tower run --detached" via CLI and capture run number
+    And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number
+    Then the output should show "First log before run completes"
+    And the output should show "Second log after run completes"
+
+  Scenario: CLI apps logs follow should display warnings
+    Given I have a simple hello world application named "app-logs-warning"
+    When I run "tower deploy --create" via CLI
+    And I run "tower run --detached" via CLI and capture run number
+    And I run "tower apps logs --follow {app_name}#{run_number}" via CLI using created app name and run number
+    Then the output should show "Warning: Rate limit approaching"
diff --git a/tests/integration/features/steps/cli_steps.py b/tests/integration/features/steps/cli_steps.py
index 24a510ec..a491e491 100644
--- a/tests/integration/features/steps/cli_steps.py
+++ b/tests/integration/features/steps/cli_steps.py
@@ -6,6 +6,7 @@
 import shutil
 import json
 import shlex
+import re
 from datetime import datetime
 from pathlib import Path
 from behave import given, when, then
@@ -55,6 +56,37 @@ def step_run_cli_command(context, command):
         raise
 
 
+@step('I run "{command}" via CLI using created app name')
+def step_run_cli_command_with_app_name(context, command):
+    """Run a Tower CLI command with the generated app name injected."""
+    if not hasattr(context, "app_name"):
+        raise AssertionError("Expected context.app_name to be set by app setup step")
+    formatted = command.format(app_name=context.app_name)
+    step_run_cli_command(context, formatted)
+
+
+@step('I run "{command}" via CLI and capture run number')
+def step_run_cli_command_capture_run_number(context, command):
+    """Run a Tower CLI command and capture the run number from its output."""
+    step_run_cli_command(context, command)
+    output = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", context.cli_output)
+    match = re.search(r"Run #(?P<number>\d+)", output)
+    if not match:
+        raise AssertionError(f"Expected run number in output, got: {output}")
+    context.run_number = match.group("number")
+
+
+@step('I run "{command}" via CLI using created app name and run number')
+def step_run_cli_command_with_app_name_and_run(context, command):
+    """Run a Tower CLI command with the generated app name and run number injected."""
+    if not hasattr(context, "app_name"):
raise AssertionError("Expected context.app_name to be set by app setup step") + if not hasattr(context, "run_number"): + raise AssertionError("Expected context.run_number to be set by run step") + formatted = command.format(app_name=context.app_name, run_number=context.run_number) + step_run_cli_command(context, formatted) + + @step("timestamps should be yellow colored") def step_timestamps_should_be_yellow(context): """Verify timestamps are colored yellow (ANSI code 33)""" diff --git a/tests/integration/features/steps/mcp_steps.py b/tests/integration/features/steps/mcp_steps.py index 9e8090c2..f0b217f3 100644 --- a/tests/integration/features/steps/mcp_steps.py +++ b/tests/integration/features/steps/mcp_steps.py @@ -127,6 +127,7 @@ def create_towerfile( """Create a Towerfile for testing - pure function with no side effects beyond file creation""" app_name = unique_app_name(context, app_name, force_new=True) + context.app_name = app_name template_dir = Path(__file__).parents[2] / "templates" diff --git a/tests/mock-api-server/main.py b/tests/mock-api-server/main.py index cf949e70..31da247d 100644 --- a/tests/mock-api-server/main.py +++ b/tests/mock-api-server/main.py @@ -272,7 +272,9 @@ async def describe_run(name: str, seq: int): # For logs-after-completion test apps, complete quickly to test log draining # Use 1 second so CLI has time to start streaming before completion - completion_threshold = 1.0 if "logs-after-completion" in name else 5.0 + completion_threshold = ( + 1.0 if "logs-after-completion" in name or "logs-warning" in name else 5.0 + ) if elapsed > completion_threshold: run_data["status"] = "exited" @@ -490,6 +492,10 @@ def make_log_data(seq: int, line_num: int, content: str, timestamp: str): def make_log_event(seq: int, line_num: int, content: str, timestamp: str): return f"event: log\ndata: {json.dumps(make_log_data(seq, line_num, content, timestamp))}\n\n" +def make_warning_event(content: str, timestamp: str): + data = {"data": {"content": content, "reported_at": timestamp}, "event": "warning"} + return f"event: warning\ndata: {json.dumps(data)}\n\n" + @app.get("/v1/apps/{name}/runs/{seq}/logs") async def describe_run_logs(name: str, seq: int): @@ -518,6 +524,13 @@ async def generate_logs_after_completion_test_stream(seq: int): seq, 2, "Second log after run completes", "2025-08-22T12:00:01Z" ) +async def generate_warning_log_stream(seq: int): + """Stream a warning and a couple of logs, then finish.""" + yield make_warning_event("Rate limit approaching", "2025-08-22T12:00:00Z") + yield make_log_event(seq, 1, "Warning stream log 1", "2025-08-22T12:00:00Z") + await asyncio.sleep(1.2) + yield make_log_event(seq, 2, "Warning stream log 2", "2025-08-22T12:00:01Z") + async def generate_normal_log_stream(seq: int): """Normal log stream for regular tests.""" @@ -533,7 +546,9 @@ async def stream_run_logs(name: str, seq: int): if name not in mock_apps_db: raise HTTPException(status_code=404, detail=f"App '{name}' not found") - if "logs-after-completion" in name: + if "logs-warning" in name: + stream = generate_warning_log_stream(seq) + elif "logs-after-completion" in name: stream = generate_logs_after_completion_test_stream(seq) else: stream = generate_normal_log_stream(seq)