diff --git a/internal/config/config.go b/internal/config/config.go index 6d88e27f..e0ab9cc0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,6 +49,7 @@ type Config struct { ReviewContextCount int `toml:"review_context_count"` DefaultAgent string `toml:"default_agent"` DefaultModel string `toml:"default_model"` // Default model for agents (format varies by agent) + DefaultBackupAgent string `toml:"default_backup_agent"` JobTimeoutMinutes int `toml:"job_timeout_minutes"` // Workflow-specific agent/model configuration @@ -92,7 +93,15 @@ type Config struct { DesignModelFast string `toml:"design_model_fast"` DesignModelStandard string `toml:"design_model_standard"` DesignModelThorough string `toml:"design_model_thorough"` - AllowUnsafeAgents *bool `toml:"allow_unsafe_agents"` // nil = not set, allows commands to choose their own default + + // Backup agents for failover + ReviewBackupAgent string `toml:"review_backup_agent"` + RefineBackupAgent string `toml:"refine_backup_agent"` + FixBackupAgent string `toml:"fix_backup_agent"` + SecurityBackupAgent string `toml:"security_backup_agent"` + DesignBackupAgent string `toml:"design_backup_agent"` + + AllowUnsafeAgents *bool `toml:"allow_unsafe_agents"` // nil = not set, allows commands to choose their own default // Agent commands CodexCmd string `toml:"codex_cmd"` @@ -348,6 +357,7 @@ type RepoCIConfig struct { type RepoConfig struct { Agent string `toml:"agent"` Model string `toml:"model"` // Model for agents (format varies by agent) + BackupAgent string `toml:"backup_agent"` ReviewContextCount int `toml:"review_context_count"` ReviewGuidelines string `toml:"review_guidelines"` JobTimeoutMinutes int `toml:"job_timeout_minutes"` @@ -402,6 +412,13 @@ type RepoConfig struct { DesignModelStandard string `toml:"design_model_standard"` DesignModelThorough string `toml:"design_model_thorough"` + // Backup agents for failover + ReviewBackupAgent string `toml:"review_backup_agent"` + RefineBackupAgent string `toml:"refine_backup_agent"` + FixBackupAgent string `toml:"fix_backup_agent"` + SecurityBackupAgent string `toml:"security_backup_agent"` + DesignBackupAgent string `toml:"design_backup_agent"` + // Hooks configuration (per-repo) Hooks []HookConfig `toml:"hooks"` @@ -729,6 +746,55 @@ func ResolveModelForWorkflow(cli, repoPath string, globalCfg *Config, workflow, return getWorkflowValue(repoCfg, globalCfg, workflow, level, false) } +// ResolveBackupAgentForWorkflow returns the backup agent for a workflow, +// or empty string if none is configured. +// Priority: +// 1. Repo {workflow}_backup_agent +// 2. Repo backup_agent (generic) +// 3. Global {workflow}_backup_agent +// 4. Global default_backup_agent +// 5. "" (no backup) +func ResolveBackupAgentForWorkflow(repoPath string, globalCfg *Config, workflow string) string { + repoCfg, _ := LoadRepoConfig(repoPath) + + // Repo layer: workflow-specific > generic + if repoCfg != nil { + if s := lookupFieldByTag(reflect.ValueOf(*repoCfg), workflow+"_backup_agent"); s != "" { + return s + } + if s := strings.TrimSpace(repoCfg.BackupAgent); s != "" { + return s + } + } + + // Global layer: workflow-specific > default + if globalCfg != nil { + if s := lookupFieldByTag(reflect.ValueOf(*globalCfg), workflow+"_backup_agent"); s != "" { + return s + } + if s := strings.TrimSpace(globalCfg.DefaultBackupAgent); s != "" { + return s + } + } + + return "" +} + +// lookupFieldByTag finds a struct field by its TOML tag and returns its trimmed value. +func lookupFieldByTag(v reflect.Value, key string) string { + t := v.Type() + for i := 0; i < t.NumField(); i++ { + tag := t.Field(i).Tag.Get("toml") + if tag == "" { + continue + } + if strings.Split(tag, ",")[0] == key { + return strings.TrimSpace(v.Field(i).String()) + } + } + return "" +} + // getWorkflowValue looks up agent or model config following Option A priority. func getWorkflowValue(repo *RepoConfig, global *Config, workflow, level string, isAgent bool) string { // Repo layer: level-specific > workflow-specific > generic diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 84987081..49bab706 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "strings" "testing" @@ -1227,6 +1228,101 @@ func TestResolveModelForWorkflow(t *testing.T) { } } +func TestResolveBackupAgentForWorkflow(t *testing.T) { + tests := []struct { + name string + repo map[string]string + global map[string]string + workflow string + expect string + }{ + // No backup configured + {"empty config", nil, nil, "review", ""}, + {"only primary agent configured", M{"review_agent": "claude"}, nil, "review", ""}, + + // Global backup agent + {"global backup only", nil, M{"review_backup_agent": "test"}, "review", "test"}, + {"global backup for refine", nil, M{"refine_backup_agent": "claude"}, "refine", "claude"}, + {"global backup for fix", nil, M{"fix_backup_agent": "codex"}, "fix", "codex"}, + {"global backup for security", nil, M{"security_backup_agent": "gemini"}, "security", "gemini"}, + {"global backup for design", nil, M{"design_backup_agent": "droid"}, "design", "droid"}, + + // Repo backup agent overrides global + {"repo overrides global", M{"review_backup_agent": "repo-test"}, M{"review_backup_agent": "global-test"}, "review", "repo-test"}, + {"repo backup only", M{"review_backup_agent": "test"}, nil, "review", "test"}, + + // Different workflows resolve independently + {"review backup doesn't affect refine", M{"review_backup_agent": "claude"}, nil, "refine", ""}, + {"each workflow has own backup", M{"review_backup_agent": "claude", "refine_backup_agent": "codex"}, nil, "review", "claude"}, + {"each workflow has own backup - refine", M{"review_backup_agent": "claude", "refine_backup_agent": "codex"}, nil, "refine", "codex"}, + + // Unknown workflow returns empty + {"unknown workflow", M{"review_backup_agent": "test"}, nil, "unknown", ""}, + + // No reasoning level support for backup agents + {"no level variants recognized", M{"review_backup_agent_fast": "claude"}, nil, "review", ""}, + {"backup agent doesn't use levels", M{"review_backup_agent": "claude"}, nil, "review", "claude"}, + + // Default/generic backup agent fallback + {"global default_backup_agent", nil, M{"default_backup_agent": "test"}, "review", "test"}, + {"global default_backup_agent for any workflow", nil, M{"default_backup_agent": "test"}, "fix", "test"}, + {"global workflow-specific overrides default", nil, M{"default_backup_agent": "test", "review_backup_agent": "claude"}, "review", "claude"}, + {"global default used when workflow not set", nil, M{"default_backup_agent": "test", "review_backup_agent": "claude"}, "fix", "test"}, + {"repo backup_agent generic", M{"backup_agent": "repo-fallback"}, nil, "review", "repo-fallback"}, + {"repo backup_agent generic for any workflow", M{"backup_agent": "repo-fallback"}, nil, "refine", "repo-fallback"}, + {"repo workflow-specific overrides repo generic", M{"backup_agent": "generic", "review_backup_agent": "specific"}, nil, "review", "specific"}, + {"repo generic overrides global workflow-specific", M{"backup_agent": "repo"}, M{"review_backup_agent": "global"}, "review", "repo"}, + {"repo generic overrides global default", M{"backup_agent": "repo"}, M{"default_backup_agent": "global"}, "review", "repo"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create temp dir for repo config + repoDir := t.TempDir() + + // Write repo config if provided + if tt.repo != nil { + writeRepoConfig(t, repoDir, tt.repo) + } + + // Create global config if provided + var globalCfg *Config + if tt.global != nil { + globalCfg = &Config{} + populateConfigFromMap(globalCfg, tt.global) + } + + // Test the function + result := ResolveBackupAgentForWorkflow(repoDir, globalCfg, tt.workflow) + + if result != tt.expect { + t.Errorf("ResolveBackupAgentForWorkflow(%q, global, %q) = %q, want %q", + repoDir, tt.workflow, result, tt.expect) + } + }) + } +} + +// populateConfigFromMap is a helper to set config fields from a map +func populateConfigFromMap(cfg *Config, m map[string]string) { + v := reflect.ValueOf(cfg).Elem() + t := v.Type() + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + tag := field.Tag.Get("toml") + if tag == "" { + continue + } + tagName := strings.Split(tag, ",")[0] + if val, ok := m[tagName]; ok { + if field.Type.Kind() == reflect.String { + v.Field(i).SetString(val) + } + } + } +} + // M is a shorthand type for map[string]string to keep test tables compact type M = map[string]string diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index 2f5d4468..d2f981e8 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -427,7 +427,7 @@ func (p *CIPoller) processPR(ctx context.Context, ghRepo string, pr ghPR, cfg *c resolvedAgent = resolved.Name() } - // Resolve model through workflow config when not explicitly set + // Resolve model through workflow config resolvedModel := config.ResolveModelForWorkflow(cfg.CI.Model, repo.RootPath, cfg, workflow, reasoning) job, err := p.db.EnqueueJob(storage.EnqueueOpts{ diff --git a/internal/daemon/ci_poller_test.go b/internal/daemon/ci_poller_test.go index 25d68cde..0be0842a 100644 --- a/internal/daemon/ci_poller_test.go +++ b/internal/daemon/ci_poller_test.go @@ -1278,7 +1278,9 @@ func initGitRepoWithOrigin(t *testing.T) (dir string, runGit func(args ...string runGit("init", "-b", "main") runGit("config", "user.email", "test@test.com") runGit("config", "user.name", "Test") - os.WriteFile(filepath.Join(dir, "README.md"), []byte("init"), 0644) + if err := os.WriteFile(filepath.Join(dir, "README.md"), []byte("init"), 0644); err != nil { + t.Fatalf("write README.md: %v", err) + } runGit("add", "-A") runGit("commit", "-m", "initial") runGit("remote", "add", "origin", dir) @@ -1291,8 +1293,10 @@ func TestLoadCIRepoConfig_LoadsFromDefaultBranch(t *testing.T) { dir, runGit := initGitRepoWithOrigin(t) // Commit .roborev.toml on main with CI agents override - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"claude\"]\n"), 0644) + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"claude\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } runGit("add", ".roborev.toml") runGit("commit", "-m", "add config") runGit("fetch", "origin") @@ -1313,8 +1317,10 @@ func TestLoadCIRepoConfig_FallsBackWhenNoConfigOnDefaultBranch(t *testing.T) { dir, _ := initGitRepoWithOrigin(t) // No .roborev.toml on origin/main, but put one in the working tree - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"codex\"]\n"), 0644) + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"codex\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } cfg, err := loadCIRepoConfig(dir) if err != nil { @@ -1332,15 +1338,19 @@ func TestLoadCIRepoConfig_PropagatesParseError(t *testing.T) { dir, runGit := initGitRepoWithOrigin(t) // Commit invalid TOML on main - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("this is not valid toml [[["), 0644) + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("this is not valid toml [[["), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } runGit("add", ".roborev.toml") runGit("commit", "-m", "add bad config") runGit("fetch", "origin") - // Also put valid config in working tree — should NOT be used - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"codex\"]\n"), 0644) + // Also put valid config in working tree -- should NOT be used + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"codex\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } cfg, err := loadCIRepoConfig(dir) if err == nil { @@ -1388,6 +1398,9 @@ func TestCIPollerProcessPR_SetsPendingCommitStatus(t *testing.T) { if sc.state != "pending" { t.Errorf("state=%q, want pending", sc.state) } + if sc.desc != "Review in progress" { + t.Errorf("desc=%q, want %q", sc.desc, "Review in progress") + } } func TestCIPollerPostBatchResults_SetsSuccessStatus(t *testing.T) { diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 1a6c5d0e..e1a40387 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -1591,7 +1591,7 @@ func TestHandleRerunJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(storage.EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-failed", Agent: "test"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index a3e99b93..996dd819 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -318,7 +318,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { baseAgent, err := agent.GetAvailable(job.Agent) if err != nil { log.Printf("[%s] Error getting agent: %v", workerID, err) - wp.failOrRetry(workerID, job, job.Agent, fmt.Sprintf("get agent: %v", err)) + wp.failOrRetryAgent(workerID, job, job.Agent, fmt.Sprintf("get agent: %v", err)) return } @@ -376,7 +376,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { return // Job already marked as canceled in DB, nothing more to do } log.Printf("[%s] Agent error: %v", workerID, err) - wp.failOrRetry(workerID, job, agentName, fmt.Sprintf("agent: %v", err)) + wp.failOrRetryAgent(workerID, job, agentName, fmt.Sprintf("agent: %v", err)) return } @@ -385,7 +385,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // not produce a "done" review that misleads --wait callers. if job.JobType == "compact" && !IsValidCompactOutput(output) { log.Printf("[%s] Compact job %d produced invalid output, failing", workerID, job.ID) - wp.failOrRetry(workerID, job, agentName, "compact output invalid (empty or error)") + wp.failOrRetryAgent(workerID, job, agentName, "compact output invalid (empty or error)") return } @@ -433,17 +433,30 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { }) } -// failOrRetry attempts to retry the job, or marks it as failed if max retries reached +// failOrRetry attempts to retry the job, or marks it as failed if max retries reached. +// This is used for non-agent errors (e.g., prompt build failures) where switching agents won't help. func (wp *WorkerPool) failOrRetry(workerID string, job *storage.ReviewJob, agentName string, errorMsg string) { - retried, err := wp.db.RetryJob(job.ID, maxRetries) + wp.failOrRetryInner(workerID, job, agentName, errorMsg, false) +} + +// failOrRetryAgent is like failOrRetry but allows failover to a backup agent +// when retries are exhausted. Used for agent-execution errors where switching +// agents may resolve the issue. +func (wp *WorkerPool) failOrRetryAgent(workerID string, job *storage.ReviewJob, agentName string, errorMsg string) { + wp.failOrRetryInner(workerID, job, agentName, errorMsg, true) +} + +func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, agentName string, errorMsg string, agentError bool) { + retried, err := wp.db.RetryJob(job.ID, workerID, maxRetries) if err != nil { log.Printf("[%s] Error retrying job: %v", workerID, err) - if err := wp.db.FailJob(job.ID, errorMsg); err != nil { - log.Printf("[%s] Failed to mark job %d as failed: %v", workerID, job.ID, err) - } - wp.broadcastFailed(job, agentName, errorMsg) - if wp.errorLog != nil { - wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed: %s", job.ID, errorMsg), job.ID) + if updated, fErr := wp.db.FailJob(job.ID, workerID, errorMsg); fErr != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, fErr) + } else if updated { + wp.broadcastFailed(job, agentName, errorMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed: %s", job.ID, errorMsg), job.ID) + } } return } @@ -452,17 +465,61 @@ func (wp *WorkerPool) failOrRetry(workerID string, job *storage.ReviewJob, agent retryCount, _ := wp.db.GetJobRetryCount(job.ID) log.Printf("[%s] Job %d queued for retry (%d/%d)", workerID, job.ID, retryCount, maxRetries) } else { - log.Printf("[%s] Job %d failed after %d retries", workerID, job.ID, maxRetries) - if err := wp.db.FailJob(job.ID, errorMsg); err != nil { - log.Printf("[%s] Failed to mark job %d as failed: %v", workerID, job.ID, err) + // Retries exhausted -- attempt failover to backup agent if this is an agent error + if agentError { + backupAgent := wp.resolveBackupAgent(job) + if backupAgent != "" { + failedOver, foErr := wp.db.FailoverJob(job.ID, workerID, backupAgent) + if foErr != nil { + log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) + } + if failedOver { + log.Printf("[%s] Job %d failing over from %s to %s after %d retries: %s", + workerID, job.ID, agentName, backupAgent, maxRetries, errorMsg) + return + } + } } - wp.broadcastFailed(job, agentName, errorMsg) - if wp.errorLog != nil { - wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed after %d retries: %s", job.ID, maxRetries, errorMsg), job.ID) + + // No backup or failover failed -- mark as failed + if updated, fErr := wp.db.FailJob(job.ID, workerID, errorMsg); fErr != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, fErr) + } else if updated { + log.Printf("[%s] Job %d failed after %d retries", workerID, job.ID, maxRetries) + wp.broadcastFailed(job, agentName, errorMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed after %d retries: %s", job.ID, maxRetries, errorMsg), job.ID) + } } } } +// resolveBackupAgent determines the backup agent for a job from config. +// Returns the canonicalized backup agent name, or "" if none is +// available or it's the same as the job's current agent. +func (wp *WorkerPool) resolveBackupAgent(job *storage.ReviewJob) string { + cfg := wp.cfgGetter.Config() + workflow := "review" + if !config.IsDefaultReviewType(job.ReviewType) { + workflow = job.ReviewType + } + backup := config.ResolveBackupAgentForWorkflow( + job.RepoPath, cfg, workflow, + ) + if backup == "" { + return "" + } + // Canonicalize: resolve alias, verify installed, skip if same as primary + resolved, err := agent.Get(backup) + if err != nil || !agent.IsAvailable(resolved.Name()) { + return "" + } + if resolved.Name() == job.Agent { + return "" + } + return resolved.Name() +} + // broadcastFailed sends a review.failed event for a job func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMsg string) { wp.broadcaster.Broadcast(Event{ diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 007114be..1e386cbc 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -333,3 +333,102 @@ func TestWorkerPoolCancelJobFinalCheckDeadlockSafe(t *testing.T) { t.Error("Job should have been canceled via final check path") } } + +func TestResolveBackupAgent(t *testing.T) { + tests := []struct { + name string + jobAgent string + reviewType string + cfgField string // Config field to set + cfgValue string // Value to set + want string + }{ + { + name: "no backup configured", + jobAgent: "test", + want: "", + }, + { + name: "unknown backup agent", + jobAgent: "test", + cfgField: "DefaultBackupAgent", + cfgValue: "nonexistent-agent-xyz", + want: "", + }, + { + name: "backup same as primary", + jobAgent: "test", + cfgField: "DefaultBackupAgent", + cfgValue: "test", + want: "", + }, + { + name: "default review type uses review workflow", + jobAgent: "codex", + cfgField: "ReviewBackupAgent", + cfgValue: "test", + want: "test", + }, + { + name: "security review type uses security workflow", + jobAgent: "codex", + reviewType: "security", + cfgField: "SecurityBackupAgent", + cfgValue: "test", + want: "test", + }, + { + name: "design review type uses design workflow", + jobAgent: "codex", + reviewType: "design", + cfgField: "DesignBackupAgent", + cfgValue: "test", + want: "test", + }, + { + name: "workflow mismatch returns empty", + jobAgent: "codex", + reviewType: "security", + cfgField: "ReviewBackupAgent", + cfgValue: "test", + want: "", + }, + { + name: "default_backup_agent fallback", + jobAgent: "codex", + cfgField: "DefaultBackupAgent", + cfgValue: "test", + want: "test", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := config.DefaultConfig() + + // Set the config field using a switch (avoids reflect) + switch tt.cfgField { + case "DefaultBackupAgent": + cfg.DefaultBackupAgent = tt.cfgValue + case "ReviewBackupAgent": + cfg.ReviewBackupAgent = tt.cfgValue + case "SecurityBackupAgent": + cfg.SecurityBackupAgent = tt.cfgValue + case "DesignBackupAgent": + cfg.DesignBackupAgent = tt.cfgValue + } + + pool := NewWorkerPool(nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil) + job := &storage.ReviewJob{ + Agent: tt.jobAgent, + RepoPath: t.TempDir(), + ReviewType: tt.reviewType, + } + + got := pool.resolveBackupAgent(job) + if got != tt.want { + t.Errorf("resolveBackupAgent() = %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index bc09c02c..3cd9266c 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -311,7 +311,7 @@ func TestJobFailure(t *testing.T) { claimJob(t, db, "worker-1") // Fail the job - err := db.FailJob(job.ID, "test error message") + _, err := db.FailJob(job.ID, "", "test error message") if err != nil { t.Fatalf("FailJob failed: %v", err) } @@ -328,6 +328,95 @@ func TestJobFailure(t *testing.T) { } } +func TestFailJobOwnerScoped(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := createJobChain(t, db, "/tmp/test-repo", "fail-owner") + claimJob(t, db, "worker-1") + + // Wrong worker should not be able to fail the job + updated, err := db.FailJob(job.ID, "worker-2", "stale fail") + if err != nil { + t.Fatalf("FailJob with wrong worker failed: %v", err) + } + if updated { + t.Error("FailJob should return false for wrong worker") + } + + // Job should still be running + j, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusRunning { + t.Errorf("Expected status 'running', got '%s'", j.Status) + } + + // Correct worker should succeed + updated, err = db.FailJob(job.ID, "worker-1", "legit fail") + if err != nil { + t.Fatalf("FailJob with correct worker failed: %v", err) + } + if !updated { + t.Error("FailJob should return true for correct worker") + } + + j, err = db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusFailed { + t.Errorf("Expected status 'failed', got '%s'", j.Status) + } + if j.Error != "legit fail" { + t.Errorf("Expected error 'legit fail', got '%s'", j.Error) + } +} + +func TestRetryJobOwnerScoped(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := createJobChain(t, db, "/tmp/test-repo", "retry-owner") + claimJob(t, db, "worker-1") + + // Wrong worker should not be able to retry the job + retried, err := db.RetryJob(job.ID, "worker-2", 3) + if err != nil { + t.Fatalf("RetryJob with wrong worker failed: %v", err) + } + if retried { + t.Error("RetryJob should return false for wrong worker") + } + + // Job should still be running (not requeued) + j, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusRunning { + t.Errorf("Expected status 'running', got '%s'", j.Status) + } + + // Correct worker should succeed + retried, err = db.RetryJob(job.ID, "worker-1", 3) + if err != nil { + t.Fatalf("RetryJob with correct worker failed: %v", err) + } + if !retried { + t.Error("RetryJob should return true for correct worker") + } + + j, err = db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusQueued { + t.Errorf("Expected status 'queued', got '%s'", j.Status) + } +} + func TestReviewOperations(t *testing.T) { db := openTestDB(t) defer db.Close() @@ -395,7 +484,7 @@ func TestReviewVerdictComputation(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-error", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "verdict-error", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "API rate limit exceeded") + db.FailJob(job.ID, "", "API rate limit exceeded") // Manually insert a review to simulate edge case _, err := db.Exec(`INSERT INTO reviews (job_id, agent, prompt, output) VALUES (?, 'codex', 'prompt', 'No issues found.')`, job.ID) @@ -610,7 +699,7 @@ func TestJobCounts(t *testing.T) { _, _ = db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit2.ID, GitRef: "fail1", Agent: "codex"}) claimed2, _ := db.ClaimJob("w2") if claimed2 != nil { - db.FailJob(claimed2.ID, "err") + db.FailJob(claimed2.ID, "", "err") } queued, _, done, failed, _, err := db.GetJobCounts() @@ -710,7 +799,7 @@ func TestRetryJob(t *testing.T) { claimJob(t, db, "worker-1") // Retry should succeed (retry_count: 0 -> 1) - retried, err := db.RetryJob(job.ID, 3) + retried, err := db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -730,9 +819,9 @@ func TestRetryJob(t *testing.T) { // Claim again and retry twice more (retry_count: 1->2, 2->3) _, _ = db.ClaimJob("worker-1") - db.RetryJob(job.ID, 3) // retry_count becomes 2 + db.RetryJob(job.ID, "", 3) // retry_count becomes 2 _, _ = db.ClaimJob("worker-1") - db.RetryJob(job.ID, 3) // retry_count becomes 3 + db.RetryJob(job.ID, "", 3) // retry_count becomes 3 count, _ = db.GetJobRetryCount(job.ID) if count != 3 { @@ -741,7 +830,7 @@ func TestRetryJob(t *testing.T) { // Claim again - next retry should fail (at max) _, _ = db.ClaimJob("worker-1") - retried, err = db.RetryJob(job.ID, 3) + retried, err = db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob at max failed: %v", err) } @@ -763,7 +852,7 @@ func TestRetryJobOnlyWorksForRunning(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "retry-status") // Try to retry a queued job (should fail - not running) - retried, err := db.RetryJob(job.ID, 3) + retried, err := db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob on queued job failed: %v", err) } @@ -775,7 +864,7 @@ func TestRetryJobOnlyWorksForRunning(t *testing.T) { _, _ = db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "p", "o") - retried, err = db.RetryJob(job.ID, 3) + retried, err = db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob on done job failed: %v", err) } @@ -793,8 +882,8 @@ func TestRetryJobAtomic(t *testing.T) { // Simulate two concurrent retries - only first should succeed // (In practice this tests the atomic update) - retried1, _ := db.RetryJob(job.ID, 3) - retried2, _ := db.RetryJob(job.ID, 3) // Job is now queued, not running + retried1, _ := db.RetryJob(job.ID, "", 3) + retried2, _ := db.RetryJob(job.ID, "", 3) // Job is now queued, not running if !retried1 { t.Error("First retry should succeed") @@ -810,6 +899,235 @@ func TestRetryJobAtomic(t *testing.T) { } } +func TestFailoverJob(t *testing.T) { + t.Run("succeeds with backup agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-repo") + commit := createCommit(t, db, repo.ID, "fo-abc123") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-abc123", + Agent: "primary", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + + // Claim to make it running + claimJob(t, db, "worker-1") + + // Failover should succeed + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if !ok { + t.Fatal("Expected failover to succeed") + } + + // Verify: agent swapped, retry_count reset, status queued + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Agent != "backup" { + t.Errorf("Agent = %q, want %q", updated.Agent, "backup") + } + if updated.Status != JobStatusQueued { + t.Errorf("Status = %q, want %q", updated.Status, JobStatusQueued) + } + count, _ := db.GetJobRetryCount(job.ID) + if count != 0 { + t.Errorf("RetryCount = %d, want 0", count) + } + }) + + t.Run("clears model on failover", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-model") + commit := createCommit(t, db, repo.ID, "fo-model") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-model", + Agent: "primary", + Model: "o3-mini", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + if job.Model != "o3-mini" { + t.Fatalf("Model = %q, want %q", job.Model, "o3-mini") + } + + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if !ok { + t.Fatal("Expected failover to succeed") + } + + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Model != "" { + t.Errorf("Model = %q, want empty (cleared on failover)", updated.Model) + } + }) + + t.Run("fails with empty backup agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := createJobChain(t, db, "/tmp/failover-nobackup", "fo-no-backup") + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1", "") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false with empty backup agent") + } + }) + + t.Run("fails when backup equals agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-same") + commit := createCommit(t, db, repo.ID, "fo-same123") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-same123", + Agent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1", "codex") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false when backup == agent") + } + }) + + t.Run("fails when not running", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-queued") + commit := createCommit(t, db, repo.ID, "fo-queued") + + // Job is queued (not claimed/running) + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-queued", + Agent: "primary", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false for queued job") + } + }) + + t.Run("second failover with same backup is no-op", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-double") + commit := createCommit(t, db, repo.ID, "fo-double") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-double", + Agent: "primary", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + // First failover: primary -> backup + db.FailoverJob(job.ID, "worker-1", "backup") + + // Reclaim, now agent is "backup" + claimJob(t, db, "worker-1") + + // Second failover with same backup agent should fail (agent == backup) + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") + if err != nil { + t.Fatalf("FailoverJob second attempt: %v", err) + } + if ok { + t.Error("Expected second failover to return false (agent already is backup)") + } + }) + + t.Run("fails when wrong worker", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-wrongworker") + commit := createCommit(t, db, repo.ID, "fo-wrongw") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-wrongw", + Agent: "primary", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + // A different worker should not be able to failover this job + ok, err := db.FailoverJob(job.ID, "worker-2", "backup") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false when called by wrong worker") + } + + // Verify original agent is unchanged + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Agent != "primary" { + t.Errorf("Agent = %q, want %q (should not have changed)", updated.Agent, "primary") + } + }) +} + func TestCancelJob(t *testing.T) { db := openTestDB(t) defer db.Close() @@ -863,7 +1181,7 @@ func TestCancelJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-failed", "A", "S", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "cancel-failed", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") err := db.CancelJob(job.ID) if err == nil { @@ -901,7 +1219,7 @@ func TestCancelJob(t *testing.T) { db.CancelJob(job.ID) // FailJob should not overwrite canceled status - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") updated, _ := db.GetJobByID(job.ID) if updated.Status != JobStatusCanceled { @@ -1505,7 +1823,7 @@ func TestListReposWithReviewCounts(t *testing.T) { // Claim and fail another job claimed2, _ := db.ClaimJob("worker-1") if claimed2 != nil { - db.FailJob(claimed2.ID, "test error") + db.FailJob(claimed2.ID, "", "test error") } // Counts should still be the same (counts all jobs, not just completed) @@ -1872,7 +2190,7 @@ func TestReenqueueJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "A", "S", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-failed", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") err := db.ReenqueueJob(job.ID) if err != nil { diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index ec15e275..49a3babe 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -1004,12 +1004,30 @@ func (db *DB) CompleteJob(jobID int64, agent, prompt, output string) error { } // FailJob marks a job as failed with an error message. -// Only updates if job is still in 'running' state (respects cancellation). -func (db *DB) FailJob(jobID int64, errorMsg string) error { +// Only updates if job is still in 'running' state and owned by the given worker +// (respects cancellation and prevents stale workers from failing reclaimed jobs). +// Pass empty workerID to skip the ownership check (for admin/test callers). +// Returns true if the job was actually updated (false when ownership or status +// check prevented the update). +func (db *DB) FailJob(jobID int64, workerID string, errorMsg string) (bool, error) { now := time.Now().Format(time.RFC3339) - _, err := db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running'`, - now, errorMsg, now, jobID) - return err + var result sql.Result + var err error + if workerID != "" { + result, err = db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running' AND worker_id = ?`, + now, errorMsg, now, jobID, workerID) + } else { + result, err = db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running'`, + now, errorMsg, now, jobID) + } + if err != nil { + return false, err + } + rows, err := result.RowsAffected() + if err != nil { + return false, err + } + return rows > 0, nil } // CancelJob marks a running or queued job as canceled @@ -1087,17 +1105,26 @@ func (db *DB) ReenqueueJob(jobID int64) error { return nil } -// RetryJob atomically resets a running job to queued for retry. -// Returns false if max retries reached or job is not in running state. -// maxRetries is the number of retries allowed (e.g., 3 means up to 4 total attempts). -func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error) { - // Atomically update only if retry_count < maxRetries and status is running - // This prevents race conditions with multiple workers - result, err := db.Exec(` - UPDATE review_jobs - SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 - WHERE id = ? AND retry_count < ? AND status = 'running' - `, jobID, maxRetries) +// RetryJob requeues a running job for retry if retry_count < maxRetries. +// When workerID is non-empty the update is scoped to the owning worker, +// preventing a stale/zombie worker from requeuing a reclaimed job. +// Pass empty workerID to skip the ownership check (for admin/test callers). +func (db *DB) RetryJob(jobID int64, workerID string, maxRetries int) (bool, error) { + var result sql.Result + var err error + if workerID != "" { + result, err = db.Exec(` + UPDATE review_jobs + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 + WHERE id = ? AND retry_count < ? AND status = 'running' AND worker_id = ? + `, jobID, maxRetries, workerID) + } else { + result, err = db.Exec(` + UPDATE review_jobs + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 + WHERE id = ? AND retry_count < ? AND status = 'running' + `, jobID, maxRetries) + } if err != nil { return false, err } @@ -1110,6 +1137,36 @@ func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error) { return rows > 0, nil } +// FailoverJob atomically switches a running job to the given backup agent +// and requeues it. Returns false if the job is not in running state, the +// worker doesn't own the job, or the backup agent is the same as the +// current agent. +func (db *DB) FailoverJob(jobID int64, workerID string, backupAgent string) (bool, error) { + if backupAgent == "" { + return false, nil + } + result, err := db.Exec(` + UPDATE review_jobs + SET agent = ?, + model = NULL, + retry_count = 0, + status = 'queued', + worker_id = NULL, + started_at = NULL, + finished_at = NULL, + error = NULL + WHERE id = ? + AND status = 'running' + AND worker_id = ? + AND agent != ? + `, backupAgent, jobID, workerID, backupAgent) + if err != nil { + return false, err + } + rows, err := result.RowsAffected() + return rows > 0, err +} + // GetJobRetryCount returns the retry count for a job func (db *DB) GetJobRetryCount(jobID int64) (int, error) { var count int diff --git a/internal/storage/models.go b/internal/storage/models.go index 18829f4f..bcede9da 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -65,7 +65,6 @@ type ReviewJob struct { ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output - // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this job diff --git a/internal/storage/repos_test.go b/internal/storage/repos_test.go index 02eccf40..91f3bdfb 100644 --- a/internal/storage/repos_test.go +++ b/internal/storage/repos_test.go @@ -553,7 +553,7 @@ func TestGetRepoStats(t *testing.T) { // Fail job3 claimJob(t, db, "worker-1") - if err := db.FailJob(job3.ID, "agent error"); err != nil { + if _, err := db.FailJob(job3.ID, "", "agent error"); err != nil { t.Fatalf("FailJob failed: %v", err) } @@ -1073,7 +1073,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { } // 4. Retry the job (reset to queued) - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1124,7 +1124,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { } // 3. Retry - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1170,7 +1170,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Error("Compact job: IsPromptJob() should be true") } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1210,7 +1210,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Fatalf("SaveJobPrompt failed: %v", err) } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1246,7 +1246,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Fatalf("SaveJobPrompt failed: %v", err) } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) }