From 3169a4a537e2c92d77e9751d82285dee85154d1f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Feb 2026 11:42:12 -0600 Subject: [PATCH 1/4] Add backup agent failover for review jobs Add backup_agent configuration (per-workflow and default fallback) that allows review jobs to automatically retry with a different agent when the primary agent fails. Backup agent is resolved and canonicalized at enqueue time, and FailoverJob atomically swaps the agent and requeues. Worker pool distinguishes agent errors (eligible for failover) from prompt/infra errors. FailJob, RetryJob, and FailoverJob are all scoped by worker_id to prevent stale workers from interfering with reclaimed jobs. Co-Authored-By: Nick Strayer --- internal/config/config.go | 68 ++++++- internal/config/config_test.go | 96 +++++++++ internal/daemon/ci_poller.go | 31 ++- internal/daemon/ci_poller_test.go | 103 +++++++++- internal/daemon/server.go | 52 +++-- internal/daemon/server_test.go | 71 ++++++- internal/daemon/worker.go | 62 ++++-- internal/storage/db.go | 12 ++ internal/storage/db_test.go | 314 ++++++++++++++++++++++++++++-- internal/storage/jobs.go | 121 +++++++++--- internal/storage/models.go | 1 + internal/storage/repos_test.go | 12 +- 12 files changed, 843 insertions(+), 100 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 6d88e27f..e0ab9cc0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,6 +49,7 @@ type Config struct { ReviewContextCount int `toml:"review_context_count"` DefaultAgent string `toml:"default_agent"` DefaultModel string `toml:"default_model"` // Default model for agents (format varies by agent) + DefaultBackupAgent string `toml:"default_backup_agent"` JobTimeoutMinutes int `toml:"job_timeout_minutes"` // Workflow-specific agent/model configuration @@ -92,7 +93,15 @@ type Config struct { DesignModelFast string `toml:"design_model_fast"` DesignModelStandard string `toml:"design_model_standard"` DesignModelThorough 
string `toml:"design_model_thorough"` - AllowUnsafeAgents *bool `toml:"allow_unsafe_agents"` // nil = not set, allows commands to choose their own default + + // Backup agents for failover + ReviewBackupAgent string `toml:"review_backup_agent"` + RefineBackupAgent string `toml:"refine_backup_agent"` + FixBackupAgent string `toml:"fix_backup_agent"` + SecurityBackupAgent string `toml:"security_backup_agent"` + DesignBackupAgent string `toml:"design_backup_agent"` + + AllowUnsafeAgents *bool `toml:"allow_unsafe_agents"` // nil = not set, allows commands to choose their own default // Agent commands CodexCmd string `toml:"codex_cmd"` @@ -348,6 +357,7 @@ type RepoCIConfig struct { type RepoConfig struct { Agent string `toml:"agent"` Model string `toml:"model"` // Model for agents (format varies by agent) + BackupAgent string `toml:"backup_agent"` ReviewContextCount int `toml:"review_context_count"` ReviewGuidelines string `toml:"review_guidelines"` JobTimeoutMinutes int `toml:"job_timeout_minutes"` @@ -402,6 +412,13 @@ type RepoConfig struct { DesignModelStandard string `toml:"design_model_standard"` DesignModelThorough string `toml:"design_model_thorough"` + // Backup agents for failover + ReviewBackupAgent string `toml:"review_backup_agent"` + RefineBackupAgent string `toml:"refine_backup_agent"` + FixBackupAgent string `toml:"fix_backup_agent"` + SecurityBackupAgent string `toml:"security_backup_agent"` + DesignBackupAgent string `toml:"design_backup_agent"` + // Hooks configuration (per-repo) Hooks []HookConfig `toml:"hooks"` @@ -729,6 +746,55 @@ func ResolveModelForWorkflow(cli, repoPath string, globalCfg *Config, workflow, return getWorkflowValue(repoCfg, globalCfg, workflow, level, false) } +// ResolveBackupAgentForWorkflow returns the backup agent for a workflow, +// or empty string if none is configured. +// Priority: +// 1. Repo {workflow}_backup_agent +// 2. Repo backup_agent (generic) +// 3. Global {workflow}_backup_agent +// 4. 
Global default_backup_agent +// 5. "" (no backup) +func ResolveBackupAgentForWorkflow(repoPath string, globalCfg *Config, workflow string) string { + repoCfg, _ := LoadRepoConfig(repoPath) + + // Repo layer: workflow-specific > generic + if repoCfg != nil { + if s := lookupFieldByTag(reflect.ValueOf(*repoCfg), workflow+"_backup_agent"); s != "" { + return s + } + if s := strings.TrimSpace(repoCfg.BackupAgent); s != "" { + return s + } + } + + // Global layer: workflow-specific > default + if globalCfg != nil { + if s := lookupFieldByTag(reflect.ValueOf(*globalCfg), workflow+"_backup_agent"); s != "" { + return s + } + if s := strings.TrimSpace(globalCfg.DefaultBackupAgent); s != "" { + return s + } + } + + return "" +} + +// lookupFieldByTag finds a struct field by its TOML tag and returns its trimmed value. +func lookupFieldByTag(v reflect.Value, key string) string { + t := v.Type() + for i := 0; i < t.NumField(); i++ { + tag := t.Field(i).Tag.Get("toml") + if tag == "" { + continue + } + if strings.Split(tag, ",")[0] == key { + return strings.TrimSpace(v.Field(i).String()) + } + } + return "" +} + // getWorkflowValue looks up agent or model config following Option A priority. 
func getWorkflowValue(repo *RepoConfig, global *Config, workflow, level string, isAgent bool) string { // Repo layer: level-specific > workflow-specific > generic diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 84987081..49bab706 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "strings" "testing" @@ -1227,6 +1228,101 @@ func TestResolveModelForWorkflow(t *testing.T) { } } +func TestResolveBackupAgentForWorkflow(t *testing.T) { + tests := []struct { + name string + repo map[string]string + global map[string]string + workflow string + expect string + }{ + // No backup configured + {"empty config", nil, nil, "review", ""}, + {"only primary agent configured", M{"review_agent": "claude"}, nil, "review", ""}, + + // Global backup agent + {"global backup only", nil, M{"review_backup_agent": "test"}, "review", "test"}, + {"global backup for refine", nil, M{"refine_backup_agent": "claude"}, "refine", "claude"}, + {"global backup for fix", nil, M{"fix_backup_agent": "codex"}, "fix", "codex"}, + {"global backup for security", nil, M{"security_backup_agent": "gemini"}, "security", "gemini"}, + {"global backup for design", nil, M{"design_backup_agent": "droid"}, "design", "droid"}, + + // Repo backup agent overrides global + {"repo overrides global", M{"review_backup_agent": "repo-test"}, M{"review_backup_agent": "global-test"}, "review", "repo-test"}, + {"repo backup only", M{"review_backup_agent": "test"}, nil, "review", "test"}, + + // Different workflows resolve independently + {"review backup doesn't affect refine", M{"review_backup_agent": "claude"}, nil, "refine", ""}, + {"each workflow has own backup", M{"review_backup_agent": "claude", "refine_backup_agent": "codex"}, nil, "review", "claude"}, + {"each workflow has own backup - refine", M{"review_backup_agent": "claude", "refine_backup_agent": "codex"}, nil, "refine", "codex"}, + + // 
Unknown workflow returns empty + {"unknown workflow", M{"review_backup_agent": "test"}, nil, "unknown", ""}, + + // No reasoning level support for backup agents + {"no level variants recognized", M{"review_backup_agent_fast": "claude"}, nil, "review", ""}, + {"backup agent doesn't use levels", M{"review_backup_agent": "claude"}, nil, "review", "claude"}, + + // Default/generic backup agent fallback + {"global default_backup_agent", nil, M{"default_backup_agent": "test"}, "review", "test"}, + {"global default_backup_agent for any workflow", nil, M{"default_backup_agent": "test"}, "fix", "test"}, + {"global workflow-specific overrides default", nil, M{"default_backup_agent": "test", "review_backup_agent": "claude"}, "review", "claude"}, + {"global default used when workflow not set", nil, M{"default_backup_agent": "test", "review_backup_agent": "claude"}, "fix", "test"}, + {"repo backup_agent generic", M{"backup_agent": "repo-fallback"}, nil, "review", "repo-fallback"}, + {"repo backup_agent generic for any workflow", M{"backup_agent": "repo-fallback"}, nil, "refine", "repo-fallback"}, + {"repo workflow-specific overrides repo generic", M{"backup_agent": "generic", "review_backup_agent": "specific"}, nil, "review", "specific"}, + {"repo generic overrides global workflow-specific", M{"backup_agent": "repo"}, M{"review_backup_agent": "global"}, "review", "repo"}, + {"repo generic overrides global default", M{"backup_agent": "repo"}, M{"default_backup_agent": "global"}, "review", "repo"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create temp dir for repo config + repoDir := t.TempDir() + + // Write repo config if provided + if tt.repo != nil { + writeRepoConfig(t, repoDir, tt.repo) + } + + // Create global config if provided + var globalCfg *Config + if tt.global != nil { + globalCfg = &Config{} + populateConfigFromMap(globalCfg, tt.global) + } + + // Test the function + result := ResolveBackupAgentForWorkflow(repoDir, globalCfg, 
tt.workflow) + + if result != tt.expect { + t.Errorf("ResolveBackupAgentForWorkflow(%q, global, %q) = %q, want %q", + repoDir, tt.workflow, result, tt.expect) + } + }) + } +} + +// populateConfigFromMap is a helper to set config fields from a map +func populateConfigFromMap(cfg *Config, m map[string]string) { + v := reflect.ValueOf(cfg).Elem() + t := v.Type() + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + tag := field.Tag.Get("toml") + if tag == "" { + continue + } + tagName := strings.Split(tag, ",")[0] + if val, ok := m[tagName]; ok { + if field.Type.Kind() == reflect.String { + v.Field(i).SetString(val) + } + } + } +} + // M is a shorthand type for map[string]string to keep test tables compact type M = map[string]string diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index 2f5d4468..4d8ad2d6 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -427,16 +427,33 @@ func (p *CIPoller) processPR(ctx context.Context, ghRepo string, pr ghPR, cfg *c resolvedAgent = resolved.Name() } - // Resolve model through workflow config when not explicitly set + // Resolve model and backup agent through workflow config resolvedModel := config.ResolveModelForWorkflow(cfg.CI.Model, repo.RootPath, cfg, workflow, reasoning) + backupAgent := config.ResolveBackupAgentForWorkflow(repo.RootPath, cfg, workflow) + + // Canonicalize backup agent: resolve alias to canonical name, + // then verify the agent is actually installed. Clear if unknown, + // unavailable, or same as primary. Uses strict resolution + // (Get+IsAvailable) to avoid silently falling back to a + // different agent. 
+ if backupAgent != "" { + if resolved, err := agent.Get(backupAgent); err != nil || !agent.IsAvailable(resolved.Name()) { + backupAgent = "" + } else if resolved.Name() == resolvedAgent { + backupAgent = "" + } else { + backupAgent = resolved.Name() + } + } job, err := p.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Agent: resolvedAgent, - Model: resolvedModel, - Reasoning: reasoning, - ReviewType: rt, + RepoID: repo.ID, + GitRef: gitRef, + Agent: resolvedAgent, + BackupAgent: backupAgent, + Model: resolvedModel, + Reasoning: reasoning, + ReviewType: rt, }) if err != nil { rollback() diff --git a/internal/daemon/ci_poller_test.go b/internal/daemon/ci_poller_test.go index 25d68cde..43d9ff67 100644 --- a/internal/daemon/ci_poller_test.go +++ b/internal/daemon/ci_poller_test.go @@ -1255,6 +1255,76 @@ func TestResolveMinSeverity(t *testing.T) { }) } +func TestCIPollerProcessPR_BackupAgentCanonicalization(t *testing.T) { + t.Run("unknown backup agent is cleared", func(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + h.Cfg.CI.ReviewTypes = []string{"review"} + h.Cfg.CI.Agents = []string{"test"} + h.Poller = NewCIPoller(h.DB, NewStaticConfig(h.Cfg), nil) + h.stubProcessPRGit() + + // Write repo config with an unknown backup agent + repoConfig := h.RepoPath + "/.roborev.toml" + if err := os.WriteFile(repoConfig, []byte(`backup_agent = "nonexistent-agent-xyz"`), 0644); err != nil { + t.Fatalf("write repo config: %v", err) + } + + err := h.Poller.processPR(context.Background(), "acme/api", ghPR{ + Number: 1, + HeadRefOid: "head-abc", + BaseRefName: "main", + }, h.Cfg) + if err != nil { + t.Fatalf("processPR: %v", err) + } + + jobs, err := h.DB.ListJobs("", h.RepoPath, 0, 0, storage.WithGitRef("base-head-abc..head-abc")) + if err != nil { + t.Fatalf("ListJobs: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected 1 job, got %d", len(jobs)) + } + if jobs[0].BackupAgent != "" { + t.Errorf("expected backup_agent 
cleared for unknown agent, got %q", jobs[0].BackupAgent) + } + }) + + t.Run("backup same as primary is cleared", func(t *testing.T) { + h := newCIPollerHarness(t, "git@github.com:acme/api.git") + h.Cfg.CI.ReviewTypes = []string{"review"} + h.Cfg.CI.Agents = []string{"test"} + h.Poller = NewCIPoller(h.DB, NewStaticConfig(h.Cfg), nil) + h.stubProcessPRGit() + + // Write repo config with backup_agent = "test" (same as primary) + repoConfig := h.RepoPath + "/.roborev.toml" + if err := os.WriteFile(repoConfig, []byte(`backup_agent = "test"`), 0644); err != nil { + t.Fatalf("write repo config: %v", err) + } + + err := h.Poller.processPR(context.Background(), "acme/api", ghPR{ + Number: 2, + HeadRefOid: "head-def", + BaseRefName: "main", + }, h.Cfg) + if err != nil { + t.Fatalf("processPR: %v", err) + } + + jobs, err := h.DB.ListJobs("", h.RepoPath, 0, 0, storage.WithGitRef("base-head-def..head-def")) + if err != nil { + t.Fatalf("ListJobs: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected 1 job, got %d", len(jobs)) + } + if jobs[0].BackupAgent != "" { + t.Errorf("expected backup_agent cleared when same as primary, got %q", jobs[0].BackupAgent) + } + }) +} + // initGitRepoWithOrigin creates a git repo with an initial commit and // origin pointing to itself, so origin/main and GetDefaultBranch work. 
func initGitRepoWithOrigin(t *testing.T) (dir string, runGit func(args ...string) string) { @@ -1278,7 +1348,9 @@ func initGitRepoWithOrigin(t *testing.T) (dir string, runGit func(args ...string runGit("init", "-b", "main") runGit("config", "user.email", "test@test.com") runGit("config", "user.name", "Test") - os.WriteFile(filepath.Join(dir, "README.md"), []byte("init"), 0644) + if err := os.WriteFile(filepath.Join(dir, "README.md"), []byte("init"), 0644); err != nil { + t.Fatalf("write README.md: %v", err) + } runGit("add", "-A") runGit("commit", "-m", "initial") runGit("remote", "add", "origin", dir) @@ -1291,8 +1363,10 @@ func TestLoadCIRepoConfig_LoadsFromDefaultBranch(t *testing.T) { dir, runGit := initGitRepoWithOrigin(t) // Commit .roborev.toml on main with CI agents override - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"claude\"]\n"), 0644) + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"claude\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } runGit("add", ".roborev.toml") runGit("commit", "-m", "add config") runGit("fetch", "origin") @@ -1313,8 +1387,10 @@ func TestLoadCIRepoConfig_FallsBackWhenNoConfigOnDefaultBranch(t *testing.T) { dir, _ := initGitRepoWithOrigin(t) // No .roborev.toml on origin/main, but put one in the working tree - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"codex\"]\n"), 0644) + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"codex\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } cfg, err := loadCIRepoConfig(dir) if err != nil { @@ -1332,15 +1408,19 @@ func TestLoadCIRepoConfig_PropagatesParseError(t *testing.T) { dir, runGit := initGitRepoWithOrigin(t) // Commit invalid TOML on main - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("this is not valid toml [[["), 0644) + if err := os.WriteFile(filepath.Join(dir, 
".roborev.toml"), + []byte("this is not valid toml [[["), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } runGit("add", ".roborev.toml") runGit("commit", "-m", "add bad config") runGit("fetch", "origin") - // Also put valid config in working tree — should NOT be used - os.WriteFile(filepath.Join(dir, ".roborev.toml"), - []byte("[ci]\nagents = [\"codex\"]\n"), 0644) + // Also put valid config in working tree -- should NOT be used + if err := os.WriteFile(filepath.Join(dir, ".roborev.toml"), + []byte("[ci]\nagents = [\"codex\"]\n"), 0644); err != nil { + t.Fatalf("write .roborev.toml: %v", err) + } cfg, err := loadCIRepoConfig(dir) if err == nil { @@ -1388,6 +1468,9 @@ func TestCIPollerProcessPR_SetsPendingCommitStatus(t *testing.T) { if sc.state != "pending" { t.Errorf("state=%q, want pending", sc.state) } + if sc.desc != "Review in progress" { + t.Errorf("desc=%q, want %q", sc.desc, "Review in progress") + } } func TestCIPollerPostBatchResults_SetsSuccessStatus(t *testing.T) { diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 37ddbdd1..cb675891 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -576,6 +576,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Resolve agent for workflow at this reasoning level agentName := config.ResolveAgentForWorkflow(req.Agent, repoRoot, s.configWatcher.Config(), workflow, reasoning) + backupAgent := config.ResolveBackupAgentForWorkflow(repoRoot, s.configWatcher.Config(), workflow) // Resolve to an installed agent: if the configured agent isn't available, // fall back through the chain (codex -> claude-code -> gemini -> ...). @@ -587,6 +588,21 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { agentName = resolved.Name() } + // Canonicalize backup agent: resolve alias to canonical name, then verify + // the agent is actually installed. 
Clear if unknown, unavailable, or same + // as primary (failover to the same agent is a no-op). Uses strict + // resolution (Get+IsAvailable) instead of GetAvailable to avoid silently + // falling back to a different agent. + if backupAgent != "" { + if resolved, err := agent.Get(backupAgent); err != nil || !agent.IsAvailable(resolved.Name()) { + backupAgent = "" // unknown or not installed, skip failover + } else if resolved.Name() == agentName { + backupAgent = "" // same as primary after resolution + } else { + backupAgent = resolved.Name() + } + } + // Resolve model for workflow at this reasoning level model := config.ResolveModelForWorkflow(req.Model, repoRoot, s.configWatcher.Config(), workflow, reasoning) @@ -617,6 +633,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, Branch: req.Branch, Agent: agentName, + BackupAgent: backupAgent, Model: model, Reasoning: reasoning, ReviewType: req.ReviewType, @@ -637,6 +654,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { GitRef: gitRef, Branch: req.Branch, Agent: agentName, + BackupAgent: backupAgent, Model: model, Reasoning: reasoning, ReviewType: req.ReviewType, @@ -676,13 +694,14 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Store as full SHA range fullRef := startSHA + ".." 
+ endSHA job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + Agent: agentName, + BackupAgent: backupAgent, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -713,15 +732,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { patchID := git.GetPatchID(gitCwd, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + Agent: agentName, + BackupAgent: backupAgent, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 1a6c5d0e..9fed8b78 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -1591,7 +1591,7 @@ func TestHandleRerunJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(storage.EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-failed", Agent: "test"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() @@ -3487,3 +3487,72 @@ func TestHandleRemap(t *testing.T) { } }) } + +func TestHandleEnqueueBackupAgentCanonicalization(t *testing.T) { 
+ server, db, tmpDir := newTestServer(t) + + repoDir := filepath.Join(tmpDir, "testrepo") + testutil.InitTestGitRepo(t, repoDir) + + t.Run("unknown backup agent is cleared", func(t *testing.T) { + // Write repo config with a backup agent that does not exist + repoConfig := filepath.Join(repoDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`backup_agent = "nonexistent-agent-xyz"`), 0644); err != nil { + t.Fatalf("write repo config: %v", err) + } + defer os.Remove(repoConfig) + + reqData := map[string]string{"repo_path": repoDir, "git_ref": "HEAD", "agent": "test"} + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", reqData) + w := httptest.NewRecorder() + server.handleEnqueue(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + + var respJob storage.ReviewJob + if err := json.NewDecoder(w.Body).Decode(&respJob); err != nil { + t.Fatalf("decode response: %v", err) + } + + job, err := db.GetJobByID(respJob.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if job.BackupAgent != "" { + t.Errorf("expected backup_agent cleared for unknown agent, got %q", job.BackupAgent) + } + }) + + t.Run("backup same as primary is cleared", func(t *testing.T) { + // Write repo config with backup_agent = "test" (same as primary) + repoConfig := filepath.Join(repoDir, ".roborev.toml") + if err := os.WriteFile(repoConfig, []byte(`backup_agent = "test"`), 0644); err != nil { + t.Fatalf("write repo config: %v", err) + } + defer os.Remove(repoConfig) + + reqData := map[string]string{"repo_path": repoDir, "git_ref": "HEAD", "agent": "test"} + req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", reqData) + w := httptest.NewRecorder() + server.handleEnqueue(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + + var respJob storage.ReviewJob + if err := json.NewDecoder(w.Body).Decode(&respJob); err != nil { + 
t.Fatalf("decode response: %v", err) + } + + job, err := db.GetJobByID(respJob.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if job.BackupAgent != "" { + t.Errorf("expected backup_agent cleared when same as primary, got %q", job.BackupAgent) + } + }) +} diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index a3e99b93..598eb0b2 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -318,7 +318,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { baseAgent, err := agent.GetAvailable(job.Agent) if err != nil { log.Printf("[%s] Error getting agent: %v", workerID, err) - wp.failOrRetry(workerID, job, job.Agent, fmt.Sprintf("get agent: %v", err)) + wp.failOrRetryAgent(workerID, job, job.Agent, fmt.Sprintf("get agent: %v", err)) return } @@ -376,7 +376,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { return // Job already marked as canceled in DB, nothing more to do } log.Printf("[%s] Agent error: %v", workerID, err) - wp.failOrRetry(workerID, job, agentName, fmt.Sprintf("agent: %v", err)) + wp.failOrRetryAgent(workerID, job, agentName, fmt.Sprintf("agent: %v", err)) return } @@ -385,7 +385,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // not produce a "done" review that misleads --wait callers. if job.JobType == "compact" && !IsValidCompactOutput(output) { log.Printf("[%s] Compact job %d produced invalid output, failing", workerID, job.ID) - wp.failOrRetry(workerID, job, agentName, "compact output invalid (empty or error)") + wp.failOrRetryAgent(workerID, job, agentName, "compact output invalid (empty or error)") return } @@ -433,17 +433,30 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { }) } -// failOrRetry attempts to retry the job, or marks it as failed if max retries reached +// failOrRetry attempts to retry the job, or marks it as failed if max retries reached. 
+// This is used for non-agent errors (e.g., prompt build failures) where switching agents won't help. func (wp *WorkerPool) failOrRetry(workerID string, job *storage.ReviewJob, agentName string, errorMsg string) { - retried, err := wp.db.RetryJob(job.ID, maxRetries) + wp.failOrRetryInner(workerID, job, agentName, errorMsg, false) +} + +// failOrRetryAgent is like failOrRetry but allows failover to a backup agent +// when retries are exhausted. Used for agent-execution errors where switching +// agents may resolve the issue. +func (wp *WorkerPool) failOrRetryAgent(workerID string, job *storage.ReviewJob, agentName string, errorMsg string) { + wp.failOrRetryInner(workerID, job, agentName, errorMsg, true) +} + +func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, agentName string, errorMsg string, agentError bool) { + retried, err := wp.db.RetryJob(job.ID, workerID, maxRetries) if err != nil { log.Printf("[%s] Error retrying job: %v", workerID, err) - if err := wp.db.FailJob(job.ID, errorMsg); err != nil { - log.Printf("[%s] Failed to mark job %d as failed: %v", workerID, job.ID, err) - } - wp.broadcastFailed(job, agentName, errorMsg) - if wp.errorLog != nil { - wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed: %s", job.ID, errorMsg), job.ID) + if updated, fErr := wp.db.FailJob(job.ID, workerID, errorMsg); fErr != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, fErr) + } else if updated { + wp.broadcastFailed(job, agentName, errorMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed: %s", job.ID, errorMsg), job.ID) + } } return } @@ -452,13 +465,28 @@ func (wp *WorkerPool) failOrRetry(workerID string, job *storage.ReviewJob, agent retryCount, _ := wp.db.GetJobRetryCount(job.ID) log.Printf("[%s] Job %d queued for retry (%d/%d)", workerID, job.ID, retryCount, maxRetries) } else { - log.Printf("[%s] Job %d failed after %d retries", workerID, job.ID, maxRetries) - if err := 
wp.db.FailJob(job.ID, errorMsg); err != nil { - log.Printf("[%s] Failed to mark job %d as failed: %v", workerID, job.ID, err) + // Retries exhausted -- attempt failover to backup agent if this is an agent error + if agentError && job.BackupAgent != "" { + failedOver, foErr := wp.db.FailoverJob(job.ID, workerID) + if foErr != nil { + log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) + } + if failedOver { + log.Printf("[%s] Job %d failing over from %s to %s after %d retries: %s", + workerID, job.ID, agentName, job.BackupAgent, maxRetries, errorMsg) + return + } } - wp.broadcastFailed(job, agentName, errorMsg) - if wp.errorLog != nil { - wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed after %d retries: %s", job.ID, maxRetries, errorMsg), job.ID) + + // No backup or failover failed -- mark as failed + if updated, fErr := wp.db.FailJob(job.ID, workerID, errorMsg); fErr != nil { + log.Printf("[%s] Error failing job %d: %v", workerID, job.ID, fErr) + } else if updated { + log.Printf("[%s] Job %d failed after %d retries", workerID, job.ID, maxRetries) + wp.broadcastFailed(job, agentName, errorMsg) + if wp.errorLog != nil { + wp.errorLog.LogError("worker", fmt.Sprintf("job %d failed after %d retries: %s", job.ID, maxRetries, errorMsg), job.ID) + } } } } diff --git a/internal/storage/db.go b/internal/storage/db.go index a44ee8cb..e46c6196 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -596,6 +596,18 @@ func (db *DB) migrate() error { } } + // Migration: add backup_agent column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'backup_agent'`).Scan(&count) + if err != nil { + return fmt.Errorf("check backup_agent column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN backup_agent TEXT`) + if err != nil { + return fmt.Errorf("add backup_agent column: %w", err) + } + } + // Migration: add index on 
reviews.addressed for server-side filtering _, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_reviews_addressed ON reviews(addressed)`) if err != nil { diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index bc09c02c..dd86f242 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -311,7 +311,7 @@ func TestJobFailure(t *testing.T) { claimJob(t, db, "worker-1") // Fail the job - err := db.FailJob(job.ID, "test error message") + _, err := db.FailJob(job.ID, "", "test error message") if err != nil { t.Fatalf("FailJob failed: %v", err) } @@ -328,6 +328,95 @@ func TestJobFailure(t *testing.T) { } } +func TestFailJobOwnerScoped(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := createJobChain(t, db, "/tmp/test-repo", "fail-owner") + claimJob(t, db, "worker-1") + + // Wrong worker should not be able to fail the job + updated, err := db.FailJob(job.ID, "worker-2", "stale fail") + if err != nil { + t.Fatalf("FailJob with wrong worker failed: %v", err) + } + if updated { + t.Error("FailJob should return false for wrong worker") + } + + // Job should still be running + j, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusRunning { + t.Errorf("Expected status 'running', got '%s'", j.Status) + } + + // Correct worker should succeed + updated, err = db.FailJob(job.ID, "worker-1", "legit fail") + if err != nil { + t.Fatalf("FailJob with correct worker failed: %v", err) + } + if !updated { + t.Error("FailJob should return true for correct worker") + } + + j, err = db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusFailed { + t.Errorf("Expected status 'failed', got '%s'", j.Status) + } + if j.Error != "legit fail" { + t.Errorf("Expected error 'legit fail', got '%s'", j.Error) + } +} + +func TestRetryJobOwnerScoped(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := 
createJobChain(t, db, "/tmp/test-repo", "retry-owner") + claimJob(t, db, "worker-1") + + // Wrong worker should not be able to retry the job + retried, err := db.RetryJob(job.ID, "worker-2", 3) + if err != nil { + t.Fatalf("RetryJob with wrong worker failed: %v", err) + } + if retried { + t.Error("RetryJob should return false for wrong worker") + } + + // Job should still be running (not requeued) + j, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusRunning { + t.Errorf("Expected status 'running', got '%s'", j.Status) + } + + // Correct worker should succeed + retried, err = db.RetryJob(job.ID, "worker-1", 3) + if err != nil { + t.Fatalf("RetryJob with correct worker failed: %v", err) + } + if !retried { + t.Error("RetryJob should return true for correct worker") + } + + j, err = db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID failed: %v", err) + } + if j.Status != JobStatusQueued { + t.Errorf("Expected status 'queued', got '%s'", j.Status) + } +} + func TestReviewOperations(t *testing.T) { db := openTestDB(t) defer db.Close() @@ -395,7 +484,7 @@ func TestReviewVerdictComputation(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "verdict-error", "Author", "Subject", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "verdict-error", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "API rate limit exceeded") + db.FailJob(job.ID, "", "API rate limit exceeded") // Manually insert a review to simulate edge case _, err := db.Exec(`INSERT INTO reviews (job_id, agent, prompt, output) VALUES (?, 'codex', 'prompt', 'No issues found.')`, job.ID) @@ -610,7 +699,7 @@ func TestJobCounts(t *testing.T) { _, _ = db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit2.ID, GitRef: "fail1", Agent: "codex"}) claimed2, _ := db.ClaimJob("w2") if claimed2 != nil { - db.FailJob(claimed2.ID, "err") + db.FailJob(claimed2.ID, "", "err") } 
queued, _, done, failed, _, err := db.GetJobCounts() @@ -710,7 +799,7 @@ func TestRetryJob(t *testing.T) { claimJob(t, db, "worker-1") // Retry should succeed (retry_count: 0 -> 1) - retried, err := db.RetryJob(job.ID, 3) + retried, err := db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -730,9 +819,9 @@ func TestRetryJob(t *testing.T) { // Claim again and retry twice more (retry_count: 1->2, 2->3) _, _ = db.ClaimJob("worker-1") - db.RetryJob(job.ID, 3) // retry_count becomes 2 + db.RetryJob(job.ID, "", 3) // retry_count becomes 2 _, _ = db.ClaimJob("worker-1") - db.RetryJob(job.ID, 3) // retry_count becomes 3 + db.RetryJob(job.ID, "", 3) // retry_count becomes 3 count, _ = db.GetJobRetryCount(job.ID) if count != 3 { @@ -741,7 +830,7 @@ func TestRetryJob(t *testing.T) { // Claim again - next retry should fail (at max) _, _ = db.ClaimJob("worker-1") - retried, err = db.RetryJob(job.ID, 3) + retried, err = db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob at max failed: %v", err) } @@ -763,7 +852,7 @@ func TestRetryJobOnlyWorksForRunning(t *testing.T) { _, _, job := createJobChain(t, db, "/tmp/test-repo", "retry-status") // Try to retry a queued job (should fail - not running) - retried, err := db.RetryJob(job.ID, 3) + retried, err := db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob on queued job failed: %v", err) } @@ -775,7 +864,7 @@ func TestRetryJobOnlyWorksForRunning(t *testing.T) { _, _ = db.ClaimJob("worker-1") db.CompleteJob(job.ID, "codex", "p", "o") - retried, err = db.RetryJob(job.ID, 3) + retried, err = db.RetryJob(job.ID, "", 3) if err != nil { t.Fatalf("RetryJob on done job failed: %v", err) } @@ -793,8 +882,8 @@ func TestRetryJobAtomic(t *testing.T) { // Simulate two concurrent retries - only first should succeed // (In practice this tests the atomic update) - retried1, _ := db.RetryJob(job.ID, 3) - retried2, _ := db.RetryJob(job.ID, 3) // Job is now queued, not running + retried1, _ := 
db.RetryJob(job.ID, "", 3) + retried2, _ := db.RetryJob(job.ID, "", 3) // Job is now queued, not running if !retried1 { t.Error("First retry should succeed") @@ -810,6 +899,201 @@ func TestRetryJobAtomic(t *testing.T) { } } +func TestFailoverJob(t *testing.T) { + t.Run("succeeds with backup agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-repo") + commit := createCommit(t, db, repo.ID, "fo-abc123") + + // Enqueue with backup agent + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-abc123", + Agent: "primary", + BackupAgent: "backup", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + + // Claim to make it running + claimJob(t, db, "worker-1") + + // Failover should succeed + ok, err := db.FailoverJob(job.ID, "worker-1") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if !ok { + t.Fatal("Expected failover to succeed") + } + + // Verify: agent swapped, backup cleared, retry_count reset, status queued + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Agent != "backup" { + t.Errorf("Agent = %q, want %q", updated.Agent, "backup") + } + if updated.BackupAgent != "" { + t.Errorf("BackupAgent = %q, want empty", updated.BackupAgent) + } + if updated.Status != JobStatusQueued { + t.Errorf("Status = %q, want %q", updated.Status, JobStatusQueued) + } + count, _ := db.GetJobRetryCount(job.ID) + if count != 0 { + t.Errorf("RetryCount = %d, want 0", count) + } + }) + + t.Run("fails without backup agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, _, job := createJobChain(t, db, "/tmp/failover-nobackup", "fo-no-backup") + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false with no backup agent") + } + }) + + t.Run("fails when 
backup equals agent", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-same") + commit := createCommit(t, db, repo.ID, "fo-same123") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-same123", + Agent: "codex", + BackupAgent: "codex", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false when backup == agent") + } + }) + + t.Run("fails when not running", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-queued") + commit := createCommit(t, db, repo.ID, "fo-queued") + + // Job is queued (not claimed/running) + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-queued", + Agent: "primary", + BackupAgent: "backup", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + + ok, err := db.FailoverJob(job.ID, "worker-1") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false for queued job") + } + }) + + t.Run("backup agent is null after failover", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-null") + commit := createCommit(t, db, repo.ID, "fo-null123") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-null123", + Agent: "primary", + BackupAgent: "backup", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + db.FailoverJob(job.ID, "worker-1") + + // Second failover should fail (backup_agent is now NULL) + claimJob(t, db, "worker-1") + ok, err := db.FailoverJob(job.ID, "worker-1") + if err != nil { + t.Fatalf("FailoverJob second attempt: %v", err) + } + if ok { 
+ t.Error("Expected second failover to return false (backup_agent cleared)") + } + }) + + t.Run("fails when wrong worker", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-wrongworker") + commit := createCommit(t, db, repo.ID, "fo-wrongw") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-wrongw", + Agent: "primary", + BackupAgent: "backup", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + claimJob(t, db, "worker-1") + + // A different worker should not be able to failover this job + ok, err := db.FailoverJob(job.ID, "worker-2") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if ok { + t.Error("Expected failover to return false when called by wrong worker") + } + + // Verify original agent is unchanged + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Agent != "primary" { + t.Errorf("Agent = %q, want %q (should not have changed)", updated.Agent, "primary") + } + }) +} + func TestCancelJob(t *testing.T) { db := openTestDB(t) defer db.Close() @@ -863,7 +1147,7 @@ func TestCancelJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "cancel-failed", "A", "S", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "cancel-failed", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") err := db.CancelJob(job.ID) if err == nil { @@ -901,7 +1185,7 @@ func TestCancelJob(t *testing.T) { db.CancelJob(job.ID) // FailJob should not overwrite canceled status - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") updated, _ := db.GetJobByID(job.ID) if updated.Status != JobStatusCanceled { @@ -1505,7 +1789,7 @@ func TestListReposWithReviewCounts(t *testing.T) { // Claim and fail another job claimed2, _ := db.ClaimJob("worker-1") if claimed2 != nil { - 
db.FailJob(claimed2.ID, "test error") + db.FailJob(claimed2.ID, "", "test error") } // Counts should still be the same (counts all jobs, not just completed) @@ -1872,7 +2156,7 @@ func TestReenqueueJob(t *testing.T) { commit, _ := db.GetOrCreateCommit(repo.ID, "rerun-failed", "A", "S", time.Now()) job, _ := db.EnqueueJob(EnqueueOpts{RepoID: repo.ID, CommitID: commit.ID, GitRef: "rerun-failed", Agent: "codex"}) db.ClaimJob("worker-1") - db.FailJob(job.ID, "some error") + db.FailJob(job.ID, "", "some error") err := db.ReenqueueJob(job.ID) if err != nil { diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index ec15e275..9ad01c20 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -717,6 +717,7 @@ type EnqueueOpts struct { GitRef string // SHA, "start..end" range, or "dirty" Branch string Agent string + BackupAgent string // Backup agent for failover (resolved at enqueue time) Model string Reasoning string ReviewType string // e.g. "security" — changes which system prompt is used @@ -780,12 +781,12 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { } result, err := db.Exec(` - INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, agent, model, reasoning, + INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, agent, backup_agent, model, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, uuid, source_machine_id, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), - opts.Agent, nullString(opts.Model), reasoning, + opts.Agent, nullString(opts.BackupAgent), nullString(opts.Model), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), @@ -801,6 +802,7 @@ func (db *DB) EnqueueJob(opts 
EnqueueOpts) (*ReviewJob, error) { GitRef: gitRef, Branch: opts.Branch, Agent: opts.Agent, + BackupAgent: opts.BackupAgent, Model: opts.Model, Reasoning: reasoning, JobType: jobType, @@ -867,10 +869,11 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { var reviewType sql.NullString var outputPrefix sql.NullString var patchID sql.NullString + var backupAgent sql.NullString err = db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.model, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, - j.output_prefix, j.patch_id + j.output_prefix, j.patch_id, j.backup_agent FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -879,7 +882,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { LIMIT 1 `, workerID).Scan(&job.ID, &job.RepoID, &commitID, &job.GitRef, &branch, &job.Agent, &model, &job.Reasoning, &job.Status, &enqueuedAt, &job.RepoPath, &job.RepoName, &commitSubject, &diffContent, &prompt, &agenticInt, &jobType, &reviewType, - &outputPrefix, &patchID) + &outputPrefix, &patchID, &backupAgent) if err != nil { return nil, err } @@ -915,6 +918,9 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { if patchID.Valid { job.PatchID = patchID.String } + if backupAgent.Valid { + job.BackupAgent = backupAgent.String + } job.EnqueuedAt = parseSQLiteTime(enqueuedAt) job.Status = JobStatusRunning job.WorkerID = workerID @@ -1004,12 +1010,30 @@ func (db *DB) CompleteJob(jobID int64, agent, prompt, output string) error { } // FailJob marks a job as failed with an error message. -// Only updates if job is still in 'running' state (respects cancellation). -func (db *DB) FailJob(jobID int64, errorMsg string) error { +// Only updates if job is still in 'running' state and owned by the given worker +// (respects cancellation and prevents stale workers from failing reclaimed jobs). 
+// Pass empty workerID to skip the ownership check (for admin/test callers). +// Returns true if the job was actually updated (false when ownership or status +// check prevented the update). +func (db *DB) FailJob(jobID int64, workerID string, errorMsg string) (bool, error) { now := time.Now().Format(time.RFC3339) - _, err := db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running'`, - now, errorMsg, now, jobID) - return err + var result sql.Result + var err error + if workerID != "" { + result, err = db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running' AND worker_id = ?`, + now, errorMsg, now, jobID, workerID) + } else { + result, err = db.Exec(`UPDATE review_jobs SET status = 'failed', finished_at = ?, error = ?, updated_at = ? WHERE id = ? AND status = 'running'`, + now, errorMsg, now, jobID) + } + if err != nil { + return false, err + } + rows, err := result.RowsAffected() + if err != nil { + return false, err + } + return rows > 0, nil } // CancelJob marks a running or queued job as canceled @@ -1087,17 +1111,26 @@ func (db *DB) ReenqueueJob(jobID int64) error { return nil } -// RetryJob atomically resets a running job to queued for retry. -// Returns false if max retries reached or job is not in running state. -// maxRetries is the number of retries allowed (e.g., 3 means up to 4 total attempts). -func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error) { - // Atomically update only if retry_count < maxRetries and status is running - // This prevents race conditions with multiple workers - result, err := db.Exec(` - UPDATE review_jobs - SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 - WHERE id = ? AND retry_count < ? 
AND status = 'running' - `, jobID, maxRetries) +// RetryJob requeues a running job for retry if retry_count < maxRetries. +// When workerID is non-empty the update is scoped to the owning worker, +// preventing a stale/zombie worker from requeuing a reclaimed job. +// Pass empty workerID to skip the ownership check (for admin/test callers). +func (db *DB) RetryJob(jobID int64, workerID string, maxRetries int) (bool, error) { + var result sql.Result + var err error + if workerID != "" { + result, err = db.Exec(` + UPDATE review_jobs + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 + WHERE id = ? AND retry_count < ? AND status = 'running' AND worker_id = ? + `, jobID, maxRetries, workerID) + } else { + result, err = db.Exec(` + UPDATE review_jobs + SET status = 'queued', worker_id = NULL, started_at = NULL, finished_at = NULL, error = NULL, retry_count = retry_count + 1 + WHERE id = ? AND retry_count < ? AND status = 'running' + `, jobID, maxRetries) + } if err != nil { return false, err } @@ -1110,6 +1143,34 @@ func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error) { return rows > 0, nil } +// FailoverJob atomically switches a failed job to its backup agent and requeues it. +// Returns false if the job has no backup agent, is not in running state, the +// backup agent is the same as the current agent, or the worker doesn't own the job. +func (db *DB) FailoverJob(jobID int64, workerID string) (bool, error) { + result, err := db.Exec(` + UPDATE review_jobs + SET agent = backup_agent, + backup_agent = NULL, + retry_count = 0, + status = 'queued', + worker_id = NULL, + started_at = NULL, + finished_at = NULL, + error = NULL + WHERE id = ? + AND status = 'running' + AND worker_id = ? 
+ AND backup_agent IS NOT NULL + AND backup_agent != '' + AND backup_agent != agent + `, jobID, workerID) + if err != nil { + return false, err + } + rows, err := result.RowsAffected() + return rows > 0, err +} + // GetJobRetryCount returns the retry count for a job func (db *DB) GetJobRetryCount(jobID int64) (int, error) { var count int @@ -1158,7 +1219,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.addressed, rv.output, - j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id + j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, j.backup_agent FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -1225,7 +1286,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int for rows.Next() { var j ReviewJob var enqueuedAt string - var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID, model, branch, jobTypeStr, reviewTypeStr, patchIDStr sql.NullString + var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID, model, branch, jobTypeStr, reviewTypeStr, patchIDStr, backupAgentStr sql.NullString var commitID sql.NullInt64 var commitSubject sql.NullString var addressed sql.NullInt64 @@ -1234,7 +1295,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int err := rows.Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &branch, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &j.RetryCount, &agentic, &j.RepoPath, &j.RepoName, &commitSubject, &addressed, &output, - &sourceMachineID, &jobUUID, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr) + 
&sourceMachineID, &jobUUID, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr, &backupAgentStr) if err != nil { return nil, err } @@ -1285,6 +1346,9 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int if branch.Valid { j.Branch = branch.String } + if backupAgentStr.Valid { + j.BackupAgent = backupAgentStr.String + } if addressed.Valid { val := addressed.Int64 != 0 j.Addressed = &val @@ -1359,18 +1423,18 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { var commitSubject sql.NullString var agentic int - var model, branch, jobTypeStr, reviewTypeStr, patchIDStr sql.NullString + var model, branch, jobTypeStr, reviewTypeStr, patchIDStr, backupAgentStr sql.NullString err := db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), - r.root_path, r.name, c.subject, j.model, j.job_type, j.review_type, j.patch_id + r.root_path, r.name, c.subject, j.model, j.job_type, j.review_type, j.patch_id, j.backup_agent FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id WHERE j.id = ? 
`, id).Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &branch, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &agentic, - &j.RepoPath, &j.RepoName, &commitSubject, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr) + &j.RepoPath, &j.RepoName, &commitSubject, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr, &backupAgentStr) if err != nil { return nil, err } @@ -1415,6 +1479,9 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { if branch.Valid { j.Branch = branch.String } + if backupAgentStr.Valid { + j.BackupAgent = backupAgentStr.String + } return &j, nil } diff --git a/internal/storage/models.go b/internal/storage/models.go index 18829f4f..25d1fd0c 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -65,6 +65,7 @@ type ReviewJob struct { ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output + BackupAgent string `json:"backup_agent,omitempty"` // Backup agent for failover // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync diff --git a/internal/storage/repos_test.go b/internal/storage/repos_test.go index 02eccf40..91f3bdfb 100644 --- a/internal/storage/repos_test.go +++ b/internal/storage/repos_test.go @@ -553,7 +553,7 @@ func TestGetRepoStats(t *testing.T) { // Fail job3 claimJob(t, db, "worker-1") - if err := db.FailJob(job3.ID, "agent error"); err != nil { + if _, err := db.FailJob(job3.ID, "", "agent error"); err != nil { t.Fatalf("FailJob failed: %v", err) } @@ -1073,7 +1073,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { } // 4. 
Retry the job (reset to queued) - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1124,7 +1124,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { } // 3. Retry - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1170,7 +1170,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Error("Compact job: IsPromptJob() should be true") } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1210,7 +1210,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Fatalf("SaveJobPrompt failed: %v", err) } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } @@ -1246,7 +1246,7 @@ func TestRetriedReviewJobNotRoutedAsPromptJob(t *testing.T) { t.Fatalf("SaveJobPrompt failed: %v", err) } - retried, err := db.RetryJob(claimed.ID, 3) + retried, err := db.RetryJob(claimed.ID, "", 3) if err != nil { t.Fatalf("RetryJob failed: %v", err) } From 1a7144f9b1727182320046b4b243ade1899f9464 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Feb 2026 11:55:08 -0600 Subject: [PATCH 2/4] Resolve backup agent from config at failover time Remove backup_agent from the database schema and review_jobs model. Instead of storing the backup agent per-job at enqueue time, the worker now resolves it from config.toml when failover is actually needed. This avoids a schema migration for an infrequent operation and keeps the backup agent decision current with config changes. 
Changes: - Remove BackupAgent from ReviewJob struct and EnqueueOpts - Remove backup_agent column migration from db.go - Remove backup_agent from all SQL (INSERT/SELECT/Scan) - Refactor FailoverJob to accept backupAgent as a parameter - Add resolveBackupAgent to WorkerPool (reads config at failover time) - Remove backup agent resolution from server.go and ci_poller.go enqueue paths - Remove enqueue-time canonicalization tests (now handled at failover) - Update FailoverJob tests for new signature Co-Authored-By: Claude Opus 4.6 --- internal/daemon/ci_poller.go | 31 +++--------- internal/daemon/ci_poller_test.go | 70 -------------------------- internal/daemon/server.go | 52 ++++++-------------- internal/daemon/server_test.go | 69 -------------------------- internal/daemon/worker.go | 47 ++++++++++++++---- internal/storage/db.go | 12 ----- internal/storage/db_test.go | 82 ++++++++++++++----------------- internal/storage/jobs.go | 55 +++++++++------------ internal/storage/models.go | 2 - 9 files changed, 121 insertions(+), 299 deletions(-) diff --git a/internal/daemon/ci_poller.go b/internal/daemon/ci_poller.go index 4d8ad2d6..d2f981e8 100644 --- a/internal/daemon/ci_poller.go +++ b/internal/daemon/ci_poller.go @@ -427,33 +427,16 @@ func (p *CIPoller) processPR(ctx context.Context, ghRepo string, pr ghPR, cfg *c resolvedAgent = resolved.Name() } - // Resolve model and backup agent through workflow config + // Resolve model through workflow config resolvedModel := config.ResolveModelForWorkflow(cfg.CI.Model, repo.RootPath, cfg, workflow, reasoning) - backupAgent := config.ResolveBackupAgentForWorkflow(repo.RootPath, cfg, workflow) - - // Canonicalize backup agent: resolve alias to canonical name, - // then verify the agent is actually installed. Clear if unknown, - // unavailable, or same as primary. Uses strict resolution - // (Get+IsAvailable) to avoid silently falling back to a - // different agent. 
- if backupAgent != "" { - if resolved, err := agent.Get(backupAgent); err != nil || !agent.IsAvailable(resolved.Name()) { - backupAgent = "" - } else if resolved.Name() == resolvedAgent { - backupAgent = "" - } else { - backupAgent = resolved.Name() - } - } job, err := p.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Agent: resolvedAgent, - BackupAgent: backupAgent, - Model: resolvedModel, - Reasoning: reasoning, - ReviewType: rt, + RepoID: repo.ID, + GitRef: gitRef, + Agent: resolvedAgent, + Model: resolvedModel, + Reasoning: reasoning, + ReviewType: rt, }) if err != nil { rollback() diff --git a/internal/daemon/ci_poller_test.go b/internal/daemon/ci_poller_test.go index 43d9ff67..0be0842a 100644 --- a/internal/daemon/ci_poller_test.go +++ b/internal/daemon/ci_poller_test.go @@ -1255,76 +1255,6 @@ func TestResolveMinSeverity(t *testing.T) { }) } -func TestCIPollerProcessPR_BackupAgentCanonicalization(t *testing.T) { - t.Run("unknown backup agent is cleared", func(t *testing.T) { - h := newCIPollerHarness(t, "git@github.com:acme/api.git") - h.Cfg.CI.ReviewTypes = []string{"review"} - h.Cfg.CI.Agents = []string{"test"} - h.Poller = NewCIPoller(h.DB, NewStaticConfig(h.Cfg), nil) - h.stubProcessPRGit() - - // Write repo config with an unknown backup agent - repoConfig := h.RepoPath + "/.roborev.toml" - if err := os.WriteFile(repoConfig, []byte(`backup_agent = "nonexistent-agent-xyz"`), 0644); err != nil { - t.Fatalf("write repo config: %v", err) - } - - err := h.Poller.processPR(context.Background(), "acme/api", ghPR{ - Number: 1, - HeadRefOid: "head-abc", - BaseRefName: "main", - }, h.Cfg) - if err != nil { - t.Fatalf("processPR: %v", err) - } - - jobs, err := h.DB.ListJobs("", h.RepoPath, 0, 0, storage.WithGitRef("base-head-abc..head-abc")) - if err != nil { - t.Fatalf("ListJobs: %v", err) - } - if len(jobs) != 1 { - t.Fatalf("expected 1 job, got %d", len(jobs)) - } - if jobs[0].BackupAgent != "" { - t.Errorf("expected backup_agent 
cleared for unknown agent, got %q", jobs[0].BackupAgent) - } - }) - - t.Run("backup same as primary is cleared", func(t *testing.T) { - h := newCIPollerHarness(t, "git@github.com:acme/api.git") - h.Cfg.CI.ReviewTypes = []string{"review"} - h.Cfg.CI.Agents = []string{"test"} - h.Poller = NewCIPoller(h.DB, NewStaticConfig(h.Cfg), nil) - h.stubProcessPRGit() - - // Write repo config with backup_agent = "test" (same as primary) - repoConfig := h.RepoPath + "/.roborev.toml" - if err := os.WriteFile(repoConfig, []byte(`backup_agent = "test"`), 0644); err != nil { - t.Fatalf("write repo config: %v", err) - } - - err := h.Poller.processPR(context.Background(), "acme/api", ghPR{ - Number: 2, - HeadRefOid: "head-def", - BaseRefName: "main", - }, h.Cfg) - if err != nil { - t.Fatalf("processPR: %v", err) - } - - jobs, err := h.DB.ListJobs("", h.RepoPath, 0, 0, storage.WithGitRef("base-head-def..head-def")) - if err != nil { - t.Fatalf("ListJobs: %v", err) - } - if len(jobs) != 1 { - t.Fatalf("expected 1 job, got %d", len(jobs)) - } - if jobs[0].BackupAgent != "" { - t.Errorf("expected backup_agent cleared when same as primary, got %q", jobs[0].BackupAgent) - } - }) -} - // initGitRepoWithOrigin creates a git repo with an initial commit and // origin pointing to itself, so origin/main and GetDefaultBranch work. 
func initGitRepoWithOrigin(t *testing.T) (dir string, runGit func(args ...string) string) { diff --git a/internal/daemon/server.go b/internal/daemon/server.go index cb675891..37ddbdd1 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -576,7 +576,6 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Resolve agent for workflow at this reasoning level agentName := config.ResolveAgentForWorkflow(req.Agent, repoRoot, s.configWatcher.Config(), workflow, reasoning) - backupAgent := config.ResolveBackupAgentForWorkflow(repoRoot, s.configWatcher.Config(), workflow) // Resolve to an installed agent: if the configured agent isn't available, // fall back through the chain (codex -> claude-code -> gemini -> ...). @@ -588,21 +587,6 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { agentName = resolved.Name() } - // Canonicalize backup agent: resolve alias to canonical name, then verify - // the agent is actually installed. Clear if unknown, unavailable, or same - // as primary (failover to the same agent is a no-op). Uses strict - // resolution (Get+IsAvailable) instead of GetAvailable to avoid silently - // falling back to a different agent. 
- if backupAgent != "" { - if resolved, err := agent.Get(backupAgent); err != nil || !agent.IsAvailable(resolved.Name()) { - backupAgent = "" // unknown or not installed, skip failover - } else if resolved.Name() == agentName { - backupAgent = "" // same as primary after resolution - } else { - backupAgent = resolved.Name() - } - } - // Resolve model for workflow at this reasoning level model := config.ResolveModelForWorkflow(req.Model, repoRoot, s.configWatcher.Config(), workflow, reasoning) @@ -633,7 +617,6 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, Branch: req.Branch, Agent: agentName, - BackupAgent: backupAgent, Model: model, Reasoning: reasoning, ReviewType: req.ReviewType, @@ -654,7 +637,6 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { GitRef: gitRef, Branch: req.Branch, Agent: agentName, - BackupAgent: backupAgent, Model: model, Reasoning: reasoning, ReviewType: req.ReviewType, @@ -694,14 +676,13 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Store as full SHA range fullRef := startSHA + ".." 
+ endSHA job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - Agent: agentName, - BackupAgent: backupAgent, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -732,16 +713,15 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { patchID := git.GetPatchID(gitCwd, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - Agent: agentName, - BackupAgent: backupAgent, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 9fed8b78..e1a40387 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -3487,72 +3487,3 @@ func TestHandleRemap(t *testing.T) { } }) } - -func TestHandleEnqueueBackupAgentCanonicalization(t *testing.T) { - server, db, tmpDir := newTestServer(t) - - repoDir := filepath.Join(tmpDir, "testrepo") - testutil.InitTestGitRepo(t, repoDir) - - t.Run("unknown backup agent is cleared", func(t *testing.T) { - // Write repo config with a backup agent that does not exist - repoConfig := filepath.Join(repoDir, ".roborev.toml") - if err := os.WriteFile(repoConfig, []byte(`backup_agent = "nonexistent-agent-xyz"`), 0644); err != nil { - t.Fatalf("write repo config: %v", err) - } - defer os.Remove(repoConfig) - - 
reqData := map[string]string{"repo_path": repoDir, "git_ref": "HEAD", "agent": "test"} - req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", reqData) - w := httptest.NewRecorder() - server.handleEnqueue(w, req) - - if w.Code != http.StatusCreated { - t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) - } - - var respJob storage.ReviewJob - if err := json.NewDecoder(w.Body).Decode(&respJob); err != nil { - t.Fatalf("decode response: %v", err) - } - - job, err := db.GetJobByID(respJob.ID) - if err != nil { - t.Fatalf("GetJobByID: %v", err) - } - if job.BackupAgent != "" { - t.Errorf("expected backup_agent cleared for unknown agent, got %q", job.BackupAgent) - } - }) - - t.Run("backup same as primary is cleared", func(t *testing.T) { - // Write repo config with backup_agent = "test" (same as primary) - repoConfig := filepath.Join(repoDir, ".roborev.toml") - if err := os.WriteFile(repoConfig, []byte(`backup_agent = "test"`), 0644); err != nil { - t.Fatalf("write repo config: %v", err) - } - defer os.Remove(repoConfig) - - reqData := map[string]string{"repo_path": repoDir, "git_ref": "HEAD", "agent": "test"} - req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/enqueue", reqData) - w := httptest.NewRecorder() - server.handleEnqueue(w, req) - - if w.Code != http.StatusCreated { - t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) - } - - var respJob storage.ReviewJob - if err := json.NewDecoder(w.Body).Decode(&respJob); err != nil { - t.Fatalf("decode response: %v", err) - } - - job, err := db.GetJobByID(respJob.ID) - if err != nil { - t.Fatalf("GetJobByID: %v", err) - } - if job.BackupAgent != "" { - t.Errorf("expected backup_agent cleared when same as primary, got %q", job.BackupAgent) - } - }) -} diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index 598eb0b2..996dd819 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -466,15 +466,18 @@ func (wp *WorkerPool) failOrRetryInner(workerID 
string, job *storage.ReviewJob, log.Printf("[%s] Job %d queued for retry (%d/%d)", workerID, job.ID, retryCount, maxRetries) } else { // Retries exhausted -- attempt failover to backup agent if this is an agent error - if agentError && job.BackupAgent != "" { - failedOver, foErr := wp.db.FailoverJob(job.ID, workerID) - if foErr != nil { - log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) - } - if failedOver { - log.Printf("[%s] Job %d failing over from %s to %s after %d retries: %s", - workerID, job.ID, agentName, job.BackupAgent, maxRetries, errorMsg) - return + if agentError { + backupAgent := wp.resolveBackupAgent(job) + if backupAgent != "" { + failedOver, foErr := wp.db.FailoverJob(job.ID, workerID, backupAgent) + if foErr != nil { + log.Printf("[%s] Error attempting failover for job %d: %v", workerID, job.ID, foErr) + } + if failedOver { + log.Printf("[%s] Job %d failing over from %s to %s after %d retries: %s", + workerID, job.ID, agentName, backupAgent, maxRetries, errorMsg) + return + } } } @@ -491,6 +494,32 @@ func (wp *WorkerPool) failOrRetryInner(workerID string, job *storage.ReviewJob, } } +// resolveBackupAgent determines the backup agent for a job from config. +// Returns the canonicalized backup agent name, or "" if none is +// available or it's the same as the job's current agent. 
+func (wp *WorkerPool) resolveBackupAgent(job *storage.ReviewJob) string { + cfg := wp.cfgGetter.Config() + workflow := "review" + if !config.IsDefaultReviewType(job.ReviewType) { + workflow = job.ReviewType + } + backup := config.ResolveBackupAgentForWorkflow( + job.RepoPath, cfg, workflow, + ) + if backup == "" { + return "" + } + // Canonicalize: resolve alias, verify installed, skip if same as primary + resolved, err := agent.Get(backup) + if err != nil || !agent.IsAvailable(resolved.Name()) { + return "" + } + if resolved.Name() == job.Agent { + return "" + } + return resolved.Name() +} + // broadcastFailed sends a review.failed event for a job func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMsg string) { wp.broadcaster.Broadcast(Event{ diff --git a/internal/storage/db.go b/internal/storage/db.go index e46c6196..a44ee8cb 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -596,18 +596,6 @@ func (db *DB) migrate() error { } } - // Migration: add backup_agent column to review_jobs if missing - err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'backup_agent'`).Scan(&count) - if err != nil { - return fmt.Errorf("check backup_agent column: %w", err) - } - if count == 0 { - _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN backup_agent TEXT`) - if err != nil { - return fmt.Errorf("add backup_agent column: %w", err) - } - } - // Migration: add index on reviews.addressed for server-side filtering _, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_reviews_addressed ON reviews(addressed)`) if err != nil { diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index dd86f242..f41a0a53 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -907,13 +907,11 @@ func TestFailoverJob(t *testing.T) { repo := createRepo(t, db, "/tmp/failover-repo") commit := createCommit(t, db, repo.ID, "fo-abc123") - // Enqueue with backup agent job, err := 
db.EnqueueJob(EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: "fo-abc123", - Agent: "primary", - BackupAgent: "backup", + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-abc123", + Agent: "primary", }) if err != nil { t.Fatalf("EnqueueJob: %v", err) @@ -923,7 +921,7 @@ func TestFailoverJob(t *testing.T) { claimJob(t, db, "worker-1") // Failover should succeed - ok, err := db.FailoverJob(job.ID, "worker-1") + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") if err != nil { t.Fatalf("FailoverJob: %v", err) } @@ -931,7 +929,7 @@ func TestFailoverJob(t *testing.T) { t.Fatal("Expected failover to succeed") } - // Verify: agent swapped, backup cleared, retry_count reset, status queued + // Verify: agent swapped, retry_count reset, status queued updated, err := db.GetJobByID(job.ID) if err != nil { t.Fatalf("GetJobByID: %v", err) @@ -939,9 +937,6 @@ func TestFailoverJob(t *testing.T) { if updated.Agent != "backup" { t.Errorf("Agent = %q, want %q", updated.Agent, "backup") } - if updated.BackupAgent != "" { - t.Errorf("BackupAgent = %q, want empty", updated.BackupAgent) - } if updated.Status != JobStatusQueued { t.Errorf("Status = %q, want %q", updated.Status, JobStatusQueued) } @@ -951,19 +946,19 @@ func TestFailoverJob(t *testing.T) { } }) - t.Run("fails without backup agent", func(t *testing.T) { + t.Run("fails with empty backup agent", func(t *testing.T) { db := openTestDB(t) defer db.Close() _, _, job := createJobChain(t, db, "/tmp/failover-nobackup", "fo-no-backup") claimJob(t, db, "worker-1") - ok, err := db.FailoverJob(job.ID, "worker-1") + ok, err := db.FailoverJob(job.ID, "worker-1", "") if err != nil { t.Fatalf("FailoverJob: %v", err) } if ok { - t.Error("Expected failover to return false with no backup agent") + t.Error("Expected failover to return false with empty backup agent") } }) @@ -975,18 +970,17 @@ func TestFailoverJob(t *testing.T) { commit := createCommit(t, db, repo.ID, "fo-same123") job, err := 
db.EnqueueJob(EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: "fo-same123", - Agent: "codex", - BackupAgent: "codex", + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-same123", + Agent: "codex", }) if err != nil { t.Fatalf("EnqueueJob: %v", err) } claimJob(t, db, "worker-1") - ok, err := db.FailoverJob(job.ID, "worker-1") + ok, err := db.FailoverJob(job.ID, "worker-1", "codex") if err != nil { t.Fatalf("FailoverJob: %v", err) } @@ -1004,17 +998,16 @@ func TestFailoverJob(t *testing.T) { // Job is queued (not claimed/running) job, err := db.EnqueueJob(EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: "fo-queued", - Agent: "primary", - BackupAgent: "backup", + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-queued", + Agent: "primary", }) if err != nil { t.Fatalf("EnqueueJob: %v", err) } - ok, err := db.FailoverJob(job.ID, "worker-1") + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") if err != nil { t.Fatalf("FailoverJob: %v", err) } @@ -1023,35 +1016,37 @@ func TestFailoverJob(t *testing.T) { } }) - t.Run("backup agent is null after failover", func(t *testing.T) { + t.Run("second failover with same backup is no-op", func(t *testing.T) { db := openTestDB(t) defer db.Close() - repo := createRepo(t, db, "/tmp/failover-null") - commit := createCommit(t, db, repo.ID, "fo-null123") + repo := createRepo(t, db, "/tmp/failover-double") + commit := createCommit(t, db, repo.ID, "fo-double") job, err := db.EnqueueJob(EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: "fo-null123", - Agent: "primary", - BackupAgent: "backup", + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-double", + Agent: "primary", }) if err != nil { t.Fatalf("EnqueueJob: %v", err) } claimJob(t, db, "worker-1") - db.FailoverJob(job.ID, "worker-1") + // First failover: primary -> backup + db.FailoverJob(job.ID, "worker-1", "backup") - // Second failover should fail (backup_agent is now NULL) + // Reclaim, now agent is "backup" 
claimJob(t, db, "worker-1") - ok, err := db.FailoverJob(job.ID, "worker-1") + + // Second failover with same backup agent should fail (agent == backup) + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") if err != nil { t.Fatalf("FailoverJob second attempt: %v", err) } if ok { - t.Error("Expected second failover to return false (backup_agent cleared)") + t.Error("Expected second failover to return false (agent already is backup)") } }) @@ -1063,11 +1058,10 @@ func TestFailoverJob(t *testing.T) { commit := createCommit(t, db, repo.ID, "fo-wrongw") job, err := db.EnqueueJob(EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: "fo-wrongw", - Agent: "primary", - BackupAgent: "backup", + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-wrongw", + Agent: "primary", }) if err != nil { t.Fatalf("EnqueueJob: %v", err) @@ -1075,7 +1069,7 @@ func TestFailoverJob(t *testing.T) { claimJob(t, db, "worker-1") // A different worker should not be able to failover this job - ok, err := db.FailoverJob(job.ID, "worker-2") + ok, err := db.FailoverJob(job.ID, "worker-2", "backup") if err != nil { t.Fatalf("FailoverJob: %v", err) } diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index 9ad01c20..c6584397 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -717,7 +717,6 @@ type EnqueueOpts struct { GitRef string // SHA, "start..end" range, or "dirty" Branch string Agent string - BackupAgent string // Backup agent for failover (resolved at enqueue time) Model string Reasoning string ReviewType string // e.g. 
"security" — changes which system prompt is used @@ -781,12 +780,12 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { } result, err := db.Exec(` - INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, agent, backup_agent, model, reasoning, + INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, agent, model, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, uuid, source_machine_id, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), - opts.Agent, nullString(opts.BackupAgent), nullString(opts.Model), reasoning, + opts.Agent, nullString(opts.Model), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), @@ -802,7 +801,6 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { GitRef: gitRef, Branch: opts.Branch, Agent: opts.Agent, - BackupAgent: opts.BackupAgent, Model: opts.Model, Reasoning: reasoning, JobType: jobType, @@ -869,11 +867,10 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { var reviewType sql.NullString var outputPrefix sql.NullString var patchID sql.NullString - var backupAgent sql.NullString err = db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.model, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, - j.output_prefix, j.patch_id, j.backup_agent + j.output_prefix, j.patch_id FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -882,7 +879,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { LIMIT 1 `, workerID).Scan(&job.ID, &job.RepoID, &commitID, &job.GitRef, &branch, &job.Agent, 
&model, &job.Reasoning, &job.Status, &enqueuedAt, &job.RepoPath, &job.RepoName, &commitSubject, &diffContent, &prompt, &agenticInt, &jobType, &reviewType, - &outputPrefix, &patchID, &backupAgent) + &outputPrefix, &patchID) if err != nil { return nil, err } @@ -918,9 +915,6 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { if patchID.Valid { job.PatchID = patchID.String } - if backupAgent.Valid { - job.BackupAgent = backupAgent.String - } job.EnqueuedAt = parseSQLiteTime(enqueuedAt) job.Status = JobStatusRunning job.WorkerID = workerID @@ -1143,14 +1137,17 @@ func (db *DB) RetryJob(jobID int64, workerID string, maxRetries int) (bool, erro return rows > 0, nil } -// FailoverJob atomically switches a failed job to its backup agent and requeues it. -// Returns false if the job has no backup agent, is not in running state, the -// backup agent is the same as the current agent, or the worker doesn't own the job. -func (db *DB) FailoverJob(jobID int64, workerID string) (bool, error) { +// FailoverJob atomically switches a running job to the given backup agent +// and requeues it. Returns false if backupAgent is empty, the job is not in +// running state, the worker doesn't own the job, or the backup agent is the +// same as the current agent. +func (db *DB) FailoverJob(jobID int64, workerID string, backupAgent string) (bool, error) { + if backupAgent == "" { + return false, nil + } result, err := db.Exec(` UPDATE review_jobs - SET agent = backup_agent, - backup_agent = NULL, + SET agent = ?, retry_count = 0, status = 'queued', worker_id = NULL, @@ -1160,10 +1157,8 @@ func (db *DB) FailoverJob(jobID int64, workerID string) (bool, error) { WHERE id = ? AND status = 'running' AND worker_id = ? - AND backup_agent IS NOT NULL - AND backup_agent != '' - AND backup_agent != agent - `, jobID, workerID) + AND agent != ? 
+ `, backupAgent, jobID, workerID, backupAgent) if err != nil { return false, err } @@ -1219,7 +1214,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.addressed, rv.output, - j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, j.backup_agent + j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -1286,7 +1281,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int for rows.Next() { var j ReviewJob var enqueuedAt string - var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID, model, branch, jobTypeStr, reviewTypeStr, patchIDStr, backupAgentStr sql.NullString + var startedAt, finishedAt, workerID, errMsg, prompt, output, sourceMachineID, jobUUID, model, branch, jobTypeStr, reviewTypeStr, patchIDStr sql.NullString var commitID sql.NullInt64 var commitSubject sql.NullString var addressed sql.NullInt64 @@ -1295,7 +1290,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int err := rows.Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &branch, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &j.RetryCount, &agentic, &j.RepoPath, &j.RepoName, &commitSubject, &addressed, &output, - &sourceMachineID, &jobUUID, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr, &backupAgentStr) + &sourceMachineID, &jobUUID, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr) if err != nil { return nil, err } @@ -1346,9 +1341,6 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int if branch.Valid { j.Branch = 
branch.String } - if backupAgentStr.Valid { - j.BackupAgent = backupAgentStr.String - } if addressed.Valid { val := addressed.Int64 != 0 j.Addressed = &val @@ -1423,18 +1415,18 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { var commitSubject sql.NullString var agentic int - var model, branch, jobTypeStr, reviewTypeStr, patchIDStr, backupAgentStr sql.NullString + var model, branch, jobTypeStr, reviewTypeStr, patchIDStr sql.NullString err := db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), - r.root_path, r.name, c.subject, j.model, j.job_type, j.review_type, j.patch_id, j.backup_agent + r.root_path, r.name, c.subject, j.model, j.job_type, j.review_type, j.patch_id FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id WHERE j.id = ? `, id).Scan(&j.ID, &j.RepoID, &commitID, &j.GitRef, &branch, &j.Agent, &j.Reasoning, &j.Status, &enqueuedAt, &startedAt, &finishedAt, &workerID, &errMsg, &prompt, &agentic, - &j.RepoPath, &j.RepoName, &commitSubject, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr, &backupAgentStr) + &j.RepoPath, &j.RepoName, &commitSubject, &model, &jobTypeStr, &reviewTypeStr, &patchIDStr) if err != nil { return nil, err } @@ -1479,9 +1471,6 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { if branch.Valid { j.Branch = branch.String } - if backupAgentStr.Valid { - j.BackupAgent = backupAgentStr.String - } return &j, nil } diff --git a/internal/storage/models.go b/internal/storage/models.go index 25d1fd0c..bcede9da 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -65,8 +65,6 @@ type ReviewJob struct { ReviewType string `json:"review_type,omitempty"` // Review type (e.g., "security") - changes system prompt PatchID string `json:"patch_id,omitempty"` // Stable patch-id for rebase tracking OutputPrefix string 
`json:"output_prefix,omitempty"` // Prefix to prepend to review output - BackupAgent string `json:"backup_agent,omitempty"` // Backup agent for failover - // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this job From 4d2ff2908b96c4e47b87d1de857c6d9d131092d2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Feb 2026 12:09:01 -0600 Subject: [PATCH 3/4] Add tests for worker-level backup agent resolution Cover resolveBackupAgent behavior: no config, unknown agent, same as primary, workflow mapping (default/security), workflow mismatch, and default_backup_agent fallback. Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker_test.go | 90 ++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/internal/daemon/worker_test.go b/internal/daemon/worker_test.go index 007114be..1fe2c765 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -333,3 +333,93 @@ func TestWorkerPoolCancelJobFinalCheckDeadlockSafe(t *testing.T) { t.Error("Job should have been canceled via final check path") } } + +func TestResolveBackupAgent(t *testing.T) { + tests := []struct { + name string + jobAgent string + reviewType string + cfgField string // Config field to set + cfgValue string // Value to set + want string + }{ + { + name: "no backup configured", + jobAgent: "test", + want: "", + }, + { + name: "unknown backup agent", + jobAgent: "test", + cfgField: "DefaultBackupAgent", + cfgValue: "nonexistent-agent-xyz", + want: "", + }, + { + name: "backup same as primary", + jobAgent: "test", + cfgField: "DefaultBackupAgent", + cfgValue: "test", + want: "", + }, + { + name: "default review type uses review workflow", + jobAgent: "codex", + cfgField: "ReviewBackupAgent", + cfgValue: "test", + want: "test", + }, + { + name: "security review type uses security workflow", + jobAgent: "codex", + reviewType: 
"security", + cfgField: "SecurityBackupAgent", + cfgValue: "test", + want: "test", + }, + { + name: "workflow mismatch returns empty", + jobAgent: "codex", + reviewType: "security", + cfgField: "ReviewBackupAgent", + cfgValue: "test", + want: "", + }, + { + name: "default_backup_agent fallback", + jobAgent: "codex", + cfgField: "DefaultBackupAgent", + cfgValue: "test", + want: "test", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := config.DefaultConfig() + + // Set the config field using a switch (avoids reflect) + switch tt.cfgField { + case "DefaultBackupAgent": + cfg.DefaultBackupAgent = tt.cfgValue + case "ReviewBackupAgent": + cfg.ReviewBackupAgent = tt.cfgValue + case "SecurityBackupAgent": + cfg.SecurityBackupAgent = tt.cfgValue + case "DesignBackupAgent": + cfg.DesignBackupAgent = tt.cfgValue + } + + pool := NewWorkerPool(nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil) + job := &storage.ReviewJob{ + Agent: tt.jobAgent, + ReviewType: tt.reviewType, + } + + got := pool.resolveBackupAgent(job) + if got != tt.want { + t.Errorf("resolveBackupAgent() = %q, want %q", got, tt.want) + } + }) + } +} From 1c51cf343b3632f0a64863432b41c6769edf08ef Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Feb 2026 12:49:55 -0600 Subject: [PATCH 4/4] Clear model on failover, fix test isolation - FailoverJob now sets model = NULL so the backup agent uses its own default rather than inheriting a potentially incompatible model from the primary agent - Set RepoPath to t.TempDir() in TestResolveBackupAgent to avoid reading .roborev.toml from the working directory - Add design workflow test case for completeness - Add "clears model on failover" test in TestFailoverJob Co-Authored-By: Claude Opus 4.6 --- internal/daemon/worker_test.go | 9 ++++++++ internal/storage/db_test.go | 40 ++++++++++++++++++++++++++++++++++ internal/storage/jobs.go | 1 + 3 files changed, 50 insertions(+) diff --git a/internal/daemon/worker_test.go 
b/internal/daemon/worker_test.go index 1fe2c765..1e386cbc 100644 --- a/internal/daemon/worker_test.go +++ b/internal/daemon/worker_test.go @@ -377,6 +377,14 @@ func TestResolveBackupAgent(t *testing.T) { cfgValue: "test", want: "test", }, + { + name: "design review type uses design workflow", + jobAgent: "codex", + reviewType: "design", + cfgField: "DesignBackupAgent", + cfgValue: "test", + want: "test", + }, { name: "workflow mismatch returns empty", jobAgent: "codex", @@ -413,6 +421,7 @@ func TestResolveBackupAgent(t *testing.T) { pool := NewWorkerPool(nil, NewStaticConfig(cfg), 1, NewBroadcaster(), nil) job := &storage.ReviewJob{ Agent: tt.jobAgent, + RepoPath: t.TempDir(), ReviewType: tt.reviewType, } diff --git a/internal/storage/db_test.go b/internal/storage/db_test.go index f41a0a53..3cd9266c 100644 --- a/internal/storage/db_test.go +++ b/internal/storage/db_test.go @@ -946,6 +946,46 @@ func TestFailoverJob(t *testing.T) { } }) + t.Run("clears model on failover", func(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo := createRepo(t, db, "/tmp/failover-model") + commit := createCommit(t, db, repo.ID, "fo-model") + + job, err := db.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fo-model", + Agent: "primary", + Model: "o3-mini", + }) + if err != nil { + t.Fatalf("EnqueueJob: %v", err) + } + if job.Model != "o3-mini" { + t.Fatalf("Model = %q, want %q", job.Model, "o3-mini") + } + + claimJob(t, db, "worker-1") + + ok, err := db.FailoverJob(job.ID, "worker-1", "backup") + if err != nil { + t.Fatalf("FailoverJob: %v", err) + } + if !ok { + t.Fatal("Expected failover to succeed") + } + + updated, err := db.GetJobByID(job.ID) + if err != nil { + t.Fatalf("GetJobByID: %v", err) + } + if updated.Model != "" { + t.Errorf("Model = %q, want empty (cleared on failover)", updated.Model) + } + }) + t.Run("fails with empty backup agent", func(t *testing.T) { db := openTestDB(t) defer db.Close() diff --git 
a/internal/storage/jobs.go b/internal/storage/jobs.go index c6584397..49a3babe 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -1148,6 +1148,7 @@ func (db *DB) FailoverJob(jobID int64, workerID string, backupAgent string) (boo result, err := db.Exec(` UPDATE review_jobs SET agent = ?, + model = NULL, retry_count = 0, status = 'queued', worker_id = NULL,