From 3057e2d3c9cc693c36224db0dd0c1c784fa78c07 Mon Sep 17 00:00:00 2001 From: David Dai Date: Thu, 1 Apr 2021 19:09:27 +0800 Subject: [PATCH] worker: Backup queued json for unique jobs --- go.mod | 2 -- go.sum | 4 ---- job.go | 5 +++++ worker.go | 8 +++++++- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 224eab98..a16ce294 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,11 @@ require ( github.com/garyburd/redigo v1.6.0 // indirect github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b - github.com/gocraft/work v0.5.1 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/gomodule/redigo v2.0.0+incompatible github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb github.com/kr/pretty v0.2.0 // indirect github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect - github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.5.1 github.com/youtube/vitess v2.1.1+incompatible // indirect diff --git a/go.sum b/go.sum index 63aa5f0a..89fa3ef1 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7V github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= -github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= -github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= @@ -41,8 +39,6 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/job.go b/job.go index 7c6f5d79..de2a4f89 100644 --- a/job.go +++ b/job.go @@ -23,6 +23,7 @@ type Job struct { FailedAt int64 `json:"failed_at,omitempty"` rawJSON []byte + rawQueueJSON []byte // the JSON stored in in-progress queue, which may changed by redisLuaZremLpushCmd in requeuer dequeuedFrom []byte inProgQueue []byte argError error @@ -45,6 +46,10 @@ func newJob(rawJSON, dequeuedFrom, inProgQueue []byte) (*Job, error) { return &job, nil } +func (j *Job) setQueueJSON(json []byte) { + j.rawQueueJSON = json +} + func (j *Job) serialize() ([]byte, error) { return json.Marshal(j) } diff --git a/worker.go b/worker.go index d7578334..c8b9f3b1 100644 --- a/worker.go +++ b/worker.go @@ -196,6 +196,8 @@ func (w *worker) processJob(job *Job) { // Going forward the job on the queue will always be just a placeholder, and we will be replacing it with the // updated job extracted here if updatedJob != nil { + // Keep json content from in-progress queue for further cleanup + updatedJob.setQueueJSON(job.rawJSON) job = updatedJob } } @@ -268,8 +270,12 @@ func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) { conn := w.pool.Get() defer conn.Close() + rawJSON := job.rawJSON + if job.rawQueueJSON != nil { + rawJSON = job.rawQueueJSON + } conn.Send("MULTI") - conn.Send("LREM", job.inProgQueue, 1, job.rawJSON) + conn.Send("LREM", job.inProgQueue, 1, rawJSON) conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name)) conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1) fate(conn)