diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 303bdfd3..45c5afbb 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -111,6 +111,12 @@ func resolveAlias(name string) string { return name } +// CanonicalName resolves an agent alias to its canonical name. +// Returns the name unchanged if it is not an alias. +func CanonicalName(name string) string { + return resolveAlias(name) +} + // Register adds an agent to the registry func Register(a Agent) { registry[a.Name()] = a diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index d2f981e8..ee6f4e27 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -10,6 +10,7 @@ import ( "log" "os" "os/exec" + "sort" "strings" "sync" "time" @@ -853,20 +854,34 @@ func (p *CIPoller) postBatchResults(batch *storage.CIPRBatch) { } // Set commit status based on job outcomes: - // all succeeded → success - // mixed → failure (some reviews did not complete) - // all failed → error + // all succeeded → success + // all failures are quota skips → success (with note) + // mixed real failures → failure + // all failed (real) → error + quotaSkips := countQuotaFailures(reviews) + realFailures := batch.FailedJobs - quotaSkips statusState := "success" statusDesc := "Review complete" switch { + case batch.CompletedJobs == 0 && realFailures == 0 && quotaSkips > 0: + // All failures are quota skips — not the code's fault + statusDesc = fmt.Sprintf( + "Review complete (%d agent(s) skipped — quota)", + quotaSkips, + ) case batch.CompletedJobs == 0: statusState = "error" statusDesc = "All reviews failed" - case batch.FailedJobs > 0: + case realFailures > 0: statusState = "failure" statusDesc = fmt.Sprintf( "Review complete (%d/%d jobs failed)", - batch.FailedJobs, batch.TotalJobs, + realFailures, batch.TotalJobs, + ) + case quotaSkips > 0: + statusDesc = fmt.Sprintf( + "Review complete (%d agent(s) skipped — quota)", + quotaSkips, ) } if err := p.callSetCommitStatus(batch.GithubRepo, batch.HeadSHA, statusState, statusDesc); err != nil { @@ -1120,11 +1135,15 @@ Rules: for i, r := range reviews { fmt.Fprintf(&b, "---\n### Review %d: Agent=%s, Type=%s", i+1, r.Agent, r.ReviewType) - if r.Status == "failed" { + if isQuotaFailure(r) { + b.WriteString(" [SKIPPED]") + } else if r.Status == "failed" { b.WriteString(" [FAILED]") } b.WriteString("\n") - if r.Output != "" { + if isQuotaFailure(r) { + b.WriteString("(review skipped — agent quota exhausted)") + } else if r.Output != "" { output := r.Output if len(output) > maxPerReview { output = output[:maxPerReview] + "\n\n...(truncated)" @@ -1167,6 +1186,10 @@ func formatSynthesizedComment(output string, reviews []storage.BatchReviewResult fmt.Fprintf(&b, "\n\n---\n*Synthesized from %d reviews (agents: %s | types: %s)*\n", len(reviews), strings.Join(agents, ", "), strings.Join(types, ", ")) + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } + return b.String() } @@ -1178,9 +1201,15 @@ func formatRawBatchComment(reviews []storage.BatchReviewResult, headSHA string) b.WriteString("> Synthesis unavailable. Showing raw review outputs.\n\n") for _, r := range reviews { - summary := fmt.Sprintf("Agent: %s | Type: %s | Status: %s", r.Agent, r.ReviewType, r.Status) + status := r.Status + if isQuotaFailure(r) { + status = "skipped (quota)" + } + summary := fmt.Sprintf("Agent: %s | Type: %s | Status: %s", r.Agent, r.ReviewType, status) fmt.Fprintf(&b, "
\n%s\n\n", summary) - if r.Status == "failed" { + if isQuotaFailure(r) { + b.WriteString("Review skipped — agent quota exhausted.\n") + } else if r.Status == "failed" { b.WriteString("**Error:** Review failed. Check daemon logs for details.\n") } else if r.Output != "" { output := r.Output @@ -1195,24 +1224,93 @@ func formatRawBatchComment(reviews []storage.BatchReviewResult, headSHA string) b.WriteString("\n\n
\n\n") } + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } + return b.String() } // formatAllFailedComment formats a comment when every job in a batch failed. func formatAllFailedComment(reviews []storage.BatchReviewResult, headSHA string) string { + quotaSkips := countQuotaFailures(reviews) + allQuota := len(reviews) > 0 && quotaSkips == len(reviews) + var b strings.Builder - fmt.Fprintf(&b, "## roborev: Review Failed (`%s`)\n\n", shortSHA(headSHA)) - b.WriteString("All review jobs in this batch failed.\n\n") + if allQuota { + fmt.Fprintf(&b, "## roborev: Review Skipped (`%s`)\n\n", shortSHA(headSHA)) + b.WriteString("All review agents were skipped due to quota exhaustion.\n\n") + } else { + fmt.Fprintf(&b, "## roborev: Review Failed (`%s`)\n\n", shortSHA(headSHA)) + b.WriteString("All review jobs in this batch failed.\n\n") + } for _, r := range reviews { - fmt.Fprintf(&b, "- **%s** (%s): failed\n", r.Agent, r.ReviewType) + if isQuotaFailure(r) { + fmt.Fprintf(&b, "- **%s** (%s): skipped (quota)\n", r.Agent, r.ReviewType) + } else { + fmt.Fprintf(&b, "- **%s** (%s): failed\n", r.Agent, r.ReviewType) + } } - b.WriteString("\nCheck daemon logs for error details.") + if !allQuota { + b.WriteString("\nCheck daemon logs for error details.") + } + + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } return b.String() } +// isQuotaFailure returns true if a batch review's error indicates a quota +// skip rather than a real failure. Matches the prefix set by worker.go. +func isQuotaFailure(r storage.BatchReviewResult) bool { + return r.Status == "failed" && strings.HasPrefix(r.Error, quotaErrorPrefix) +} + +// countQuotaFailures returns the number of reviews that failed due to +// agent quota exhaustion rather than a real error. +func countQuotaFailures(reviews []storage.BatchReviewResult) int { + n := 0 + for _, r := range reviews { + if isQuotaFailure(r) { + n++ + } + } + return n +} + +// skippedAgentNote returns a markdown note listing agents that were +// skipped due to quota exhaustion. Returns "" if none were skipped. +func skippedAgentNote(reviews []storage.BatchReviewResult) string { + agents := make(map[string]struct{}) + for _, r := range reviews { + if isQuotaFailure(r) { + agents[r.Agent] = struct{}{} + } + } + if len(agents) == 0 { + return "" + } + names := make([]string, 0, len(agents)) + for a := range agents { + names = append(names, a) + } + sort.Strings(names) + if len(names) == 1 { + return fmt.Sprintf( + "\n*Note: %s review skipped (agent quota exhausted)*\n", + names[0], + ) + } + return fmt.Sprintf( + "\n*Note: %s reviews skipped (agent quota exhausted)*\n", + strings.Join(names, ", "), + ) +} + // shortSHA returns the first 8 characters of a SHA, or the full string if shorter. func shortSHA(sha string) string { if len(sha) > 8 { diff --git a/internal/daemon/ci_poller_test.go b/internal/daemon/ci_poller_test.go index 99de2110..8b5e95fb 100644 --- a/internal/daemon/ci_poller_test.go +++ b/internal/daemon/ci_poller_test.go @@ -1550,6 +1550,133 @@ func TestCIPollerPostBatchResults_SetsFailureStatusOnMixedOutcome(t *testing.T) } } +func TestCIPollerPostBatchResults_QuotaSkippedNotFailure(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + // One success, one quota-skipped + batch, jobs := h.seedBatchWithJobs(t, 70, "quota-sha", + jobSpec{Agent: "codex", ReviewType: "security", Status: "done", Output: "No issues found."}, + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "gemini quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + // Simulate: job1 succeeded, job2 failed (quota) + h.Poller.handleBatchJobDone(batch, jobs[0].ID, true) + h.Poller.handleBatchJobDone(batch, jobs[1].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + // Quota skip with at least one success → success, not failure + if sc.State != "success" { + t.Errorf("state=%q, want success (quota skip not a failure)", sc.State) + } + if !strings.Contains(sc.Desc, "skipped") { + t.Errorf("desc=%q, expected mention of skipped", sc.Desc) + } +} + +func TestCIPollerPostBatchResults_AllQuotaSkippedIsSuccess(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + batch, jobs := h.seedBatchWithJobs(t, 71, "all-quota-sha", + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "gemini quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + h.Poller.handleBatchJobDone(batch, jobs[0].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + if sc.State != "success" { + t.Errorf("state=%q, want success (all-quota batch is not an error)", sc.State) + } + if !strings.Contains(sc.Desc, "skipped") { + t.Errorf("desc=%q, expected mention of skipped", sc.Desc) + } +} + +func TestCIPollerPostBatchResults_MixedQuotaAndRealFailure(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + batch, jobs := h.seedBatchWithJobs(t, 72, "mixed-quota-sha", + jobSpec{Agent: "codex", ReviewType: "security", Status: "failed", Error: "timeout"}, + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + h.Poller.handleBatchJobDone(batch, jobs[0].ID, false) + h.Poller.handleBatchJobDone(batch, jobs[1].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + // Real failure + quota skip → error (CompletedJobs == 0 and realFailures > 0) + if sc.State != "error" { + t.Errorf("state=%q, want error (real failure present)", sc.State) + } +} + +func TestFormatAllFailedComment_AllQuotaSkipped(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + comment := formatAllFailedComment(reviews, "abc123def456") + + assertContainsAll(t, comment, "comment", + "## roborev: Review Skipped", + "quota exhaustion", + "skipped (quota)", + ) + if strings.Contains(comment, "Check daemon logs") { + t.Error("all-quota comment should not mention daemon logs") + } +} + +func TestFormatRawBatchComment_QuotaSkippedNote(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "codex", ReviewType: "security", Output: "Finding A", Status: "done"}, + {JobID: 2, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + comment := formatRawBatchComment(reviews, "abc123def456") + + assertContainsAll(t, comment, "comment", + "skipped (quota)", + "gemini review skipped", + ) +} + +func TestBuildSynthesisPrompt_QuotaSkippedLabel(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "codex", ReviewType: "security", Output: "No issues.", Status: "done"}, + {JobID: 2, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + prompt := buildSynthesisPrompt(reviews, "") + + assertContainsAll(t, prompt, "prompt", + "[SKIPPED]", + "review skipped", + ) + // Should NOT contain [FAILED] for the quota-skipped review + // Count occurrences of [FAILED] + if strings.Contains(prompt, "[FAILED]") { + t.Error("quota-skipped review should use [SKIPPED], not [FAILED]") + } +} + func TestCIPollerPostBatchResults_SetsErrorStatusOnCommentPostFailure(t *testing.T) { h := newCIPollerHarness(t, "git@github.com:acme/api.git") batch, _ := h.seedBatchWithJobs(t, 63, "post-fail-sha", jobSpec{ diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 996dd819..022d366d 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -33,11 +33,16 @@ type WorkerPool struct { pendingCancels map[int64]bool // Jobs canceled before registered runningJobsMu sync.Mutex + // Agent cooldowns for quota exhaustion + agentCooldowns map[string]time.Time // agent name -> expiry + agentCooldownsMu sync.RWMutex + // Output capture for tail command outputBuffers *OutputBuffer // Test hooks for deterministic synchronization (nil in production) - testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup + testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup + testHookCooldownLockUpgrade func() // Called between RUnlock and Lock in isAgentCoolingDown } // NewWorkerPool creates a new worker pool @@ -52,6 +57,7 @@ func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broad stopCh: make(chan struct{}), runningJobs: make(map[int64]context.CancelFunc), pendingCancels: make(map[int64]bool), + agentCooldowns: make(map[string]time.Time), outputBuffers: NewOutputBuffer(512*1024, 4*1024*1024), // 512KB/job, 4MB total } } @@ -280,6 +286,17 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { wp.registerRunningJob(job.ID, cancel) defer wp.unregisterRunningJob(job.ID) + // Skip immediately if the agent is in quota cooldown. + // Resolve alias so "claude" checks cooldown for "claude-code". + canonicalAgent := agent.CanonicalName(job.Agent) + if wp.isAgentCoolingDown(canonicalAgent) { + log.Printf("[%s] Agent %s in cooldown, skipping job %d", + workerID, canonicalAgent, job.ID) + wp.failoverOrFail(workerID, job, canonicalAgent, + fmt.Sprintf("agent %s quota cooldown active", canonicalAgent)) + return + } + // Build the prompt (or use pre-stored prompt for task/compact jobs) var reviewPrompt string var err error @@ -447,6 +464,17 @@ func (wp *WorkerPool) failOrRetryAgent(workerID string, job *storage.ReviewJob, } func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, agentName string, errorMsg string, agentError bool) { + // Quota errors skip retries entirely — cool down the agent and + // attempt failover or fail with a quota-prefixed error. + if agentError && isQuotaError(errorMsg) { + dur := parseQuotaCooldown(errorMsg, defaultCooldown) + wp.cooldownAgent(agentName, time.Now().Add(dur)) + log.Printf("[%s] Agent %s quota exhausted, cooldown %v", + workerID, agentName, dur) + wp.failoverOrFail(workerID, job, agentName, errorMsg) + return + } + retried, err := wp.db.RetryJob(job.ID, workerID, maxRetries) if err != nil { log.Printf("[%s] Error retrying job: %v", workerID, err) @@ -468,7 +496,7 @@ func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, // Retries exhausted -- attempt failover to backup agent if this is an agent error if agentError { backupAgent := wp.resolveBackupAgent(job) - if backupAgent != "" { + if backupAgent != "" && !wp.isAgentCoolingDown(backupAgent) { failedOver, foErr := wp.db.FailoverJob(job.ID, workerID, backupAgent) if foErr != nil { log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) @@ -514,7 +542,7 @@ func (wp *WorkerPool) resolveBackupAgent(job *storage.ReviewJob) string { if err != nil || !agent.IsAvailable(resolved.Name()) { return "" } - if resolved.Name() == job.Agent { + if resolved.Name() == agent.CanonicalName(job.Agent) { return "" } return resolved.Name() @@ -534,6 +562,148 @@ func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMs }) } +// quotaErrorPrefix is prepended to error messages when a job is failed +// due to agent quota exhaustion. CI poller checks for this prefix to +// distinguish quota failures from real failures. +const quotaErrorPrefix = "quota: " + +// defaultCooldown is the fallback duration when the error message doesn't +// contain a parseable "reset after" token. +const defaultCooldown = 30 * time.Minute +const minCooldown = 1 * time.Minute +const maxCooldown = 24 * time.Hour + +// isQuotaError returns true if the error message indicates a hard API +// quota exhaustion (case-insensitive). Transient rate-limit/429 errors +// are excluded — those should go through normal retries, not cooldown. +func isQuotaError(errMsg string) bool { + lower := strings.ToLower(errMsg) + patterns := []string{ + "resource exhausted", + "quota exceeded", + "quota_exceeded", + "quota exhausted", + "quota_exhausted", + "insufficient_quota", + "exhausted your capacity", + } + for _, p := range patterns { + if strings.Contains(lower, p) { + return true + } + } + return false +} + +// parseQuotaCooldown extracts a Go-format duration from a "reset after +// " substring. Returns fallback if not found or unparseable. +func parseQuotaCooldown(errMsg string, fallback time.Duration) time.Duration { + lower := strings.ToLower(errMsg) + idx := strings.Index(lower, "reset after ") + if idx < 0 { + return fallback + } + rest := errMsg[idx+len("reset after "):] + // Take the next whitespace-delimited token as a duration + token := rest + if sp := strings.IndexAny(rest, " \t\n,;)"); sp > 0 { + token = rest[:sp] + } + token = strings.TrimRight(token, ".,;:)]}") + d, err := time.ParseDuration(token) + if err != nil || d <= 0 { + return fallback + } + if d < minCooldown { + return minCooldown + } + if d > maxCooldown { + return maxCooldown + } + return d +} + +// cooldownAgent sets or extends the cooldown expiry for an agent. +// Only extends — never shortens an existing cooldown. +func (wp *WorkerPool) cooldownAgent(name string, until time.Time) { + name = agent.CanonicalName(name) + wp.agentCooldownsMu.Lock() + defer wp.agentCooldownsMu.Unlock() + if existing, ok := wp.agentCooldowns[name]; ok && existing.After(until) { + return + } + wp.agentCooldowns[name] = until +} + +// isAgentCoolingDown returns true if the agent is currently in a +// quota cooldown period. Expired entries are cleaned up eagerly. +func (wp *WorkerPool) isAgentCoolingDown(name string) bool { + name = agent.CanonicalName(name) + wp.agentCooldownsMu.RLock() + expiry, ok := wp.agentCooldowns[name] + if !ok { + wp.agentCooldownsMu.RUnlock() + return false + } + if time.Now().After(expiry) { + wp.agentCooldownsMu.RUnlock() + if wp.testHookCooldownLockUpgrade != nil { + wp.testHookCooldownLockUpgrade() + } + // Upgrade to write lock and delete expired entry + wp.agentCooldownsMu.Lock() + // Re-check under write lock (may have been refreshed) + exp, stillExists := wp.agentCooldowns[name] + if stillExists && time.Now().After(exp) { + delete(wp.agentCooldowns, name) + wp.agentCooldownsMu.Unlock() + return false + } + wp.agentCooldownsMu.Unlock() + return stillExists + } + wp.agentCooldownsMu.RUnlock() + return true +} + +// failoverOrFail attempts failover to a backup agent for the job. +// If no backup is available, fails the job with a quota-prefixed error. +func (wp *WorkerPool) failoverOrFail( + workerID string, job *storage.ReviewJob, + agentName, errorMsg string, +) { + backupAgent := wp.resolveBackupAgent(job) + if backupAgent != "" && !wp.isAgentCoolingDown(backupAgent) { + failedOver, err := wp.db.FailoverJob( + job.ID, workerID, backupAgent, + ) + if err != nil { + log.Printf("[%s] Error attempting failover for job %d: %v", + workerID, job.ID, err) + } + if failedOver { + log.Printf("[%s] Job %d failing over from %s to %s (quota): %s", + workerID, job.ID, agentName, backupAgent, errorMsg) + return + } + } + + // No backup or failover failed — fail with quota prefix + quotaMsg := quotaErrorPrefix + errorMsg + if updated, err := wp.db.FailJob(job.ID, workerID, quotaMsg); err != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, err) + } else if updated { + log.Printf("[%s] Job %d skipped (agent %s quota exhausted)", + workerID, job.ID, agentName) + wp.broadcastFailed(job, agentName, quotaMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", + fmt.Sprintf("job %d skipped (quota): %s", job.ID, errorMsg), + job.ID) + } + } +} + // markCompactSourceJobs marks all source jobs as addressed for a completed compact job func (wp *WorkerPool) markCompactSourceJobs(workerID string, jobID int64) error { // Read metadata file, retrying briefly in case the CLI hasn't finished diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index dd76c521..562fe70e 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -1,6 +1,7 @@ package daemon import ( + "strings" "sync/atomic" "testing" "time" @@ -283,6 +284,565 @@ func TestWorkerPoolCancelJobFinalCheckDeadlockSafe(t *testing.T) { } } +func TestIsQuotaError(t *testing.T) { + tests := []struct { + errMsg string + want bool + }{ + // Hard quota exhaustion — should trigger cooldown/skip + {"quota exceeded for model", true}, + {"QUOTA_EXCEEDED: limit reached", true}, + {"quota exhausted, reset after 8h", true}, + {"QUOTA_EXHAUSTED: try later", true}, + {"insufficient_quota: limit reached", true}, + {"You have exhausted your capacity", true}, + {"RESOURCE EXHAUSTED: try later", true}, + // Transient rate limits — should NOT trigger cooldown (use retries) + {"Rate limit reached", false}, + {"rate_limit_error: too fast", false}, + {"Too Many Requests (429)", false}, + {"HTTP 429: slow down", false}, + {"status 429 received from API", false}, + // Other non-quota errors + {"connection reset by peer", false}, + {"timeout after 30s", false}, + {"agent not found", false}, + {"disk quota full", false}, + {"", false}, + } + for _, tt := range tests { + t.Run(tt.errMsg, func(t *testing.T) { + if got := isQuotaError(tt.errMsg); got != tt.want { + t.Errorf("isQuotaError(%q) = %v, want %v", + tt.errMsg, got, tt.want) + } + }) + } +} + +func TestParseQuotaCooldown(t *testing.T) { + tests := []struct { + name string + errMsg string + fallback time.Duration + want time.Duration + }{ + { + name: "extracts go duration", + errMsg: "quota exhausted, reset after 8h26m13s please wait", + fallback: 30 * time.Minute, + want: 8*time.Hour + 26*time.Minute + 13*time.Second, + }, + { + name: "no reset token falls back", + errMsg: "quota exceeded for model gemini-2.5-pro", + fallback: 30 * time.Minute, + want: 30 * time.Minute, + }, + { + name: "unparseable duration falls back", + errMsg: "reset after bogus", + fallback: 15 * time.Minute, + want: 15 * time.Minute, + }, + { + name: "duration at end of string", + errMsg: "reset after 2h30m", + fallback: 30 * time.Minute, + want: 2*time.Hour + 30*time.Minute, + }, + { + name: "trailing punctuation trimmed", + errMsg: "reset after 8h26m13s.", + fallback: 30 * time.Minute, + want: 8*time.Hour + 26*time.Minute + 13*time.Second, + }, + { + name: "trailing paren trimmed", + errMsg: "reset after 1h30m)", + fallback: 30 * time.Minute, + want: 1*time.Hour + 30*time.Minute, + }, + { + name: "clamped to max 24h", + errMsg: "reset after 99999h", + fallback: 30 * time.Minute, + want: 24 * time.Hour, + }, + { + name: "clamped to min 1m", + errMsg: "reset after 5s", + fallback: 30 * time.Minute, + want: 1 * time.Minute, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseQuotaCooldown(tt.errMsg, tt.fallback) + if got != tt.want { + t.Errorf("parseQuotaCooldown() = %v, want %v", + got, tt.want) + } + }) + } +} + +func TestAgentCooldown(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool(nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil) + + // Not cooling down initially + if pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini not in cooldown initially") + } + + // Set cooldown + pool.cooldownAgent("gemini", time.Now().Add(1*time.Hour)) + if !pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini in cooldown after set") + } + + // Different agent not affected + if pool.isAgentCoolingDown("codex") { + t.Error("expected codex not in cooldown") + } + + // Expired cooldown returns false + pool.cooldownAgent("codex", time.Now().Add(-1*time.Second)) + if pool.isAgentCoolingDown("codex") { + t.Error("expected expired cooldown to return false") + } + + // cooldownAgent never shortens + pool.cooldownAgent("gemini", time.Now().Add(1*time.Minute)) + if !pool.isAgentCoolingDown("gemini") { + t.Error("cooldown should not have been shortened") + } +} + +func TestAgentCooldown_ExpiredEntryDeleted(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + + // Set an already-expired cooldown + pool.cooldownAgent("gemini", time.Now().Add(-1*time.Second)) + + // Should return false and clean up the entry + if pool.isAgentCoolingDown("gemini") { + t.Error("expected expired cooldown to return false") + } + + // Entry should be deleted from the map + pool.agentCooldownsMu.RLock() + _, exists := pool.agentCooldowns["gemini"] + pool.agentCooldownsMu.RUnlock() + if exists { + t.Error("expected expired entry to be deleted from map") + } +} + +func TestAgentCooldown_RefreshDuringUpgrade(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + + // Set an already-expired cooldown so RLock path enters upgrade + pool.cooldownAgent("gemini", time.Now().Add(-1*time.Second)) + + // Use the test hook to refresh the cooldown in the window + // between RUnlock and Lock, simulating a concurrent goroutine. + pool.testHookCooldownLockUpgrade = func() { + pool.agentCooldownsMu.Lock() + pool.agentCooldowns["gemini"] = time.Now().Add(1 * time.Hour) + pool.agentCooldownsMu.Unlock() + } + + // The read-lock path sees expired, upgrades, recheck sees + // refreshed entry — should return true. + if !pool.isAgentCoolingDown("gemini") { + t.Error("expected refreshed cooldown to return true") + } +} + +func TestProcessJob_CooldownResolvesAlias(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Enqueue a job with the alias "claude" (canonical: "claude-code") + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "claude", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + + // Cool down "claude-code" (canonical name) + tc.Pool.cooldownAgent( + "claude-code", time.Now().Add(1*time.Hour), + ) + + // processJob should detect cooldown via alias resolution + tc.Pool.processJob("test-worker", claimed) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf( + "status=%q, want failed (cooldown via alias)", + updated.Status, + ) + } +} + +func TestResolveBackupAgent_AliasMatchesPrimary(t *testing.T) { + // "claude" is an alias for "claude-code". If job.Agent is "claude" + // and backup resolves to "claude-code", they are the same agent. + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "claude-code" + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + job := &storage.ReviewJob{ + Agent: "claude", + RepoPath: t.TempDir(), + } + got := pool.resolveBackupAgent(job) + // Should return "" because claude == claude-code after alias + // resolution. (May also return "" if claude-code binary is not + // installed, which is fine — both reasons are correct.) + if got != "" { + t.Errorf( + "resolveBackupAgent() = %q, want empty (alias match)", + got, + ) + } +} + +func TestFailOrRetryInner_QuotaSkipsRetries(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // Subscribe to events to verify broadcast + _, eventCh := tc.Broadcaster.Subscribe("") + + quotaErr := "resource exhausted: reset after 1h" + tc.Pool.failOrRetryInner("test-worker", job, "gemini", quotaErr, true) + + // Job should be failed (not retried) with quota prefix + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } + + // Retry count should be 0 — no retries attempted + retryCount, err := tc.DB.GetJobRetryCount(job.ID) + if err != nil { + t.Fatalf("GetJobRetryCount: %v", err) + } + if retryCount != 0 { + t.Errorf("retry_count=%d, want 0 (quota should skip retries)", retryCount) + } + + // Agent should be in cooldown + if !tc.Pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini in cooldown after quota error") + } + + // Broadcast should have fired + select { + case ev := <-eventCh: + if ev.Type != "review.failed" { + t.Errorf("event type=%q, want review.failed", ev.Type) + } + if !strings.HasPrefix(ev.Error, quotaErrorPrefix) { + t.Errorf("event error=%q, want prefix %q", ev.Error, quotaErrorPrefix) + } + case <-time.After(time.Second): + t.Error("no broadcast event received") + } +} + +func TestFailOrRetryInner_QuotaExhaustedVariant(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // "quota exhausted" (not "quota exceeded") must also trigger quota-skip + tc.Pool.failOrRetryInner("test-worker", job, "gemini", "quota exhausted, reset after 2h", true) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } + retryCount, _ := tc.DB.GetJobRetryCount(job.ID) + if retryCount != 0 { + t.Errorf("retry_count=%d, want 0", retryCount) + } +} + +func TestFailOrRetryInner_NonQuotaStillRetries(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // A non-quota agent error should follow the normal retry path + tc.Pool.failOrRetryInner("test-worker", job, "gemini", "connection reset", true) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for retry, not failed + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (retry)", updated.Status) + } + + retryCount, err := tc.DB.GetJobRetryCount(job.ID) + if err != nil { + t.Fatalf("GetJobRetryCount: %v", err) + } + if retryCount != 1 { + t.Errorf("retry_count=%d, want 1", retryCount) + } + + // Agent should NOT be in cooldown + if tc.Pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini NOT in cooldown for non-quota error") + } +} + +func TestFailoverOrFail_FailsOverToBackup(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool(tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil) + + // Enqueue with agent "codex" (backup is "test") + commit, err := tc.DB.GetOrCreateCommit(tc.Repo.ID, sha, "A", "S", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + // Fill in RepoPath so resolveBackupAgent can work + job.RepoPath = tc.TmpDir + + tc.Pool.failoverOrFail("test-worker", job, "codex", "quota exhausted") + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for failover, agent changed to "test" + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (failover)", updated.Status) + } + if updated.Agent != "test" { + t.Errorf("agent=%q, want test (failover)", updated.Agent) + } +} + +func TestFailoverOrFail_NoBackupFailsWithQuotaPrefix(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // No backup configured — should fail with quota prefix + tc.Pool.failoverOrFail("test-worker", job, "test", "quota exhausted") + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } +} + +func TestFailOrRetryInner_RetryExhaustedBackupInCooldown(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool( + tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil, + ) + + // Enqueue with agent "codex" + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + job.RepoPath = tc.TmpDir + + // Exhaust retries + for i := range maxRetries { + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + reclaimed, claimErr := tc.DB.ClaimJob("test-worker") + if claimErr != nil || reclaimed == nil { + t.Fatalf("re-claim after retry %d: %v", i, claimErr) + } + job = reclaimed + } + + // Put the backup agent in cooldown + tc.Pool.cooldownAgent( + "test", time.Now().Add(30*time.Minute), + ) + + // Final failure — retries exhausted, backup in cooldown + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be failed, NOT queued for failover to cooled-down agent + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + // Agent should still be codex (not failed over) + if updated.Agent != "codex" { + t.Errorf("agent=%q, want codex (no failover)", updated.Agent) + } +} + +func TestFailOrRetryInner_RetryExhaustedFailsOverToBackup(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool( + tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil, + ) + + // Enqueue with agent "codex" + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + job.RepoPath = tc.TmpDir + + // Exhaust retries + for i := range maxRetries { + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + reclaimed, claimErr := tc.DB.ClaimJob("test-worker") + if claimErr != nil || reclaimed == nil { + t.Fatalf("re-claim after retry %d: %v", i, claimErr) + } + job = reclaimed + } + + // Final failure — retries exhausted, backup available + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for failover, agent changed to "test" + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (failover)", updated.Status) + } + if updated.Agent != "test" { + t.Errorf("agent=%q, want test (failover)", updated.Agent) + } +} + func TestResolveBackupAgent(t *testing.T) { tests := []struct { name string