From 562041fbafae1f88650c3e58c5d270937780eca2 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Wed, 11 Apr 2018 09:25:15 +0200 Subject: [PATCH] Add logic to the producer to be able to requeue errored jobs from the buried queue Signed-off-by: Manuel Carmona --- cli/borges/producer.go | 32 ++++++++++++++++++++++++-------- consumer_test.go | 3 ++- worker.go | 4 ++-- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/cli/borges/producer.go b/cli/borges/producer.go index 04aadb88..25e69068 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -23,12 +23,13 @@ const ( type producerCmd struct { cmd - Source string `long:"source" default:"mentions" description:"source to produce jobs from (mentions, file)"` - MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"` - File string `long:"file" description:"path to a file to read URLs from, used with --source=file"` - RepublishBuried bool `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"` - Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"` - JobsRetries int `long:"job-retries" default:"5" description:"number of times a falied job should be processed again before reject it"` + Source string `long:"source" default:"mentions" description:"source to produce jobs from (mentions, file)"` + MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"` + File string `long:"file" description:"path to a file to read URLs from, used with --source=file"` + RepublishMentions bool `long:"republish-mentions" description:"republishes again all buried mentions before starting to listen for new mentions, used with --source=mentions"` + RepublishJobs bool `long:"republish-jobs" description:"republish failed jobs on the main queue"` + Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"` + JobsRetries int `long:"job-retries" default:"5" description:"number of times a falied job should be processed again before reject it"` } // Changes the priority description and default on runtime as it is not @@ -68,6 +69,10 @@ func (c *producerCmd) Execute(args []string) error { return err } + if c.RepublishJobs { + q.RepublishBuried(jobCondition) + } + ji, err := c.jobIter(b) if err != nil { return err @@ -82,6 +87,15 @@ func (c *producerCmd) Execute(args []string) error { return err } +func jobCondition(job *queue.Job) bool { + // Althoug the job has the temporary error tag, it must be checked + // that the retries is equals to zero. The reason for this is that + // a job can panic during a retry process, so it can be tagged as + // temporary error and a number of retries greater than zero reveals + // that fact. + return job.ErrorType == borges.TemporaryError && job.Retries == 0 +} + func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) { storer := storage.FromDatabase(core.Database()) @@ -92,8 +106,8 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) { return nil, err } - if c.RepublishBuried { - if err := q.RepublishBuried(); err != nil { + if c.RepublishMentions { + if err := q.RepublishBuried(mentionCondition); err != nil { return nil, err } } @@ -108,3 +122,5 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) { return nil, fmt.Errorf("invalid source: %s", c.Source) } } + +func mentionCondition(*queue.Job) bool { return true } diff --git a/consumer_test.go b/consumer_test.go index 8cc96e20..3d4a6845 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -67,7 +67,8 @@ func (s *ConsumerSuite) TestConsumer_StartStop_FailedJob() { require.Error(timeoutChan(done, time.Second*5)) require.Equal(1, processed) - err = s.queue.RepublishBuried() + testCondition := func(*queue.Job) bool { return true } + err = s.queue.RepublishBuried(testCondition) require.NoError(err) require.NoError(timeoutChan(done, time.Second*10)) diff --git a/worker.go b/worker.go index 7ec6702a..089ce322 100644 --- a/worker.go +++ b/worker.go @@ -4,7 +4,7 @@ import ( "github.com/inconshreveable/log15" ) -const temporaryError = "temporary" +const TemporaryError = "temporary" // Worker is a worker that processes jobs from a channel. type Worker struct { @@ -63,7 +63,7 @@ func (w *Worker) Start() { if requeue { job.queueJob.Retries-- - job.queueJob.ErrorType = temporaryError + job.queueJob.ErrorType = TemporaryError job.source.Publish(job.queueJob) }