Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions cli/borges/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ const (

type producerCmd struct {
cmd
Source string `long:"source" default:"mentions" description:"source to produce jobs from (mentions, file)"`
MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"`
File string `long:"file" description:"path to a file to read URLs from, used with --source=file"`
RepublishBuried bool `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"`
Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"`
JobsRetries int `long:"job-retries" default:"5" description:"number of times a falied job should be processed again before reject it"`
Source string `long:"source" default:"mentions" description:"source to produce jobs from (mentions, file)"`
MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"`
File string `long:"file" description:"path to a file to read URLs from, used with --source=file"`
RepublishMentions bool `long:"republish-mentions" description:"republishes again all buried mentions before starting to listen for new mentions, used with --source=mentions"`
RepublishJobs bool `long:"republish-jobs" description:"republish failed jobs on the main queue"`
Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"`
JobsRetries int `long:"job-retries" default:"5" description:"number of times a falied job should be processed again before reject it"`
}

// Changes the priority description and default on runtime as it is not
Expand Down Expand Up @@ -68,6 +69,10 @@ func (c *producerCmd) Execute(args []string) error {
return err
}

if c.RepublishJobs {
q.RepublishBuried(jobCondition)
}

ji, err := c.jobIter(b)
if err != nil {
return err
Expand All @@ -82,6 +87,15 @@ func (c *producerCmd) Execute(args []string) error {
return err
}

func jobCondition(job *queue.Job) bool {
// Althoug the job has the temporary error tag, it must be checked
// that the retries is equals to zero. The reason for this is that
// a job can panic during a retry process, so it can be tagged as
// temporary error and a number of retries greater than zero reveals
// that fact.
return job.ErrorType == borges.TemporaryError && job.Retries == 0
}

func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) {
storer := storage.FromDatabase(core.Database())

Expand All @@ -92,8 +106,8 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) {
return nil, err
}

if c.RepublishBuried {
if err := q.RepublishBuried(); err != nil {
if c.RepublishMentions {
if err := q.RepublishBuried(mentionCondition); err != nil {
return nil, err
}
}
Expand All @@ -108,3 +122,5 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) {
return nil, fmt.Errorf("invalid source: %s", c.Source)
}
}

func mentionCondition(*queue.Job) bool { return true }
3 changes: 2 additions & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (s *ConsumerSuite) TestConsumer_StartStop_FailedJob() {
require.Error(timeoutChan(done, time.Second*5))
require.Equal(1, processed)

err = s.queue.RepublishBuried()
testCondition := func(*queue.Job) bool { return true }
err = s.queue.RepublishBuried(testCondition)
require.NoError(err)

require.NoError(timeoutChan(done, time.Second*10))
Expand Down
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/inconshreveable/log15"
)

const temporaryError = "temporary"
const TemporaryError = "temporary"

// Worker is a worker that processes jobs from a channel.
type Worker struct {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (w *Worker) Start() {

if requeue {
job.queueJob.Retries--
job.queueJob.ErrorType = temporaryError
job.queueJob.ErrorType = TemporaryError
job.source.Publish(job.queueJob)
}

Expand Down