Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,58 @@ func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error {
return nil
}

// Retry retries failed jobs of a specific type from the dead queue.
func (c *Client) RetryDeadOfType(jobType string) error {
// Get queues for job names
queues, err := c.Queues()
if err != nil {
logError("client.retry_all_dead_jobs.queues", err)
return err
}

fmt.Println("Queues: %+V", queues)
// Extract job names
var jobNames []string
for _, q := range queues {
if q.JobName == jobType {
jobNames = append(jobNames, q.JobName)
}
}
fmt.Println("JobNames: %+V", jobNames)

script := redis.NewScript(len(jobNames)+1, redisLuaRequeueAllDeadCmd)

args := make([]interface{}, 0, len(jobNames)+1+3)
args = append(args, redisKeyDead(c.namespace)) // KEY[1]
for _, jobName := range jobNames {
args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...]
}
args = append(args, redisKeyJobsPrefix(c.namespace)) // ARGV[1]
args = append(args, nowEpochSeconds())
args = append(args, 1000)

conn := c.pool.Get()
defer conn.Close()

fmt.Println("args: %+V", args)
// Cap iterations for safety (which could reprocess 1k*1k jobs).
// This is conceptually an infinite loop but let's be careful.
for i := 0; i < 1000; i++ {
res, err := redis.Int64(script.Do(conn, args...))
if err != nil {
logError("client.retry_all_dead_jobs.do", err)
return err
}

if res == 0 {
break
}
}

return nil

}

// RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.
func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {
// Get queues for job names
Expand Down
7 changes: 7 additions & 0 deletions webui/webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewServer(namespace string, pool *redis.Pool, hostPort string) *Server {
router.Get("/dead_jobs", (*context).deadJobs)
router.Post("/delete_dead_job/:died_at:\\d.*/:job_id", (*context).deleteDeadJob)
router.Post("/retry_dead_job/:died_at:\\d.*/:job_id", (*context).retryDeadJob)
router.Post("/retry_dead_job_type/:job_type", (*context).retryDeadJobOfType)
router.Post("/delete_all_dead_jobs", (*context).deleteAllDeadJobs)
router.Post("/retry_all_dead_jobs", (*context).retryAllDeadJobs)

Expand Down Expand Up @@ -205,6 +206,12 @@ func (c *context) retryDeadJob(rw web.ResponseWriter, r *web.Request) {
render(rw, map[string]string{"status": "ok"}, err)
}

func (c *context) retryDeadJobOfType(rw web.ResponseWriter, r *web.Request) {
err := c.client.RetryDeadOfType(r.PathParams["job_type"])

render(rw, map[string]string{"status": "ok"}, err)
}

func (c *context) deleteAllDeadJobs(rw web.ResponseWriter, r *web.Request) {
err := c.client.DeleteAllDeadJobs()
render(rw, map[string]string{"status": "ok"}, err)
Expand Down