From e6ba1fa48cd77b91e5676d2adfe8550c08de6aa1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 08:14:48 -0600 Subject: [PATCH 01/11] Add quota-aware agent cooldown with graceful skip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Detect quota/rate-limit errors from agents (e.g., Gemini free tier), skip retries, cool down the agent for the duration reported in the error (or 30min default), and attempt failover to backup agent. Jobs that fail due to quota get a "quota: " error prefix. The CI poller uses this prefix to distinguish quota skips from real failures: quota-only batches get "success" commit status instead of "error", and PR comments show "skipped (quota)" instead of "failed". No schema changes — quota skips reuse the existing "failed" status with a convention-based error prefix. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/ci_poller.go | 122 +++++++++++++++++++++++--- internal/daemon/ci_poller_test.go | 127 +++++++++++++++++++++++++++ internal/daemon/worker.go | 139 ++++++++++++++++++++++++++++++ internal/daemon/worker_test.go | 102 ++++++++++++++++++++++ 4 files changed, 477 insertions(+), 13 deletions(-) diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index d2f981e8..025bf767 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -853,20 +853,34 @@ func (p *CIPoller) postBatchResults(batch *storage.CIPRBatch) { } // Set commit status based on job outcomes: - // all succeeded → success - // mixed → failure (some reviews did not complete) - // all failed → error + // all succeeded → success + // all failures are quota skips → success (with note) + // mixed real failures → failure + // all failed (real) → error + quotaSkips := countQuotaFailures(reviews) + realFailures := batch.FailedJobs - quotaSkips statusState := "success" statusDesc := "Review complete" switch { + case batch.CompletedJobs == 0 && realFailures == 0 && quotaSkips > 0: + // All failures are quota skips — not the code's fault + statusDesc = fmt.Sprintf( + "Review complete (%d agent(s) skipped — quota)", + quotaSkips, + ) case batch.CompletedJobs == 0: statusState = "error" statusDesc = "All reviews failed" - case batch.FailedJobs > 0: + case realFailures > 0: statusState = "failure" statusDesc = fmt.Sprintf( "Review complete (%d/%d jobs failed)", - batch.FailedJobs, batch.TotalJobs, + realFailures, batch.TotalJobs, + ) + case quotaSkips > 0: + statusDesc = fmt.Sprintf( + "Review complete (%d agent(s) skipped — quota)", + quotaSkips, ) } if err := p.callSetCommitStatus(batch.GithubRepo, batch.HeadSHA, statusState, statusDesc); err != nil { @@ -1120,11 +1134,15 @@ Rules: for i, r := range reviews { fmt.Fprintf(&b, "---\n### Review %d: Agent=%s, Type=%s", i+1, r.Agent, r.ReviewType) - if r.Status == "failed" { + if isQuotaFailure(r) { + b.WriteString(" [SKIPPED]") + } else if r.Status == "failed" { b.WriteString(" [FAILED]") } b.WriteString("\n") - if r.Output != "" { + if isQuotaFailure(r) { + b.WriteString("(review skipped — agent quota exhausted)") + } else if r.Output != "" { output := r.Output if len(output) > maxPerReview { output = output[:maxPerReview] + "\n\n...(truncated)" @@ -1167,6 +1185,10 @@ func formatSynthesizedComment(output string, reviews []storage.BatchReviewResult fmt.Fprintf(&b, "\n\n---\n*Synthesized from %d reviews (agents: %s | types: %s)*\n", len(reviews), strings.Join(agents, ", "), strings.Join(types, ", ")) + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } + return b.String() } @@ -1178,9 +1200,15 @@ func formatRawBatchComment(reviews []storage.BatchReviewResult, headSHA string) b.WriteString("> Synthesis unavailable. Showing raw review outputs.\n\n") for _, r := range reviews { - summary := fmt.Sprintf("Agent: %s | Type: %s | Status: %s", r.Agent, r.ReviewType, r.Status) + status := r.Status + if isQuotaFailure(r) { + status = "skipped (quota)" + } + summary := fmt.Sprintf("Agent: %s | Type: %s | Status: %s", r.Agent, r.ReviewType, status) fmt.Fprintf(&b, "
\n%s\n\n", summary) - if r.Status == "failed" { + if isQuotaFailure(r) { + b.WriteString("Review skipped — agent quota exhausted.\n") + } else if r.Status == "failed" { b.WriteString("**Error:** Review failed. Check daemon logs for details.\n") } else if r.Output != "" { output := r.Output @@ -1195,24 +1223,92 @@ func formatRawBatchComment(reviews []storage.BatchReviewResult, headSHA string) b.WriteString("\n\n
\n\n") } + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } + return b.String() } // formatAllFailedComment formats a comment when every job in a batch failed. func formatAllFailedComment(reviews []storage.BatchReviewResult, headSHA string) string { + quotaSkips := countQuotaFailures(reviews) + allQuota := quotaSkips == len(reviews) + var b strings.Builder - fmt.Fprintf(&b, "## roborev: Review Failed (`%s`)\n\n", shortSHA(headSHA)) - b.WriteString("All review jobs in this batch failed.\n\n") + if allQuota { + fmt.Fprintf(&b, "## roborev: Review Skipped (`%s`)\n\n", shortSHA(headSHA)) + b.WriteString("All review agents were skipped due to quota exhaustion.\n\n") + } else { + fmt.Fprintf(&b, "## roborev: Review Failed (`%s`)\n\n", shortSHA(headSHA)) + b.WriteString("All review jobs in this batch failed.\n\n") + } for _, r := range reviews { - fmt.Fprintf(&b, "- **%s** (%s): failed\n", r.Agent, r.ReviewType) + if isQuotaFailure(r) { + fmt.Fprintf(&b, "- **%s** (%s): skipped (quota)\n", r.Agent, r.ReviewType) + } else { + fmt.Fprintf(&b, "- **%s** (%s): failed\n", r.Agent, r.ReviewType) + } } - b.WriteString("\nCheck daemon logs for error details.") + if !allQuota { + b.WriteString("\nCheck daemon logs for error details.") + } + + if note := skippedAgentNote(reviews); note != "" { + b.WriteString(note) + } return b.String() } +// isQuotaFailure returns true if a batch review's error indicates a quota +// skip rather than a real failure. Matches the prefix set by worker.go. +func isQuotaFailure(r storage.BatchReviewResult) bool { + return r.Status == "failed" && strings.HasPrefix(r.Error, quotaErrorPrefix) +} + +// countQuotaFailures returns the number of reviews that failed due to +// agent quota exhaustion rather than a real error. +func countQuotaFailures(reviews []storage.BatchReviewResult) int { + n := 0 + for _, r := range reviews { + if isQuotaFailure(r) { + n++ + } + } + return n +} + +// skippedAgentNote returns a markdown note listing agents that were +// skipped due to quota exhaustion. Returns "" if none were skipped. +func skippedAgentNote(reviews []storage.BatchReviewResult) string { + agents := make(map[string]struct{}) + for _, r := range reviews { + if isQuotaFailure(r) { + agents[r.Agent] = struct{}{} + } + } + if len(agents) == 0 { + return "" + } + names := make([]string, 0, len(agents)) + for a := range agents { + names = append(names, a) + } + if len(names) == 1 { + return fmt.Sprintf( + "\n*Note: %s review skipped (agent quota exhausted)*\n", + names[0], + ) + } + return fmt.Sprintf( + "\n*Note: %s reviews skipped (agent quota exhausted)*\n", + strings.Join(names, ", "), + ) +} + // shortSHA returns the first 8 characters of a SHA, or the full string if shorter. func shortSHA(sha string) string { if len(sha) > 8 { diff --git a/internal/daemon/ci_poller_test.go b/internal/daemon/ci_poller_test.go index 99de2110..8b5e95fb 100644 --- a/internal/daemon/ci_poller_test.go +++ b/internal/daemon/ci_poller_test.go @@ -1550,6 +1550,133 @@ func TestCIPollerPostBatchResults_SetsFailureStatusOnMixedOutcome(t *testing.T) } } +func TestCIPollerPostBatchResults_QuotaSkippedNotFailure(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + // One success, one quota-skipped + batch, jobs := h.seedBatchWithJobs(t, 70, "quota-sha", + jobSpec{Agent: "codex", ReviewType: "security", Status: "done", Output: "No issues found."}, + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "gemini quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + // Simulate: job1 succeeded, job2 failed (quota) + h.Poller.handleBatchJobDone(batch, jobs[0].ID, true) + h.Poller.handleBatchJobDone(batch, jobs[1].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + // Quota skip with at least one success → success, not failure + if sc.State != "success" { + t.Errorf("state=%q, want success (quota skip not a failure)", sc.State) + } + if !strings.Contains(sc.Desc, "skipped") { + t.Errorf("desc=%q, expected mention of skipped", sc.Desc) + } +} + +func TestCIPollerPostBatchResults_AllQuotaSkippedIsSuccess(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + batch, jobs := h.seedBatchWithJobs(t, 71, "all-quota-sha", + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "gemini quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + h.Poller.handleBatchJobDone(batch, jobs[0].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + if sc.State != "success" { + t.Errorf("state=%q, want success (all-quota batch is not an error)", sc.State) + } + if !strings.Contains(sc.Desc, "skipped") { + t.Errorf("desc=%q, expected mention of skipped", sc.Desc) + } +} + +func TestCIPollerPostBatchResults_MixedQuotaAndRealFailure(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + + batch, jobs := h.seedBatchWithJobs(t, 72, "mixed-quota-sha", + jobSpec{Agent: "codex", ReviewType: "security", Status: "failed", Error: "timeout"}, + jobSpec{Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + ) + + capturedStatuses := h.CaptureCommitStatuses() + h.CaptureComments() + + h.Poller.handleBatchJobDone(batch, jobs[0].ID, false) + h.Poller.handleBatchJobDone(batch, jobs[1].ID, false) + + if len(*capturedStatuses) != 1 { + t.Fatalf("expected 1 status call, got %d", len(*capturedStatuses)) + } + sc := (*capturedStatuses)[0] + // Real failure + quota skip → error (CompletedJobs == 0 and realFailures > 0) + if sc.State != "error" { + t.Errorf("state=%q, want error (real failure present)", sc.State) + } +} + +func TestFormatAllFailedComment_AllQuotaSkipped(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + comment := formatAllFailedComment(reviews, "abc123def456") + + assertContainsAll(t, comment, "comment", + "## roborev: Review Skipped", + "quota exhaustion", + "skipped (quota)", + ) + if strings.Contains(comment, "Check daemon logs") { + t.Error("all-quota comment should not mention daemon logs") + } +} + +func TestFormatRawBatchComment_QuotaSkippedNote(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "codex", ReviewType: "security", Output: "Finding A", Status: "done"}, + {JobID: 2, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + comment := formatRawBatchComment(reviews, "abc123def456") + + assertContainsAll(t, comment, "comment", + "skipped (quota)", + "gemini review skipped", + ) +} + +func TestBuildSynthesisPrompt_QuotaSkippedLabel(t *testing.T) { + reviews := []storage.BatchReviewResult{ + {JobID: 1, Agent: "codex", ReviewType: "security", Output: "No issues.", Status: "done"}, + {JobID: 2, Agent: "gemini", ReviewType: "security", Status: "failed", Error: quotaErrorPrefix + "quota exhausted"}, + } + + prompt := buildSynthesisPrompt(reviews, "") + + assertContainsAll(t, prompt, "prompt", + "[SKIPPED]", + "review skipped", + ) + // Should NOT contain [FAILED] for the quota-skipped review + // Count occurrences of [FAILED] + if strings.Contains(prompt, "[FAILED]") { + t.Error("quota-skipped review should use [SKIPPED], not [FAILED]") + } +} + func TestCIPollerPostBatchResults_SetsErrorStatusOnCommentPostFailure(t *testing.T) { h := newCIPollerHarness(t, "git@github.com:acme/api.git") batch, _ := h.seedBatchWithJobs(t, 63, "post-fail-sha", jobSpec{ diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 996dd819..2813524d 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -33,6 +33,10 @@ type WorkerPool struct { pendingCancels map[int64]bool // Jobs canceled before registered runningJobsMu sync.Mutex + // Agent cooldowns for quota exhaustion + agentCooldowns map[string]time.Time // agent name -> expiry + agentCooldownsMu sync.RWMutex + // Output capture for tail command outputBuffers *OutputBuffer @@ -52,6 +56,7 @@ func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broad stopCh: make(chan struct{}), runningJobs: make(map[int64]context.CancelFunc), pendingCancels: make(map[int64]bool), + agentCooldowns: make(map[string]time.Time), outputBuffers: NewOutputBuffer(512*1024, 4*1024*1024), // 512KB/job, 4MB total } } @@ -280,6 +285,15 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { wp.registerRunningJob(job.ID, cancel) defer wp.unregisterRunningJob(job.ID) + // Skip immediately if the agent is in quota cooldown + if wp.isAgentCoolingDown(job.Agent) { + log.Printf("[%s] Agent %s in cooldown, skipping job %d", + workerID, job.Agent, job.ID) + wp.failoverOrFail(workerID, job, job.Agent, + fmt.Sprintf("agent %s quota cooldown active", job.Agent)) + return + } + // Build the prompt (or use pre-stored prompt for task/compact jobs) var reviewPrompt string var err error @@ -447,6 +461,17 @@ func (wp *WorkerPool) failOrRetryAgent(workerID string, job *storage.ReviewJob, } func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, agentName string, errorMsg string, agentError bool) { + // Quota errors skip retries entirely — cool down the agent and + // attempt failover or fail with a quota-prefixed error. + if agentError && isQuotaError(errorMsg) { + dur := parseQuotaCooldown(errorMsg, defaultCooldown) + wp.cooldownAgent(agentName, time.Now().Add(dur)) + log.Printf("[%s] Agent %s quota exhausted, cooldown %v", + workerID, agentName, dur) + wp.failoverOrFail(workerID, job, agentName, errorMsg) + return + } + retried, err := wp.db.RetryJob(job.ID, workerID, maxRetries) if err != nil { log.Printf("[%s] Error retrying job: %v", workerID, err) @@ -534,6 +559,120 @@ func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMs }) } +// quotaErrorPrefix is prepended to error messages when a job is failed +// due to agent quota exhaustion. CI poller checks for this prefix to +// distinguish quota failures from real failures. +const quotaErrorPrefix = "quota: " + +// defaultCooldown is the fallback duration when the error message doesn't +// contain a parseable "reset after" token. +const defaultCooldown = 30 * time.Minute + +// isQuotaError returns true if the error message indicates a quota or +// rate-limit exhaustion (case-insensitive). +func isQuotaError(errMsg string) bool { + lower := strings.ToLower(errMsg) + patterns := []string{ + "quota", + "rate limit", + "rate_limit", + "exhausted your capacity", + "too many requests", + } + for _, p := range patterns { + if strings.Contains(lower, p) { + return true + } + } + return false +} + +// parseQuotaCooldown extracts a Go-format duration from a "reset after +// " substring. Returns fallback if not found or unparseable. +func parseQuotaCooldown(errMsg string, fallback time.Duration) time.Duration { + lower := strings.ToLower(errMsg) + idx := strings.Index(lower, "reset after ") + if idx < 0 { + return fallback + } + rest := errMsg[idx+len("reset after "):] + // Take the next whitespace-delimited token as a duration + token := rest + if sp := strings.IndexAny(rest, " \t\n,;)"); sp > 0 { + token = rest[:sp] + } + d, err := time.ParseDuration(token) + if err != nil || d <= 0 { + return fallback + } + return d +} + +// cooldownAgent sets or extends the cooldown expiry for an agent. +// Only extends — never shortens an existing cooldown. +func (wp *WorkerPool) cooldownAgent(name string, until time.Time) { + wp.agentCooldownsMu.Lock() + defer wp.agentCooldownsMu.Unlock() + if existing, ok := wp.agentCooldowns[name]; ok && existing.After(until) { + return + } + wp.agentCooldowns[name] = until +} + +// isAgentCoolingDown returns true if the agent is currently in a +// quota cooldown period. +func (wp *WorkerPool) isAgentCoolingDown(name string) bool { + wp.agentCooldownsMu.RLock() + defer wp.agentCooldownsMu.RUnlock() + expiry, ok := wp.agentCooldowns[name] + if !ok { + return false + } + if time.Now().After(expiry) { + // Expired — clean up lazily on next write + return false + } + return true +} + +// failoverOrFail attempts failover to a backup agent for the job. +// If no backup is available, fails the job with a quota-prefixed error. +func (wp *WorkerPool) failoverOrFail( + workerID string, job *storage.ReviewJob, + agentName, errorMsg string, +) { + backupAgent := wp.resolveBackupAgent(job) + if backupAgent != "" && !wp.isAgentCoolingDown(backupAgent) { + failedOver, err := wp.db.FailoverJob( + job.ID, workerID, backupAgent, + ) + if err != nil { + log.Printf("[%s] Error attempting failover for job %d: %v", + workerID, job.ID, err) + } + if failedOver { + log.Printf("[%s] Job %d failing over from %s to %s (quota): %s", + workerID, job.ID, agentName, backupAgent, errorMsg) + return + } + } + + // No backup or failover failed — fail with quota prefix + quotaMsg := quotaErrorPrefix + errorMsg + if updated, err := wp.db.FailJob(job.ID, workerID, quotaMsg); err != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, err) + } else if updated { + log.Printf("[%s] Job %d skipped (agent %s quota exhausted)", + workerID, job.ID, agentName) + wp.broadcastFailed(job, agentName, quotaMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", + fmt.Sprintf("job %d skipped (quota): %s", job.ID, errorMsg), + job.ID) + } + } +} + // markCompactSourceJobs marks all source jobs as addressed for a completed compact job func (wp *WorkerPool) markCompactSourceJobs(workerID string, jobID int64) error { // Read metadata file, retrying briefly in case the CLI hasn't finished diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index dd76c521..128a8714 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -283,6 +283,108 @@ func TestWorkerPoolCancelJobFinalCheckDeadlockSafe(t *testing.T) { } } +func TestIsQuotaError(t *testing.T) { + tests := []struct { + errMsg string + want bool + }{ + {"quota exceeded for model", true}, + {"Rate limit reached", true}, + {"rate_limit_error: too fast", true}, + {"You have exhausted your capacity", true}, + {"Too Many Requests (429)", true}, + {"QUOTA exhausted", true}, + {"connection reset by peer", false}, + {"timeout after 30s", false}, + {"agent not found", false}, + {"", false}, + } + for _, tt := range tests { + t.Run(tt.errMsg, func(t *testing.T) { + if got := isQuotaError(tt.errMsg); got != tt.want { + t.Errorf("isQuotaError(%q) = %v, want %v", + tt.errMsg, got, tt.want) + } + }) + } +} + +func TestParseQuotaCooldown(t *testing.T) { + tests := []struct { + name string + errMsg string + fallback time.Duration + want time.Duration + }{ + { + name: "extracts go duration", + errMsg: "quota exhausted, reset after 8h26m13s please wait", + fallback: 30 * time.Minute, + want: 8*time.Hour + 26*time.Minute + 13*time.Second, + }, + { + name: "no reset token falls back", + errMsg: "quota exceeded for model gemini-2.5-pro", + fallback: 30 * time.Minute, + want: 30 * time.Minute, + }, + { + name: "unparseable duration falls back", + errMsg: "reset after bogus", + fallback: 15 * time.Minute, + want: 15 * time.Minute, + }, + { + name: "duration at end of string", + errMsg: "reset after 2h30m", + fallback: 30 * time.Minute, + want: 2*time.Hour + 30*time.Minute, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseQuotaCooldown(tt.errMsg, tt.fallback) + if got != tt.want { + t.Errorf("parseQuotaCooldown() = %v, want %v", + got, tt.want) + } + }) + } +} + +func TestAgentCooldown(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool(nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil) + + // Not cooling down initially + if pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini not in cooldown initially") + } + + // Set cooldown + pool.cooldownAgent("gemini", time.Now().Add(1*time.Hour)) + if !pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini in cooldown after set") + } + + // Different agent not affected + if pool.isAgentCoolingDown("codex") { + t.Error("expected codex not in cooldown") + } + + // Expired cooldown returns false + pool.cooldownAgent("codex", time.Now().Add(-1*time.Second)) + if pool.isAgentCoolingDown("codex") { + t.Error("expected expired cooldown to return false") + } + + // cooldownAgent never shortens + pool.cooldownAgent("gemini", time.Now().Add(1*time.Minute)) + if !pool.isAgentCoolingDown("gemini") { + t.Error("cooldown should not have been shortened") + } +} + func TestResolveBackupAgent(t *testing.T) { tests := []struct { name string From e9264390845cca37fb4515122f99732f9e281d20 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 08:25:51 -0600 Subject: [PATCH 02/11] Address review findings: tighten quota detection, sort agents, add tests - Tighten isQuotaError patterns to avoid false positives (e.g., disk quota): use "resource exhausted", "quota exceeded", "429" instead of bare "quota" - Trim trailing punctuation in parseQuotaCooldown so "8h26m13s." parses correctly instead of falling back to 30m default - Sort agent names in skippedAgentNote for deterministic comment output - Add integration tests for failOrRetryInner (quota skips retries, non-quota still retries) and failoverOrFail (backup agent failover, no-backup quota prefix) Co-Authored-By: Claude Opus 4.6 --- internal/daemon/ci_poller.go | 2 + internal/daemon/worker.go | 11 ++- internal/daemon/worker_test.go | 165 ++++++++++++++++++++++++++++++++- 3 files changed, 174 insertions(+), 4 deletions(-) diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index 025bf767..8467de0c 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -10,6 +10,7 @@ import ( "log" "os" "os/exec" + "sort" "strings" "sync" "time" @@ -1297,6 +1298,7 @@ func skippedAgentNote(reviews []storage.BatchReviewResult) string { for a := range agents { names = append(names, a) } + sort.Strings(names) if len(names) == 1 { return fmt.Sprintf( "\n*Note: %s review skipped (agent quota exhausted)*\n", diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 2813524d..cb6eac90 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -568,16 +568,20 @@ const quotaErrorPrefix = "quota: " // contain a parseable "reset after" token. const defaultCooldown = 30 * time.Minute -// isQuotaError returns true if the error message indicates a quota or -// rate-limit exhaustion (case-insensitive). +// isQuotaError returns true if the error message indicates an API quota +// or rate-limit exhaustion (case-insensitive). Patterns are scoped to +// provider API errors to avoid false positives (e.g., disk quota). func isQuotaError(errMsg string) bool { lower := strings.ToLower(errMsg) patterns := []string{ - "quota", + "resource exhausted", "rate limit", "rate_limit", + "quota exceeded", + "quota_exceeded", "exhausted your capacity", "too many requests", + "429", } for _, p := range patterns { if strings.Contains(lower, p) { @@ -601,6 +605,7 @@ func parseQuotaCooldown(errMsg string, fallback time.Duration) time.Duration { if sp := strings.IndexAny(rest, " \t\n,;)"); sp > 0 { token = rest[:sp] } + token = strings.TrimRight(token, ".,;:)]}") d, err := time.ParseDuration(token) if err != nil || d <= 0 { return fallback diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 128a8714..97cff7b5 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -1,6 +1,7 @@ package daemon import ( + "strings" "sync/atomic" "testing" "time" @@ -289,14 +290,17 @@ func TestIsQuotaError(t *testing.T) { want bool }{ {"quota exceeded for model", true}, + {"QUOTA_EXCEEDED: limit reached", true}, {"Rate limit reached", true}, {"rate_limit_error: too fast", true}, {"You have exhausted your capacity", true}, {"Too Many Requests (429)", true}, - {"QUOTA exhausted", true}, + {"HTTP 429: slow down", true}, + {"RESOURCE EXHAUSTED: try later", true}, {"connection reset by peer", false}, {"timeout after 30s", false}, {"agent not found", false}, + {"disk quota full", false}, {"", false}, } for _, tt := range tests { @@ -340,6 +344,18 @@ func TestParseQuotaCooldown(t *testing.T) { fallback: 30 * time.Minute, want: 2*time.Hour + 30*time.Minute, }, + { + name: "trailing punctuation trimmed", + errMsg: "reset after 8h26m13s.", + fallback: 30 * time.Minute, + want: 8*time.Hour + 26*time.Minute + 13*time.Second, + }, + { + name: "trailing paren trimmed", + errMsg: "reset after 1h30m)", + fallback: 30 * time.Minute, + want: 1*time.Hour + 30*time.Minute, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -385,6 +401,153 @@ func TestAgentCooldown(t *testing.T) { } } +func TestFailOrRetryInner_QuotaSkipsRetries(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // Subscribe to events to verify broadcast + _, eventCh := tc.Broadcaster.Subscribe("") + + quotaErr := "resource exhausted: reset after 1h" + tc.Pool.failOrRetryInner("test-worker", job, "gemini", quotaErr, true) + + // Job should be failed (not retried) with quota prefix + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } + + // Retry count should be 0 — no retries attempted + retryCount, err := tc.DB.GetJobRetryCount(job.ID) + if err != nil { + t.Fatalf("GetJobRetryCount: %v", err) + } + if retryCount != 0 { + t.Errorf("retry_count=%d, want 0 (quota should skip retries)", retryCount) + } + + // Agent should be in cooldown + if !tc.Pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini in cooldown after quota error") + } + + // Broadcast should have fired + select { + case ev := <-eventCh: + if ev.Type != "review.failed" { + t.Errorf("event type=%q, want review.failed", ev.Type) + } + if !strings.HasPrefix(ev.Error, quotaErrorPrefix) { + t.Errorf("event error=%q, want prefix %q", ev.Error, quotaErrorPrefix) + } + case <-time.After(time.Second): + t.Error("no broadcast event received") + } +} + +func TestFailOrRetryInner_NonQuotaStillRetries(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // A non-quota agent error should follow the normal retry path + tc.Pool.failOrRetryInner("test-worker", job, "gemini", "connection reset", true) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for retry, not failed + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (retry)", updated.Status) + } + + retryCount, err := tc.DB.GetJobRetryCount(job.ID) + if err != nil { + t.Fatalf("GetJobRetryCount: %v", err) + } + if retryCount != 1 { + t.Errorf("retry_count=%d, want 1", retryCount) + } + + // Agent should NOT be in cooldown + if tc.Pool.isAgentCoolingDown("gemini") { + t.Error("expected gemini NOT in cooldown for non-quota error") + } +} + +func TestFailoverOrFail_FailsOverToBackup(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool(tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil) + + // Enqueue with agent "codex" (backup is "test") + commit, err := tc.DB.GetOrCreateCommit(tc.Repo.ID, sha, "A", "S", time.Now()) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + // Fill in RepoPath so resolveBackupAgent can work + job.RepoPath = tc.TmpDir + + tc.Pool.failoverOrFail("test-worker", job, "codex", "quota exhausted") + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for failover, agent changed to "test" + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (failover)", updated.Status) + } + if updated.Agent != "test" { + t.Errorf("agent=%q, want test (failover)", updated.Agent) + } +} + +func TestFailoverOrFail_NoBackupFailsWithQuotaPrefix(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // No backup configured — should fail with quota prefix + tc.Pool.failoverOrFail("test-worker", job, "test", "quota exhausted") + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } +} + func TestResolveBackupAgent(t *testing.T) { tests := []struct { name string From 76e7ef40141fca307c2c1ac0a50e4ef2f1ca6d2e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 10:53:37 -0600 Subject: [PATCH 03/11] Fix quota detection regression: add exhausted variants, drop bare 429 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add "quota exhausted", "quota_exhausted", and "insufficient_quota" patterns to isQuotaError — the previous tightening dropped the bare "quota" match but only added "quota exceeded", missing the exhausted variant that Gemini actually returns. Remove bare "429" substring match which was too broad (matched "1429", "429ms", etc.) — HTTP 429 errors are already caught by "too many requests". Add test cases for false negatives (quota exhausted) and false positives (error code 1429, timeout 429ms), plus an end-to-end failOrRetryInner test using the "quota exhausted" error variant. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 4 +++- internal/daemon/worker_test.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index cb6eac90..f9b3c2f1 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -579,9 +579,11 @@ func isQuotaError(errMsg string) bool { "rate_limit", "quota exceeded", "quota_exceeded", + "quota exhausted", + "quota_exhausted", + "insufficient_quota", "exhausted your capacity", "too many requests", - "429", } for _, p := range patterns { if strings.Contains(lower, p) { diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 97cff7b5..1a1329a0 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -291,16 +291,20 @@ func TestIsQuotaError(t *testing.T) { }{ {"quota exceeded for model", true}, {"QUOTA_EXCEEDED: limit reached", true}, + {"quota exhausted, reset after 8h", true}, + {"QUOTA_EXHAUSTED: try later", true}, + {"insufficient_quota: limit reached", true}, {"Rate limit reached", true}, {"rate_limit_error: too fast", true}, {"You have exhausted your capacity", true}, {"Too Many Requests (429)", true}, - {"HTTP 429: slow down", true}, {"RESOURCE EXHAUSTED: try later", true}, {"connection reset by peer", false}, {"timeout after 30s", false}, {"agent not found", false}, {"disk quota full", false}, + {"error code 1429", false}, + {"timeout after 429ms", false}, {"", false}, } for _, tt := range tests { @@ -452,6 +456,30 @@ func TestFailOrRetryInner_QuotaSkipsRetries(t *testing.T) { } } +func TestFailOrRetryInner_QuotaExhaustedVariant(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + job := tc.createAndClaimJob(t, sha, "test-worker") + + // "quota exhausted" (not "quota exceeded") must also trigger quota-skip + tc.Pool.failOrRetryInner("test-worker", job, "gemini", "quota exhausted, reset after 2h", true) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + if !strings.HasPrefix(updated.Error, quotaErrorPrefix) { + t.Errorf("error=%q, want prefix %q", updated.Error, quotaErrorPrefix) + } + retryCount, _ := tc.DB.GetJobRetryCount(job.ID) + if retryCount != 0 { + t.Errorf("retry_count=%d, want 0", retryCount) + } +} + func TestFailOrRetryInner_NonQuotaStillRetries(t *testing.T) { tc := newWorkerTestContext(t, 1) sha := testutil.GetHeadSHA(t, tc.TmpDir) From 367bcb604a33601dc5e1f8af5a912a7f79523a7e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 10:55:17 -0600 Subject: [PATCH 04/11] Add bounded 429 patterns to quota detection Add "http 429", "status 429", and "error 429" to isQuotaError so HTTP 429 responses without "too many requests" text are still caught. These bounded forms avoid the false positives that bare "429" caused (e.g., "1429", "429ms"). Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 3 +++ internal/daemon/worker_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index f9b3c2f1..c10f496b 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -584,6 +584,9 @@ func isQuotaError(errMsg string) bool { "insufficient_quota", "exhausted your capacity", "too many requests", + "http 429", + "status 429", + "error 429", } for _, p := range patterns { if strings.Contains(lower, p) { diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 1a1329a0..f85a6f56 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -298,6 +298,9 @@ func TestIsQuotaError(t *testing.T) { {"rate_limit_error: too fast", true}, {"You have exhausted your capacity", true}, {"Too Many Requests (429)", true}, + {"HTTP 429: slow down", true}, + {"status 429 received from API", true}, + {"Error 429: rate limited", true}, {"RESOURCE EXHAUSTED: try later", true}, {"connection reset by peer", false}, {"timeout after 30s", false}, From 6e1970bb56e9f5f537f5133c92aa09c8f9877a48 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 14:23:36 -0600 Subject: [PATCH 05/11] Narrow quota detection to hard exhaustion, clean up expired cooldowns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restrict isQuotaError to hard quota-exhaustion patterns only (resource exhausted, quota exceeded/exhausted, insufficient_quota). Transient rate-limit/429 errors now follow the normal retry path instead of triggering cooldown and skip — they're recoverable with retries. Clean up expired cooldown entries eagerly in isAgentCoolingDown via write-lock upgrade and double-check, preventing unbounded map growth over long daemon runtimes. Guard formatAllFailedComment against empty reviews slice so it doesn't misclassify a no-results state as "all quota skipped". Co-Authored-By: Claude Opus 4.6 --- internal/daemon/ci_poller.go | 2 +- internal/daemon/worker.go | 26 ++++++++++++++------------ internal/daemon/worker_test.go | 16 ++++++++-------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index 8467de0c..ee6f4e27 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -1234,7 +1234,7 @@ func formatRawBatchComment(reviews []storage.BatchReviewResult, headSHA string) // formatAllFailedComment formats a comment when every job in a batch failed. func formatAllFailedComment(reviews []storage.BatchReviewResult, headSHA string) string { quotaSkips := countQuotaFailures(reviews) - allQuota := quotaSkips == len(reviews) + allQuota := len(reviews) > 0 && quotaSkips == len(reviews) var b strings.Builder if allQuota { diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index c10f496b..377711b5 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -568,25 +568,19 @@ const quotaErrorPrefix = "quota: " // contain a parseable "reset after" token. const defaultCooldown = 30 * time.Minute -// isQuotaError returns true if the error message indicates an API quota -// or rate-limit exhaustion (case-insensitive). Patterns are scoped to -// provider API errors to avoid false positives (e.g., disk quota). +// isQuotaError returns true if the error message indicates a hard API +// quota exhaustion (case-insensitive). Transient rate-limit/429 errors +// are excluded — those should go through normal retries, not cooldown. func isQuotaError(errMsg string) bool { lower := strings.ToLower(errMsg) patterns := []string{ "resource exhausted", - "rate limit", - "rate_limit", "quota exceeded", "quota_exceeded", "quota exhausted", "quota_exhausted", "insufficient_quota", "exhausted your capacity", - "too many requests", - "http 429", - "status 429", - "error 429", } for _, p := range patterns { if strings.Contains(lower, p) { @@ -630,18 +624,26 @@ func (wp *WorkerPool) cooldownAgent(name string, until time.Time) { } // isAgentCoolingDown returns true if the agent is currently in a -// quota cooldown period. +// quota cooldown period. Expired entries are cleaned up eagerly. func (wp *WorkerPool) isAgentCoolingDown(name string) bool { wp.agentCooldownsMu.RLock() - defer wp.agentCooldownsMu.RUnlock() expiry, ok := wp.agentCooldowns[name] if !ok { + wp.agentCooldownsMu.RUnlock() return false } if time.Now().After(expiry) { - // Expired — clean up lazily on next write + wp.agentCooldownsMu.RUnlock() + // Upgrade to write lock and delete expired entry + wp.agentCooldownsMu.Lock() + // Re-check under write lock (may have been updated) + if exp, ok := wp.agentCooldowns[name]; ok && time.Now().After(exp) { + delete(wp.agentCooldowns, name) + } + wp.agentCooldownsMu.Unlock() return false } + wp.agentCooldownsMu.RUnlock() return true } diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index f85a6f56..a5e93c4b 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -289,25 +289,25 @@ func TestIsQuotaError(t *testing.T) { errMsg string want bool }{ + // Hard quota exhaustion — should trigger cooldown/skip {"quota exceeded for model", true}, {"QUOTA_EXCEEDED: limit reached", true}, {"quota exhausted, reset after 8h", true}, {"QUOTA_EXHAUSTED: try later", true}, {"insufficient_quota: limit reached", true}, - {"Rate limit reached", true}, - {"rate_limit_error: too fast", true}, {"You have exhausted your capacity", true}, - {"Too Many Requests (429)", true}, - {"HTTP 429: slow down", true}, - {"status 429 received from API", true}, - {"Error 429: rate limited", true}, {"RESOURCE EXHAUSTED: try later", true}, + // Transient rate limits — should NOT trigger cooldown (use retries) + {"Rate limit reached", false}, + {"rate_limit_error: too fast", false}, + {"Too Many Requests (429)", false}, + {"HTTP 429: slow down", false}, + {"status 429 received from API", false}, + // Other non-quota errors {"connection reset by peer", false}, {"timeout after 30s", false}, {"agent not found", false}, {"disk quota full", false}, - {"error code 1429", false}, - {"timeout after 429ms", false}, {"", false}, } for _, tt := range tests { From cfe42860370ef7defef311871ad3b7b0786cb387 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 14:32:50 -0600 Subject: [PATCH 06/11] Check backup agent cooldown in retry-exhaustion failover path The normal retry-exhaustion failover in failOrRetryInner did not check whether the backup agent was in cooldown, creating a potential bounce loop between agents. Now skips failover and fails the job when the backup agent is cooling down, consistent with failoverOrFail behavior. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 2 +- internal/daemon/worker_test.go | 71 ++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 377711b5..1829353c 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -493,7 +493,7 @@ func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, // Retries exhausted -- attempt failover to backup agent if this is an agent error if agentError { backupAgent := wp.resolveBackupAgent(job) - if backupAgent != "" { + if backupAgent != "" && !wp.isAgentCoolingDown(backupAgent) { failedOver, foErr := wp.db.FailoverJob(job.ID, workerID, backupAgent) if foErr != nil { log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index a5e93c4b..7bfe0e3d 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -579,6 +579,77 @@ func TestFailoverOrFail_NoBackupFailsWithQuotaPrefix(t *testing.T) { } } +func TestFailOrRetryInner_RetryExhaustedBackupInCooldown(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool( + tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil, + ) + + // Enqueue with agent "codex" + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + job.RepoPath = tc.TmpDir + + // Exhaust retries + for i := range maxRetries { + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + reclaimed, claimErr := tc.DB.ClaimJob("test-worker") + if claimErr != nil || reclaimed == nil { + t.Fatalf("re-claim after retry %d: %v", i, claimErr) + } + job = reclaimed + } + + // Put the backup agent in cooldown + tc.Pool.cooldownAgent( + "test", time.Now().Add(30*time.Minute), + ) + + // Final failure — retries exhausted, backup in cooldown + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be failed, NOT queued for failover to cooled-down agent + if updated.Status != storage.JobStatusFailed { + t.Errorf("status=%q, want failed", updated.Status) + } + // Agent should still be codex (not failed over) + if updated.Agent != "codex" { + t.Errorf("agent=%q, want codex (no failover)", updated.Agent) + } +} + func TestResolveBackupAgent(t *testing.T) { tests := []struct { name string From 80fb51aacad56d8e97ed6bc6de78277f85f0bad0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 14:44:52 -0600 Subject: [PATCH 07/11] Fix false negative in isAgentCoolingDown lock-upgrade path When the read lock saw an expired entry, the write-lock recheck would unconditionally return false even if another goroutine had refreshed the cooldown between RUnlock and Lock. Now returns true when the rechecked entry exists and is no longer expired. Adds tests for expired entry cleanup and concurrent refresh. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 9 ++++--- internal/daemon/worker_test.go | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 1829353c..1d21285f 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -636,12 +636,15 @@ func (wp *WorkerPool) isAgentCoolingDown(name string) bool { wp.agentCooldownsMu.RUnlock() // Upgrade to write lock and delete expired entry wp.agentCooldownsMu.Lock() - // Re-check under write lock (may have been updated) - if exp, ok := wp.agentCooldowns[name]; ok && time.Now().After(exp) { + // Re-check under write lock (may have been refreshed) + exp, stillExists := wp.agentCooldowns[name] + if stillExists && time.Now().After(exp) { delete(wp.agentCooldowns, name) + wp.agentCooldownsMu.Unlock() + return false } wp.agentCooldownsMu.Unlock() - return false + return stillExists } wp.agentCooldownsMu.RUnlock() return true diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 7bfe0e3d..6d0ff20d 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -408,6 +408,50 @@ func TestAgentCooldown(t *testing.T) { } } +func TestAgentCooldown_ExpiredEntryDeleted(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + + // Set an already-expired cooldown + pool.cooldownAgent("gemini", time.Now().Add(-1*time.Second)) + + // Should return false and clean up the entry + if pool.isAgentCoolingDown("gemini") { + t.Error("expected expired cooldown to return false") + } + + // Entry should be deleted from the map + pool.agentCooldownsMu.RLock() + _, exists := pool.agentCooldowns["gemini"] + pool.agentCooldownsMu.RUnlock() + if exists { + t.Error("expected expired entry to be deleted from map") + } +} + +func TestAgentCooldown_RefreshDuringUpgrade(t *testing.T) { + cfg := config.DefaultConfig() + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + + // Set an already-expired cooldown + pool.cooldownAgent("gemini", time.Now().Add(-1*time.Second)) + + // Simulate another goroutine refreshing the cooldown between + // RUnlock and Lock by directly writing a fresh expiry. + pool.agentCooldownsMu.Lock() + pool.agentCooldowns["gemini"] = time.Now().Add(1 * time.Hour) + pool.agentCooldownsMu.Unlock() + + // isAgentCoolingDown should return true: the entry was refreshed + if !pool.isAgentCoolingDown("gemini") { + t.Error("expected refreshed cooldown to return true") + } +} + func TestFailOrRetryInner_QuotaSkipsRetries(t *testing.T) { tc := newWorkerTestContext(t, 1) sha := testutil.GetHeadSHA(t, tc.TmpDir) From c05bbbc0e15288a16f4837b8e594fce7d69d8120 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 14:46:02 -0600 Subject: [PATCH 08/11] Add test for retry-exhaustion failover when backup is available Paired test for RetryExhaustedBackupInCooldown: verifies that the normal retry-exhaustion path still fails over to the backup agent when it is not in cooldown. Guards against condition inversions. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker_test.go | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 6d0ff20d..c2fa52ae 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -694,6 +694,71 @@ func TestFailOrRetryInner_RetryExhaustedBackupInCooldown(t *testing.T) { } } +func TestFailOrRetryInner_RetryExhaustedFailsOverToBackup(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Configure backup agent + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "test" + tc.Pool = NewWorkerPool( + tc.DB, NewStaticConfig(cfg), 1, tc.Broadcaster, nil, + ) + + // Enqueue with agent "codex" + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + job.RepoPath = tc.TmpDir + + // Exhaust retries + for i := range maxRetries { + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + reclaimed, claimErr := tc.DB.ClaimJob("test-worker") + if claimErr != nil || reclaimed == nil { + t.Fatalf("re-claim after retry %d: %v", i, claimErr) + } + job = reclaimed + } + + // Final failure — retries exhausted, backup available + tc.Pool.failOrRetryInner( + "test-worker", job, "codex", + "connection reset", true, + ) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + // Should be queued for failover, agent changed to "test" + if updated.Status != storage.JobStatusQueued { + t.Errorf("status=%q, want queued (failover)", updated.Status) + } + if updated.Agent != "test" { + t.Errorf("agent=%q, want test (failover)", updated.Agent) + } +} + func TestResolveBackupAgent(t *testing.T) { tests := []struct { name string From 2de4272d2d8bd76cd45efa8335b3d24bc92fc43c Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 14:50:03 -0600 Subject: [PATCH 09/11] Use test hook to exercise cooldown lock-upgrade race window The RefreshDuringUpgrade test was setting the fresh expiry before calling isAgentCoolingDown, so the read-lock fast path returned true without ever reaching the lock-upgrade branch. Now uses a test hook (testHookCooldownLockUpgrade) that fires between RUnlock and Lock, injecting the refresh in the actual race window. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 6 +++++- internal/daemon/worker_test.go | 17 ++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 1d21285f..d327410e 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -41,7 +41,8 @@ type WorkerPool struct { outputBuffers *OutputBuffer // Test hooks for deterministic synchronization (nil in production) - testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup + testHookAfterSecondCheck func() // Called after second runningJobs check, before second DB lookup + testHookCooldownLockUpgrade func() // Called between RUnlock and Lock in isAgentCoolingDown } // NewWorkerPool creates a new worker pool @@ -634,6 +635,9 @@ func (wp *WorkerPool) isAgentCoolingDown(name string) bool { } if time.Now().After(expiry) { wp.agentCooldownsMu.RUnlock() + if wp.testHookCooldownLockUpgrade != nil { + wp.testHookCooldownLockUpgrade() + } // Upgrade to write lock and delete expired entry wp.agentCooldownsMu.Lock() // Re-check under write lock (may have been refreshed) diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index c2fa52ae..12a96015 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -437,16 +437,19 @@ func TestAgentCooldown_RefreshDuringUpgrade(t *testing.T) { nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, ) - // Set an already-expired cooldown + // Set an already-expired cooldown so RLock path enters upgrade pool.cooldownAgent("gemini", time.Now().Add(-1*time.Second)) - // Simulate another goroutine refreshing the cooldown between - // RUnlock and Lock by directly writing a fresh expiry. - pool.agentCooldownsMu.Lock() - pool.agentCooldowns["gemini"] = time.Now().Add(1 * time.Hour) - pool.agentCooldownsMu.Unlock() + // Use the test hook to refresh the cooldown in the window + // between RUnlock and Lock, simulating a concurrent goroutine. + pool.testHookCooldownLockUpgrade = func() { + pool.agentCooldownsMu.Lock() + pool.agentCooldowns["gemini"] = time.Now().Add(1 * time.Hour) + pool.agentCooldownsMu.Unlock() + } - // isAgentCoolingDown should return true: the entry was refreshed + // The read-lock path sees expired, upgrades, recheck sees + // refreshed entry — should return true. if !pool.isAgentCoolingDown("gemini") { t.Error("expected refreshed cooldown to return true") } From f2245497e4d6c7a3cccabb04a1b5e0f0985c9a1b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 15:18:52 -0600 Subject: [PATCH 10/11] Clamp cooldown duration and resolve agent aliases in cooldown checks parseQuotaCooldown now clamps to [1m, 24h] so malformed error text cannot suppress an agent indefinitely. The pre-flight cooldown check in processJob and the same-agent guard in resolveBackupAgent now resolve aliases via agent.CanonicalName, so "claude" correctly checks cooldown for "claude-code" and backup failover to "claude-code" is recognized as same-agent when the job uses the "claude" alias. Co-Authored-By: Claude Opus 4.6 --- internal/agent/agent.go | 6 +++ internal/daemon/worker.go | 22 ++++++--- internal/daemon/worker_test.go | 81 ++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 6 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 303bdfd3..45c5afbb 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -111,6 +111,12 @@ func resolveAlias(name string) string { return name } +// CanonicalName resolves an agent alias to its canonical name. +// Returns the name unchanged if it is not an alias. +func CanonicalName(name string) string { + return resolveAlias(name) +} + // Register adds an agent to the registry func Register(a Agent) { registry[a.Name()] = a diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index d327410e..c704297c 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -286,12 +286,14 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { wp.registerRunningJob(job.ID, cancel) defer wp.unregisterRunningJob(job.ID) - // Skip immediately if the agent is in quota cooldown - if wp.isAgentCoolingDown(job.Agent) { + // Skip immediately if the agent is in quota cooldown. + // Resolve alias so "claude" checks cooldown for "claude-code". + canonicalAgent := agent.CanonicalName(job.Agent) + if wp.isAgentCoolingDown(canonicalAgent) { log.Printf("[%s] Agent %s in cooldown, skipping job %d", - workerID, job.Agent, job.ID) - wp.failoverOrFail(workerID, job, job.Agent, - fmt.Sprintf("agent %s quota cooldown active", job.Agent)) + workerID, canonicalAgent, job.ID) + wp.failoverOrFail(workerID, job, canonicalAgent, + fmt.Sprintf("agent %s quota cooldown active", canonicalAgent)) return } @@ -540,7 +542,7 @@ func (wp *WorkerPool) resolveBackupAgent(job *storage.ReviewJob) string { if err != nil || !agent.IsAvailable(resolved.Name()) { return "" } - if resolved.Name() == job.Agent { + if resolved.Name() == agent.CanonicalName(job.Agent) { return "" } return resolved.Name() @@ -568,6 +570,8 @@ const quotaErrorPrefix = "quota: " // defaultCooldown is the fallback duration when the error message doesn't // contain a parseable "reset after" token. const defaultCooldown = 30 * time.Minute +const minCooldown = 1 * time.Minute +const maxCooldown = 24 * time.Hour // isQuotaError returns true if the error message indicates a hard API // quota exhaustion (case-insensitive). Transient rate-limit/429 errors @@ -610,6 +614,12 @@ func parseQuotaCooldown(errMsg string, fallback time.Duration) time.Duration { if err != nil || d <= 0 { return fallback } + if d < minCooldown { + return minCooldown + } + if d > maxCooldown { + return maxCooldown + } return d } diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 12a96015..562fe70e 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -363,6 +363,18 @@ func TestParseQuotaCooldown(t *testing.T) { fallback: 30 * time.Minute, want: 1*time.Hour + 30*time.Minute, }, + { + name: "clamped to max 24h", + errMsg: "reset after 99999h", + fallback: 30 * time.Minute, + want: 24 * time.Hour, + }, + { + name: "clamped to min 1m", + errMsg: "reset after 5s", + fallback: 30 * time.Minute, + want: 1 * time.Minute, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -455,6 +467,75 @@ func TestAgentCooldown_RefreshDuringUpgrade(t *testing.T) { } } +func TestProcessJob_CooldownResolvesAlias(t *testing.T) { + tc := newWorkerTestContext(t, 1) + sha := testutil.GetHeadSHA(t, tc.TmpDir) + + // Enqueue a job with the alias "claude" (canonical: "claude-code") + commit, err := tc.DB.GetOrCreateCommit( + tc.Repo.ID, sha, "A", "S", time.Now(), + ) + if err != nil { + t.Fatalf("GetOrCreateCommit: %v", err) + } + job, err := tc.DB.EnqueueJob(storage.EnqueueOpts{ + RepoID: tc.Repo.ID, + CommitID: commit.ID, + GitRef: sha, + Agent: "claude", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimed, err := tc.DB.ClaimJob("test-worker") + if err != nil || claimed.ID != job.ID { + t.Fatalf("ClaimJob: err=%v, claimed=%v", err, claimed) + } + + // Cool down "claude-code" (canonical name) + tc.Pool.cooldownAgent( + "claude-code", time.Now().Add(1*time.Hour), + ) + + // processJob should detect cooldown via alias resolution + tc.Pool.processJob("test-worker", claimed) + + updated, err := tc.DB.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Status != storage.JobStatusFailed { + t.Errorf( + "status=%q, want failed (cooldown via alias)", + updated.Status, + ) + } +} + +func TestResolveBackupAgent_AliasMatchesPrimary(t *testing.T) { + // "claude" is an alias for "claude-code". If job.Agent is "claude" + // and backup resolves to "claude-code", they are the same agent. + cfg := config.DefaultConfig() + cfg.DefaultBackupAgent = "claude-code" + pool := NewWorkerPool( + nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil, + ) + job := &storage.ReviewJob{ + Agent: "claude", + RepoPath: t.TempDir(), + } + got := pool.resolveBackupAgent(job) + // Should return "" because claude == claude-code after alias + // resolution. (May also return "" if claude-code binary is not + // installed, which is fine — both reasons are correct.) + if got != "" { + t.Errorf( + "resolveBackupAgent() = %q, want empty (alias match)", + got, + ) + } +} + func TestFailOrRetryInner_QuotaSkipsRetries(t *testing.T) { tc := newWorkerTestContext(t, 1) sha := testutil.GetHeadSHA(t, tc.TmpDir) From 05d2e6fccfd574cd879b573d0bd2b8852514ff36 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 19 Feb 2026 16:03:34 -0600 Subject: [PATCH 11/11] Canonicalize agent names inside cooldown methods cooldownAgent and isAgentCoolingDown now resolve aliases internally rather than relying on callers to pass canonical names. This makes the cooldown map key invariant (always canonical) impossible to violate from any call site. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index c704297c..022d366d 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -626,6 +626,7 @@ func parseQuotaCooldown(errMsg string, fallback time.Duration) time.Duration { // cooldownAgent sets or extends the cooldown expiry for an agent. // Only extends — never shortens an existing cooldown. func (wp *WorkerPool) cooldownAgent(name string, until time.Time) { + name = agent.CanonicalName(name) wp.agentCooldownsMu.Lock() defer wp.agentCooldownsMu.Unlock() if existing, ok := wp.agentCooldowns[name]; ok && existing.After(until) { @@ -637,6 +638,7 @@ func (wp *WorkerPool) cooldownAgent(name string, until time.Time) { // isAgentCoolingDown returns true if the agent is currently in a // quota cooldown period. Expired entries are cleaned up eagerly. func (wp *WorkerPool) isAgentCoolingDown(name string) bool { + name = agent.CanonicalName(name) wp.agentCooldownsMu.RLock() expiry, ok := wp.agentCooldowns[name] if !ok {