diff --git a/README.md b/README.md index 3b1aa91a..38c4d8a0 100644 --- a/README.md +++ b/README.md @@ -3,20 +3,61 @@ # cron -## DRAFT - Upgrading to v3 +Cron V3 has been released! + +To download the specific tagged release, run: +```bash +go get github.com/robfig/cron/v3@v3.0.0 +``` +Import it in your program as: +```go +import "github.com/robfig/cron/v3" +``` +It requires Go 1.11 or later due to usage of Go Modules. + +Refer to the documentation here: +http://godoc.org/github.com/robfig/cron + +The rest of this document describes the the advances in v3 and a list of +breaking changes for users that wish to upgrade from an earlier version. + +## Upgrading to v3 (June 2019) cron v3 is a major upgrade to the library that addresses all outstanding bugs, -feature requests, and clarifications around usage. It is based on a merge of -master which contains various fixes to issues found over the years and the v2 -branch which contains some backwards-incompatible features like the ability to -remove cron jobs. In addition, v3 adds support for Go Modules and cleans up -rough edges like the timezone support. +feature requests, and rough edges. It is based on a merge of master which +contains various fixes to issues found over the years and the v2 branch which +contains some backwards-incompatible features like the ability to remove cron +jobs. In addition, v3 adds support for Go Modules, cleans up rough edges like +the timezone support, and fixes a number of bugs. + +New features: + +- Support for Go modules. Callers must now import this library as + `github.com/robfig/cron/v3`, instead of `gopkg.in/...` + +- Fixed bugs: + - 0f01e6b parser: fix combining of Dow and Dom (#70) + - dbf3220 adjust times when rolling the clock forward to handle non-existent midnight (#157) + - eeecf15 spec_test.go: ensure an error is returned on 0 increment (#144) + - 70971dc cron.Entries(): update request for snapshot to include a reply channel (#97) + - 1cba5e6 cron: fix: removing a job causes the next scheduled job to run too late (#206) + +- Standard cron spec parsing by default (first field is "minute"), with an easy + way to opt into the seconds field (quartz-compatible). Although, note that the + year field (optional in Quartz) is not supported. + +- Extensible, key/value logging via an interface that complies with + the https://github.com/go-logr/logr project. -It is currently IN DEVELOPMENT and will be considered released once a 3.0 -version is tagged. It is backwards INCOMPATIBLE with both the v1 and v2 -branches. +- The new Chain & JobWrapper types allow you to install "interceptors" to add + cross-cutting behavior like the following: + - Recover any panics from jobs + - Delay a job's execution if the previous run hasn't completed yet + - Skip a job's execution if the previous run hasn't completed yet + - Log each job's invocations + - Notification when jobs are completed -Updates required: +It is backwards incompatible with both v1 and v2. These updates are required: - The v1 branch accepted an optional seconds field at the beginning of the cron spec. This is non-standard and has led to a lot of confusion. The new default @@ -24,17 +65,17 @@ Updates required: UPDATING: To retain the old behavior, construct your Cron with a custom parser: - - // Seconds field, required - cron.New(cron.WithSeconds()) - - // Seconds field, optional - cron.New( - cron.WithParser( - cron.SecondOptional | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)) - +```go +// Seconds field, required +cron.New(cron.WithSeconds()) + +// Seconds field, optional +cron.New(cron.WithParser(cron.NewParser( + cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, +))) +``` - The Cron type now accepts functional options on construction rather than the - ad-hoc behavior modification mechanisms before (setting a field, calling a setter). + previous ad-hoc behavior modification mechanisms (setting a field, calling a setter). UPDATING: Code that sets Cron.ErrorLogger or calls Cron.SetLocation must be updated to provide those values on construction. @@ -45,16 +86,26 @@ Updates required: UPDATING: No update is required. -Planned updates before calling v3 done: - -- Job "Interceptors" (name tbd), which make it easy for callers to mix desired - behavior like the following: - - Recover any panics from jobs - - Block this job if the previous run hasn't completed yet - - Logging job invocations - - Notification when jobs are completed - -- Fix all open bugs +- By default, cron will no longer recover panics in jobs that it runs. + Recovering can be surprising (see issue #192) and seems to be at odds with + typical behavior of libraries. Relatedly, the `cron.WithPanicLogger` option + has been removed to accommodate the more general JobWrapper type. + + UPDATING: To opt into panic recovery and configure the panic logger: +```go +cron.New(cron.WithChain( + cron.Recover(logger), // or use cron.DefaultLogger +)) +``` +- In adding support for https://github.com/go-logr/logr, `cron.WithVerboseLogger` was + removed, since it is duplicative with the leveled logging. + + UPDATING: Callers should use `WithLogger` and specify a logger that does not + discard `Info` logs. For convenience, one is provided that wraps `*log.Logger`: +```go +cron.New( + cron.WithLogger(cron.VerbosePrintfLogger(logger))) +``` ### Background - Cron spec format @@ -67,8 +118,8 @@ There are two cron spec formats in common usage: jobs in Java software [the Cron wikipedia page]: https://en.wikipedia.org/wiki/Cron -[the Quartz Scheduler]: http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html +[the Quartz Scheduler]: http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-06.html The original version of this package included an optional "seconds" field, which -made it incompatible with both of these formats. Instead, the schedule parser -has been extended to support both types. +made it incompatible with both of these formats. Now, the "standard" format is +the default format accepted, and the Quartz format is opt-in. diff --git a/chain.go b/chain.go new file mode 100644 index 00000000..9c087b7b --- /dev/null +++ b/chain.go @@ -0,0 +1,92 @@ +package cron + +import ( + "fmt" + "runtime" + "sync" + "time" +) + +// JobWrapper decorates the given Job with some behavior. +type JobWrapper func(Job) Job + +// Chain is a sequence of JobWrappers that decorates submitted jobs with +// cross-cutting behaviors like logging or synchronization. +type Chain struct { + wrappers []JobWrapper +} + +// NewChain returns a Chain consisting of the given JobWrappers. +func NewChain(c ...JobWrapper) Chain { + return Chain{c} +} + +// Then decorates the given job with all JobWrappers in the chain. +// +// This: +// NewChain(m1, m2, m3).Then(job) +// is equivalent to: +// m1(m2(m3(job))) +func (c Chain) Then(j Job) Job { + for i := range c.wrappers { + j = c.wrappers[len(c.wrappers)-i-1](j) + } + return j +} + +// Recover panics in wrapped jobs and log them with the provided logger. +func Recover(logger Logger) JobWrapper { + return func(j Job) Job { + return FuncJob(func() { + defer func() { + if r := recover(); r != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + err, ok := r.(error) + if !ok { + err = fmt.Errorf("%v", r) + } + logger.Error(err, "panic", "stack", "...\n"+string(buf)) + } + }() + j.Run() + }) + } +} + +// DelayIfStillRunning serializes jobs, delaying subsequent runs until the +// previous one is complete. Jobs running after a delay of more than a minute +// have the delay logged at Info. +func DelayIfStillRunning(logger Logger) JobWrapper { + return func(j Job) Job { + var mu sync.Mutex + return FuncJob(func() { + start := time.Now() + mu.Lock() + defer mu.Unlock() + if dur := time.Since(start); dur > time.Minute { + logger.Info("delay", "duration", dur) + } + j.Run() + }) + } +} + +// SkipIfStillRunning skips an invocation of the Job if a previous invocation is +// still running. It logs skips to the given logger at Info level. +func SkipIfStillRunning(logger Logger) JobWrapper { + return func(j Job) Job { + var ch = make(chan struct{}, 1) + ch <- struct{}{} + return FuncJob(func() { + select { + case v := <-ch: + defer func() { ch <- v }() + j.Run() + default: + logger.Info("skip") + } + }) + } +} diff --git a/chain_test.go b/chain_test.go new file mode 100644 index 00000000..ec910975 --- /dev/null +++ b/chain_test.go @@ -0,0 +1,242 @@ +package cron + +import ( + "io/ioutil" + "log" + "reflect" + "sync" + "testing" + "time" +) + +func appendingJob(slice *[]int, value int) Job { + var m sync.Mutex + return FuncJob(func() { + m.Lock() + *slice = append(*slice, value) + m.Unlock() + }) +} + +func appendingWrapper(slice *[]int, value int) JobWrapper { + return func(j Job) Job { + return FuncJob(func() { + appendingJob(slice, value).Run() + j.Run() + }) + } +} + +func TestChain(t *testing.T) { + var nums []int + var ( + append1 = appendingWrapper(&nums, 1) + append2 = appendingWrapper(&nums, 2) + append3 = appendingWrapper(&nums, 3) + append4 = appendingJob(&nums, 4) + ) + NewChain(append1, append2, append3).Then(append4).Run() + if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) { + t.Error("unexpected order of calls:", nums) + } +} + +func TestChainRecover(t *testing.T) { + panickingJob := FuncJob(func() { + panic("panickingJob panics") + }) + + t.Run("panic exits job by default", func(t *testing.T) { + defer func() { + if err := recover(); err == nil { + t.Errorf("panic expected, but none received") + } + }() + NewChain().Then(panickingJob). + Run() + }) + + t.Run("Recovering JobWrapper recovers", func(t *testing.T) { + NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))). + Then(panickingJob). + Run() + }) + + t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) { + NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))). + Then(panickingJob). + Run() + }) +} + +type countJob struct { + m sync.Mutex + started int + done int + delay time.Duration +} + +func (j *countJob) Run() { + j.m.Lock() + j.started++ + j.m.Unlock() + time.Sleep(j.delay) + j.m.Lock() + j.done++ + j.m.Unlock() +} + +func (j *countJob) Started() int { + defer j.m.Unlock() + j.m.Lock() + return j.started +} + +func (j *countJob) Done() int { + defer j.m.Unlock() + j.m.Lock() + return j.done +} + +func TestChainDelayIfStillRunning(t *testing.T) { + + t.Run("runs immediately", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go wrappedJob.Run() + time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + if c := j.Done(); c != 1 { + t.Errorf("expected job run once, immediately, got %d", c) + } + }) + + t.Run("second run immediate if first done", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + if c := j.Done(); c != 2 { + t.Errorf("expected job run twice, immediately, got %d", c) + } + }) + + t.Run("second run delayed if first not done", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + + // After 5ms, the first job is still in progress, and the second job was + // run but should be waiting for it to finish. + time.Sleep(5 * time.Millisecond) + started, done := j.Started(), j.Done() + if started != 1 || done != 0 { + t.Error("expected first job started, but not finished, got", started, done) + } + + // Verify that the second job completes. + time.Sleep(25 * time.Millisecond) + started, done = j.Started(), j.Done() + if started != 2 || done != 2 { + t.Error("expected both jobs done, got", started, done) + } + }) + +} + +func TestChainSkipIfStillRunning(t *testing.T) { + + t.Run("runs immediately", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go wrappedJob.Run() + time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + if c := j.Done(); c != 1 { + t.Errorf("expected job run once, immediately, got %d", c) + } + }) + + t.Run("second run immediate if first done", func(t *testing.T) { + var j countJob + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + if c := j.Done(); c != 2 { + t.Errorf("expected job run twice, immediately, got %d", c) + } + }) + + t.Run("second run skipped if first not done", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + go func() { + go wrappedJob.Run() + time.Sleep(time.Millisecond) + go wrappedJob.Run() + }() + + // After 5ms, the first job is still in progress, and the second job was + // aleady skipped. + time.Sleep(5 * time.Millisecond) + started, done := j.Started(), j.Done() + if started != 1 || done != 0 { + t.Error("expected first job started, but not finished, got", started, done) + } + + // Verify that the first job completes and second does not run. + time.Sleep(25 * time.Millisecond) + started, done = j.Started(), j.Done() + if started != 1 || done != 1 { + t.Error("expected second job skipped, got", started, done) + } + }) + + t.Run("skip 10 jobs on rapid fire", func(t *testing.T) { + var j countJob + j.delay = 10 * time.Millisecond + wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + for i := 0; i < 11; i++ { + go wrappedJob.Run() + } + time.Sleep(200 * time.Millisecond) + done := j.Done() + if done != 1 { + t.Error("expected 1 jobs executed, 10 jobs dropped, got", done) + } + }) + + t.Run("different jobs independent", func(t *testing.T) { + var j1, j2 countJob + j1.delay = 10 * time.Millisecond + j2.delay = 10 * time.Millisecond + chain := NewChain(SkipIfStillRunning(DiscardLogger)) + wrappedJob1 := chain.Then(&j1) + wrappedJob2 := chain.Then(&j2) + for i := 0; i < 11; i++ { + go wrappedJob1.Run() + go wrappedJob2.Run() + } + time.Sleep(100 * time.Millisecond) + var ( + done1 = j1.Done() + done2 = j2.Done() + ) + if done1 != 1 || done2 != 1 { + t.Error("expected both jobs executed once, got", done1, "and", done2) + } + }) + +} diff --git a/cron.go b/cron.go index e0ee6bd7..c7e91766 100644 --- a/cron.go +++ b/cron.go @@ -1,8 +1,9 @@ package cron import ( - "runtime" + "context" "sort" + "sync" "time" ) @@ -10,17 +11,24 @@ import ( // specified by the schedule. It may be started, stopped, and the entries may // be inspected while running. type Cron struct { - entries []*Entry - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan chan []Entry - running bool - logger Logger - vlogger Logger - location *time.Location - parser Parser - nextID EntryID + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser ScheduleParser + nextID EntryID + jobWaiter sync.WaitGroup +} + +// ScheduleParser is an interface for schedule spec parsers that return a Schedule +type ScheduleParser interface { + Parse(spec string) (Schedule, error) } // Job is an interface for submitted cron jobs. @@ -54,7 +62,12 @@ type Entry struct { // Prev is the last time this job was run, or the zero time if never. Prev time.Time - // Job is the thing to run when the Schedule is activated. + // WrappedJob is the thing to run when the Schedule is activated. + WrappedJob Job + + // Job is the thing that was submitted to cron. + // It is kept around so that user code that needs to get at the job later, + // e.g. via Entries() can do so. Job Job } @@ -88,27 +101,28 @@ func (s byTime) Less(i, j int) bool { // Description: The time zone in which schedules are interpreted // Default: time.Local // -// PanicLogger -// Description: How to log Jobs that panic -// Default: Log the panic to os.Stderr -// // Parser -// Description: -// Default: Parser that accepts the spec described here: https://en.wikipedia.org/wiki/Cron +// Description: Parser converts cron spec strings into cron.Schedules. +// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron +// +// Chain +// Description: Wrap submitted jobs to customize behavior. +// Default: A chain that recovers panics and logs them to stderr. // // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { c := &Cron{ - entries: nil, - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan chan []Entry), - remove: make(chan EntryID), - running: false, - logger: DefaultLogger, - vlogger: nil, - location: time.Local, - parser: standardParser, + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, } for _, opt := range opts { opt(c) @@ -140,12 +154,16 @@ func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { } // Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + c.runningMu.Lock() + defer c.runningMu.Unlock() c.nextID++ entry := &Entry{ - ID: c.nextID, - Schedule: schedule, - Job: cmd, + ID: c.nextID, + Schedule: schedule, + WrappedJob: c.chain.Then(cmd), + Job: cmd, } if !c.running { c.entries = append(c.entries, entry) @@ -157,6 +175,8 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { // Entries returns a snapshot of the cron entries. func (c *Cron) Entries() []Entry { + c.runningMu.Lock() + defer c.runningMu.Unlock() if c.running { replyChan := make(chan []Entry, 1) c.snapshot <- replyChan @@ -182,6 +202,8 @@ func (c *Cron) Entry(id EntryID) Entry { // Remove an entry from being run in the future. func (c *Cron) Remove(id EntryID) { + c.runningMu.Lock() + defer c.runningMu.Unlock() if c.running { c.remove <- id } else { @@ -191,6 +213,8 @@ func (c *Cron) Remove(id EntryID) { // Start the cron scheduler in its own goroutine, or no-op if already started. func (c *Cron) Start() { + c.runningMu.Lock() + defer c.runningMu.Unlock() if c.running { return } @@ -200,35 +224,26 @@ func (c *Cron) Start() { // Run the cron scheduler, or no-op if already running. func (c *Cron) Run() { + c.runningMu.Lock() if c.running { + c.runningMu.Unlock() return } c.running = true + c.runningMu.Unlock() c.run() } -func (c *Cron) runWithRecovery(j Job) { - defer func() { - if r := recover(); r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - c.logger.Printf("panic running job: %v\n%s", r, buf) - } - }() - j.Run() -} - // run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { - c.logVerbosef("cron is starting") + c.logger.Info("start") // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) - c.logVerbosef("(%s) scheduled entry %d for %s", now, entry.ID, entry.Next) + c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } for { @@ -248,17 +263,17 @@ func (c *Cron) run() { select { case now = <-timer.C: now = now.In(c.location) - c.logVerbosef("(%s) woke up", now) + c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } - go c.runWithRecovery(e.Job) + c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) - c.logVerbosef("(%s) started entry %d, next scheduled for %s", now, e.ID, e.Next) + c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: @@ -266,7 +281,7 @@ func (c *Cron) run() { now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry) - c.logVerbosef("(%s) added new entry %d, scheduled for", now, newEntry.ID, newEntry.Next) + c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() @@ -274,13 +289,14 @@ func (c *Cron) run() { case <-c.stop: timer.Stop() - c.logVerbosef("cron is stopping") + c.logger.Info("stop") return case id := <-c.remove: timer.Stop() + now = c.now() c.removeEntry(id) - c.logVerbosef("removed entry %d", id) + c.logger.Info("removed", "entry", id) } break @@ -288,19 +304,13 @@ func (c *Cron) run() { } } -// logVerbosef logs a verbose message, if such a logger is configured. -func (c *Cron) logVerbosef(format string, args ...interface{}) { - if c.vlogger != nil { - // Format any times provided as RFC3339, easier to read than default. - var formattedArgs []interface{} - for _, arg := range args { - if t, ok := arg.(time.Time); ok { - arg = t.Format(time.RFC3339) - } - formattedArgs = append(formattedArgs, arg) - } - c.vlogger.Printf(format, formattedArgs...) - } +// startJob runs the given job in a new goroutine. +func (c *Cron) startJob(j Job) { + c.jobWaiter.Add(1) + go func() { + defer c.jobWaiter.Done() + j.Run() + }() } // now returns current time in c location @@ -309,12 +319,20 @@ func (c *Cron) now() time.Time { } // Stop stops the cron scheduler if it is running; otherwise it does nothing. -func (c *Cron) Stop() { - if !c.running { - return +// A context is returned so the caller can wait for running jobs to complete. +func (c *Cron) Stop() context.Context { + c.runningMu.Lock() + defer c.runningMu.Unlock() + if c.running { + c.stop <- struct{}{} + c.running = false } - c.stop <- struct{}{} - c.running = false + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c.jobWaiter.Wait() + cancel() + }() + return ctx } // entrySnapshot returns a copy of the current cron entry list. diff --git a/cron_test.go b/cron_test.go index 20e12cc7..36f06bf7 100644 --- a/cron_test.go +++ b/cron_test.go @@ -34,13 +34,14 @@ func (sw *syncWriter) String() string { return sw.wr.String() } -func newBufLogger(sw *syncWriter) *log.Logger { - return log.New(sw, "", log.LstdFlags) +func newBufLogger(sw *syncWriter) Logger { + return PrintfLogger(log.New(sw, "", log.LstdFlags)) } func TestFuncPanicRecovery(t *testing.T) { var buf syncWriter - cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf))) + cron := New(WithParser(secondParser), + WithChain(Recover(newBufLogger(&buf)))) cron.Start() defer cron.Stop() cron.AddFunc("* * * * * ?", func() { @@ -66,7 +67,8 @@ func TestJobPanicRecovery(t *testing.T) { var job DummyJob var buf syncWriter - cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf))) + cron := New(WithParser(secondParser), + WithChain(Recover(newBufLogger(&buf)))) cron.Start() defer cron.Stop() cron.AddJob("* * * * * ?", job) @@ -297,6 +299,13 @@ func TestLocalTimezone(t *testing.T) { wg.Add(2) now := time.Now() + // FIX: Issue #205 + // This calculation doesn't work in seconds 58 or 59. + // Take the easy way out and sleep. + if now.Second() >= 58 { + time.Sleep(2 * time.Second) + now = time.Now() + } spec := fmt.Sprintf("%d,%d %d %d %d %d ?", now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) @@ -324,6 +333,13 @@ func TestNonLocalTimezone(t *testing.T) { } now := time.Now().In(loc) + // FIX: Issue #205 + // This calculation doesn't work in seconds 58 or 59. + // Take the easy way out and sleep. + if now.Second() >= 58 { + time.Sleep(2 * time.Second) + now = time.Now().In(loc) + } spec := fmt.Sprintf("%d,%d %d %d %d %d ?", now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) @@ -469,6 +485,56 @@ func TestJob(t *testing.T) { } } +// Issue #206 +// Ensure that the next run of a job after removing an entry is accurate. +func TestScheduleAfterRemoval(t *testing.T) { + var wg1 sync.WaitGroup + var wg2 sync.WaitGroup + wg1.Add(1) + wg2.Add(1) + + // The first time this job is run, set a timer and remove the other job + // 750ms later. Correct behavior would be to still run the job again in + // 250ms, but the bug would cause it to run instead 1s later. + + var calls int + var mu sync.Mutex + + cron := newWithSeconds() + hourJob := cron.Schedule(Every(time.Hour), FuncJob(func() {})) + cron.Schedule(Every(time.Second), FuncJob(func() { + mu.Lock() + defer mu.Unlock() + switch calls { + case 0: + wg1.Done() + calls++ + case 1: + time.Sleep(750 * time.Millisecond) + cron.Remove(hourJob) + calls++ + case 2: + calls++ + wg2.Done() + case 3: + panic("unexpected 3rd call") + } + })) + + cron.Start() + defer cron.Stop() + + // the first run might be any length of time 0 - 1s, since the schedule + // rounds to the second. wait for the first run to true up. + wg1.Wait() + + select { + case <-time.After(2 * OneSecond): + t.Error("expected job fires 2 times") + case <-wait(&wg2): + } +} + type ZeroSchedule struct{} func (*ZeroSchedule) Next(time.Time) time.Time { @@ -489,6 +555,129 @@ func TestJobWithZeroTimeDoesNotRun(t *testing.T) { } } +func TestStopAndWait(t *testing.T) { + t.Run("nothing running, returns immediately", func(t *testing.T) { + cron := newWithSeconds() + cron.Start() + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done immediately") + } + }) + + t.Run("repeated calls to Stop", func(t *testing.T) { + cron := newWithSeconds() + cron.Start() + _ = cron.Stop() + time.Sleep(time.Millisecond) + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done immediately") + } + }) + + t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.Start() + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + ctx := cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond): + t.Error("context was not done immediately") + } + }) + + t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.Start() + cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + + ctx := cron.Stop() + + // Verify that it is not done for at least 750ms + select { + case <-ctx.Done(): + t.Error("context was done too quickly immediately") + case <-time.After(750 * time.Millisecond): + // expected, because the job sleeping for 1 second is still running + } + + // Verify that it IS done in the next 500ms (giving 250ms buffer) + select { + case <-ctx.Done(): + // expected + case <-time.After(1500 * time.Millisecond): + t.Error("context not done after job should have completed") + } + }) + + t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) { + cron := newWithSeconds() + cron.AddFunc("* * * * * *", func() {}) + cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) }) + cron.Start() + cron.AddFunc("* * * * * *", func() {}) + time.Sleep(time.Second) + ctx := cron.Stop() + ctx2 := cron.Stop() + + // Verify that it is not done for at least 1500ms + select { + case <-ctx.Done(): + t.Error("context was done too quickly immediately") + case <-ctx2.Done(): + t.Error("context2 was done too quickly immediately") + case <-time.After(1500 * time.Millisecond): + // expected, because the job sleeping for 2 seconds is still running + } + + // Verify that it IS done in the next 1s (giving 500ms buffer) + select { + case <-ctx.Done(): + // expected + case <-time.After(time.Second): + t.Error("context not done after job should have completed") + } + + // Verify that ctx2 is also done. + select { + case <-ctx2.Done(): + // expected + case <-time.After(time.Millisecond): + t.Error("context2 not done even though context1 is") + } + + // Verify that a new context retrieved from stop is immediately done. + ctx3 := cron.Stop() + select { + case <-ctx3.Done(): + // expected + case <-time.After(time.Millisecond): + t.Error("context not done even when cron Stop is completed") + } + + }) +} + +func TestMultiThreadedStartAndStop(t *testing.T) { + cron := New() + go cron.Run() + time.Sleep(2 * time.Millisecond) + cron.Stop() +} + func wait(wg *sync.WaitGroup) chan bool { ch := make(chan bool) go func() { @@ -509,5 +698,5 @@ func stop(cron *Cron) chan bool { // newWithSeconds returns a Cron with the seconds field enabled. func newWithSeconds() *Cron { - return New(WithParser(secondParser)) + return New(WithParser(secondParser), WithChain()) } diff --git a/doc.go b/doc.go index efeb53fb..7171378a 100644 --- a/doc.go +++ b/doc.go @@ -1,6 +1,18 @@ /* Package cron implements a cron spec parser and job runner. +Installation + +To download the specific tagged release, run: + + go get github.com/robfig/cron/v3@v3.0.0 + +Import it in your program as: + + import "github.com/robfig/cron/v3" + +It requires Go 1.11 or later due to usage of Go Modules. + Usage Callers may register Funcs to be invoked on a given schedule. Cron will run @@ -9,9 +21,9 @@ them in their own goroutines. c := cron.New() c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") }) - c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) - c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) - c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") }) + c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) + c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) + c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) c.Start() .. // Funcs are invoked in their own goroutine, asynchronously. @@ -32,7 +44,7 @@ A cron expression represents a set of times, using 5 space-separated fields. ---------- | ---------- | -------------- | -------------------------- Minutes | Yes | 0-59 | * / , - Hours | Yes | 0-23 | * / , - - Day of month | Yes | 1-31 | * / , - ? + Day of month | Yes | 1-31 | * / , - ? L Month | Yes | 1-12 or JAN-DEC | * / , - Day of week | Yes | 0-6 or SUN-SAT | * / , - ? @@ -47,11 +59,18 @@ Alternative Formats Alternative Cron expression formats support other fields like seconds. You can implement that by creating a custom Parser as follows. - cron.New( - cron.WithParser( - cron.SecondOptional | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)) + cron.New( + cron.WithParser( + cron.NewParser( + cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor))) + +Since adding Seconds is the most common modification to the standard cron spec, +cron provides a builtin function to do that, which is equivalent to the custom +parser you saw earlier, except that its seconds field is REQUIRED: + + cron.New(cron.WithSeconds()) -The most popular alternative Cron expression format is Quartz: +That emulates Quartz, the most popular alternative Cron schedule format: http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html Special Characters @@ -86,6 +105,9 @@ Question mark ( ? ) Question mark may be used instead of '*' for leaving either day-of-month or day-of-week blank. +'L' stands for "last". When used in the day-of-month field, it specifies +the last day of the month. + Predefined schedules You may use one of several pre-defined schedules in place of a cron expression. @@ -150,6 +172,29 @@ The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! +Job Wrappers + +A Cron runner may be configured with a chain of job wrappers to add +cross-cutting functionality to all submitted jobs. For example, they may be used +to achieve the following effects: + + - Recover any panics from jobs (activated by default) + - Delay a job's execution if the previous run hasn't completed yet + - Skip a job's execution if the previous run hasn't completed yet + - Log each job's invocations + +Install wrappers for all jobs added to a cron using the `cron.WithChain` option: + + cron.New(cron.WithChain( + cron.SkipIfStillRunning(logger), + )) + +Install wrappers for individual jobs by explicitly wrapping them: + + job = cron.NewChain( + cron.SkipIfStillRunning(logger), + ).Then(job) + Thread safety Since the Cron service runs concurrently with the calling code, some amount of @@ -158,6 +203,23 @@ care must be taken to ensure proper synchronization. All cron methods are designed to be correctly synchronized as long as the caller ensures that invocations have a clear happens-before ordering between them. +Logging + +Cron defines a Logger interface that is a subset of the one defined in +github.com/go-logr/logr. It has two logging levels (Info and Error), and +parameters are key/value pairs. This makes it possible for cron logging to plug +into structured logging systems. An adapter, [Verbose]PrintfLogger, is provided +to wrap the standard library *log.Logger. + +For additional insight into Cron operations, verbose logging may be activated +which will record job runs, scheduling decisions, and added or removed jobs. +Activate it with a one-off logger as follows: + + cron.New( + cron.WithLogger( + cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) + + Implementation Cron entries are stored in an array, sorted by their next activation time. Cron diff --git a/go.mod b/go.mod index 8c95bf47..c497f221 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/robfig/cron/v3 +module github.com/AtomicConductor/cron/v3 go 1.12 diff --git a/logger.go b/logger.go index 5cbfe90d..b4efcc05 100644 --- a/logger.go +++ b/logger.go @@ -1,14 +1,86 @@ package cron import ( + "io/ioutil" "log" "os" + "strings" + "time" ) -var DefaultLogger = log.New(os.Stderr, "cron: ", log.LstdFlags) +// DefaultLogger is used by Cron if none is specified. +var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) + +// DiscardLogger can be used by callers to discard all log messages. +var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0)) // Logger is the interface used in this package for logging, so that any backend -// can be easily plugged in. It's implemented directly by "log" and logrus. +// can be plugged in. It is a subset of the github.com/go-logr/logr interface. type Logger interface { - Printf(string, ...interface{}) + // Info logs routine messages about cron's operation. + Info(msg string, keysAndValues ...interface{}) + // Error logs an error condition. + Error(err error, msg string, keysAndValues ...interface{}) +} + +// PrintfLogger wraps a Printf-based logger (such as the standard library "log") +// into an implementation of the Logger interface which logs errors only. +func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { + return printfLogger{l, false} +} + +// VerbosePrintfLogger wraps a Printf-based logger (such as the standard library +// "log") into an implementation of the Logger interface which logs everything. +func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { + return printfLogger{l, true} +} + +type printfLogger struct { + logger interface{ Printf(string, ...interface{}) } + logInfo bool +} + +func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) { + if pl.logInfo { + keysAndValues = formatTimes(keysAndValues) + pl.logger.Printf( + formatString(len(keysAndValues)), + append([]interface{}{msg}, keysAndValues...)...) + } +} + +func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) { + keysAndValues = formatTimes(keysAndValues) + pl.logger.Printf( + formatString(len(keysAndValues)+2), + append([]interface{}{msg, "error", err}, keysAndValues...)...) +} + +// formatString returns a logfmt-like format string for the number of +// key/values. +func formatString(numKeysAndValues int) string { + var sb strings.Builder + sb.WriteString("%s") + if numKeysAndValues > 0 { + sb.WriteString(", ") + } + for i := 0; i < numKeysAndValues/2; i++ { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString("%v=%v") + } + return sb.String() +} + +// formatTimes formats any time.Time values as RFC3339. +func formatTimes(keysAndValues []interface{}) []interface{} { + var formattedArgs []interface{} + for _, arg := range keysAndValues { + if t, ok := arg.(time.Time); ok { + arg = t.Format(time.RFC3339) + } + formattedArgs = append(formattedArgs, arg) + } + return formattedArgs } diff --git a/option.go b/option.go index af0f9660..09e4278e 100644 --- a/option.go +++ b/option.go @@ -1,7 +1,6 @@ package cron import ( - "log" "time" ) @@ -24,22 +23,23 @@ func WithSeconds() Option { } // WithParser overrides the parser used for interpreting job schedules. -func WithParser(p Parser) Option { +func WithParser(p ScheduleParser) Option { return func(c *Cron) { c.parser = p } } -// WithPanicLogger overrides the logger used for logging job panics. -func WithPanicLogger(l *log.Logger) Option { +// WithChain specifies Job wrappers to apply to all jobs added to this cron. +// Refer to the Chain* functions in this package for provided wrappers. +func WithChain(wrappers ...JobWrapper) Option { return func(c *Cron) { - c.logger = l + c.chain = NewChain(wrappers...) } } -// WithVerboseLogger enables verbose logging of events that occur in cron. -func WithVerboseLogger(logger Logger) Option { +// WithLogger uses the provided logger. +func WithLogger(logger Logger) Option { return func(c *Cron) { - c.vlogger = logger + c.logger = logger } } diff --git a/option_test.go b/option_test.go index f864d3ed..8aef1682 100644 --- a/option_test.go +++ b/option_test.go @@ -1,7 +1,6 @@ package cron import ( - "bytes" "log" "strings" "testing" @@ -23,20 +22,11 @@ func TestWithParser(t *testing.T) { } } -func TestWithPanicLogger(t *testing.T) { - var b bytes.Buffer - var logger = log.New(&b, "", log.LstdFlags) - c := New(WithPanicLogger(logger)) - if c.logger != logger { - t.Error("expected provided logger") - } -} - func TestWithVerboseLogger(t *testing.T) { var buf syncWriter var logger = log.New(&buf, "", log.LstdFlags) - c := New(WithVerboseLogger(logger)) - if c.vlogger != logger { + c := New(WithLogger(VerbosePrintfLogger(logger))) + if c.logger.(printfLogger).logger != logger { t.Error("expected provided logger") } @@ -45,8 +35,8 @@ func TestWithVerboseLogger(t *testing.T) { time.Sleep(OneSecond) c.Stop() out := buf.String() - if !strings.Contains(out, "scheduled entry") || - !strings.Contains(out, "started entry") { + if !strings.Contains(out, "schedule,") || + !strings.Contains(out, "run,") { t.Error("expected to see some actions, got:", out) } } diff --git a/parser.go b/parser.go index 3cf8879f..0355b470 100644 --- a/parser.go +++ b/parser.go @@ -61,11 +61,11 @@ type Parser struct { // sched, err := specParser.Parse("0 0 15 */3 *") // // // Same as above, just excludes time fields -// subsParser := NewParser(Dom | Month | Dow) +// specParser := NewParser(Dom | Month | Dow) // sched, err := specParser.Parse("15 */3 *") // // // Same as above, just makes Dow optional -// subsParser := NewParser(Dom | Month | DowOptional) +// specParser := NewParser(Dom | Month | DowOptional) // sched, err := specParser.Parse("15 */3") // func NewParser(options ParseOption) Parser { @@ -120,6 +120,11 @@ func (p Parser) Parse(spec string) (Schedule, error) { return nil, err } + if fields[3] == "L" { + now := time.Now().In(loc) + fields[3] = strconv.Itoa(ldom(now)) + } + field := func(field string, r bounds) uint64 { if err != nil { return 0 @@ -149,6 +154,7 @@ func (p Parser) Parse(spec string) (Schedule, error) { Month: month, Dow: dayofweek, Location: loc, + CronExpr: spec, }, nil } @@ -373,6 +379,7 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Month: 1 << months.min, Dow: all(dow), Location: loc, + CronExpr: descriptor, }, nil case "@monthly": @@ -384,6 +391,7 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Month: all(months), Dow: all(dow), Location: loc, + CronExpr: descriptor, }, nil case "@weekly": @@ -395,6 +403,7 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Month: all(months), Dow: 1 << dow.min, Location: loc, + CronExpr: descriptor, }, nil case "@daily", "@midnight": @@ -406,6 +415,7 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Month: all(months), Dow: all(dow), Location: loc, + CronExpr: descriptor, }, nil case "@hourly": @@ -417,6 +427,7 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Month: all(months), Dow: all(dow), Location: loc, + CronExpr: descriptor, }, nil } @@ -432,3 +443,4 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { return nil, fmt.Errorf("unrecognized descriptor: %s", descriptor) } + diff --git a/parser_test.go b/parser_test.go index 41c8c520..e326f553 100644 --- a/parser_test.go +++ b/parser_test.go @@ -144,17 +144,17 @@ func TestParseSchedule(t *testing.T) { expr string expected Schedule }{ - {secondParser, "0 5 * * * *", every5min(time.Local)}, - {standardParser, "5 * * * *", every5min(time.Local)}, - {secondParser, "CRON_TZ=UTC 0 5 * * * *", every5min(time.UTC)}, - {standardParser, "CRON_TZ=UTC 5 * * * *", every5min(time.UTC)}, - {secondParser, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo)}, + {secondParser, "0 5 * * * *", every5min(time.Local, "0 5 * * * *")}, + {standardParser, "5 * * * *", every5min(time.Local, "5 * * * *")}, + {secondParser, "CRON_TZ=UTC 0 5 * * * *", every5min(time.UTC, "0 5 * * * *")}, + {standardParser, "CRON_TZ=UTC 5 * * * *", every5min(time.UTC, "5 * * * *")}, + {secondParser, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo, "0 5 * * * *")}, {secondParser, "@every 5m", ConstantDelaySchedule{5 * time.Minute}}, - {secondParser, "@midnight", midnight(time.Local)}, - {secondParser, "TZ=UTC @midnight", midnight(time.UTC)}, - {secondParser, "TZ=Asia/Tokyo @midnight", midnight(tokyo)}, - {secondParser, "@yearly", annual(time.Local)}, - {secondParser, "@annually", annual(time.Local)}, + {secondParser, "@midnight", midnight(time.Local, "@midnight")}, + {secondParser, "TZ=UTC @midnight", midnight(time.UTC, "@midnight")}, + {secondParser, "TZ=Asia/Tokyo @midnight", midnight(tokyo, "@midnight")}, + {secondParser, "@yearly", annual(time.Local, "@yearly")}, + {secondParser, "@annually", annual(time.Local, "@annually")}, { parser: secondParser, expr: "* 5 * * * *", @@ -166,6 +166,7 @@ func TestParseSchedule(t *testing.T) { Month: all(months), Dow: all(dow), Location: time.Local, + CronExpr: "* 5 * * * *", }, }, } @@ -187,9 +188,9 @@ func TestOptionalSecondSchedule(t *testing.T) { expr string expected Schedule }{ - {"0 5 * * * *", every5min(time.Local)}, - {"5 5 * * * *", every5min5s(time.Local)}, - {"5 * * * *", every5min(time.Local)}, + {"0 5 * * * *", every5min(time.Local, "0 5 * * * *")}, + {"5 5 * * * *", every5min5s(time.Local, "5 5 * * * *")}, + {"5 * * * *", every5min(time.Local, "5 * * * *")}, } for _, c := range entries { @@ -320,7 +321,7 @@ func TestStandardSpecSchedule(t *testing.T) { }{ { expr: "5 * * * *", - expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local}, + expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local, "5 * * * *"}, }, { expr: "@every 5m", @@ -358,19 +359,19 @@ func TestNoDescriptorParser(t *testing.T) { } } -func every5min(loc *time.Location) *SpecSchedule { - return &SpecSchedule{1 << 0, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +func every5min(loc *time.Location, spec string) *SpecSchedule { + return &SpecSchedule{1 << 0, 1 << 5, all(hours), all(dom), all(months), all(dow), loc, spec} } -func every5min5s(loc *time.Location) *SpecSchedule { - return &SpecSchedule{1 << 5, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +func every5min5s(loc *time.Location, spec string) *SpecSchedule { + return &SpecSchedule{1 << 5, 1 << 5, all(hours), all(dom), all(months), all(dow), loc, spec} } -func midnight(loc *time.Location) *SpecSchedule { - return &SpecSchedule{1, 1, 1, all(dom), all(months), all(dow), loc} +func midnight(loc *time.Location, spec string) *SpecSchedule { + return &SpecSchedule{1, 1, 1, all(dom), all(months), all(dow), loc, spec} } -func annual(loc *time.Location) *SpecSchedule { +func annual(loc *time.Location, spec string) *SpecSchedule { return &SpecSchedule{ Second: 1 << seconds.min, Minute: 1 << minutes.min, @@ -379,5 +380,69 @@ func annual(loc *time.Location) *SpecSchedule { Month: 1 << months.min, Dow: all(dow), Location: loc, + CronExpr: spec, } } + +// Test Last Day of Month 'L' +func TestLDOM(t *testing.T) { + parser := NewParser(SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor) + tests := []struct { + time, spec string + expected bool + }{ + {"Tue Jul 31 01:10:30 2021", "30 10 1 L * ?", true}, + {"Tue Jul 31 00:00:00 2021", "0 0 0 L * *", true}, + {"Mon Jan 31 00:00:00 2022", "30 10 1 L 1 ?", false}, + } + + for _, test := range tests { + sched, err := parser.Parse(test.spec) + if err != nil { + t.Error(err) + continue + } + actual := sched.Next(getTime(test.time).Add(-1 * time.Second)) + expected := getTime(test.time) + if test.expected && expected != actual || !test.expected && expected == actual { + t.Errorf("Fail evaluating %s on %s: (expected) %s != %s (actual)", + test.spec, test.time, expected, actual) + } + } +} + +// Test Next activation time for Last Day of Month 'L' +func TestLDOMNext(t *testing.T) { + parser := NewParser(SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor) + runs := []struct { + time, spec string + expected string + }{ + {"TZ=America/New_York 2012-11-04T03:00:00-0500", "0 0 0 L * ?", "2012-11-30T00:00:00-0500"}, + {"TZ=Asia/Kolkata 2021-12-16T12:00:00+0530", "30 10 1 L * ?", "2021-12-31T01:10:30+0530"}, + {"2021-01-01T12:00:00+0000", "30 10 1 L * ?", "2021-01-31T01:10:30+0000"}, + {"2021-01-31T12:00:00+0000", "0 50 5 L * ?", "2021-02-28T05:50:00+0000"}, + {"2021-02-27T11:00:00+0000", "0 0 1 L * ?", "2021-02-28T01:00:00+0000"}, + {"2024-01-31T10:40:10+0530", "40 10 10 L * ?", "2024-02-29T10:10:40+0530"}, + {"2024-02-15T10:40:10+0530", "40 10 10 L * ?", "2024-02-29T10:10:40+0530"}, + {"TZ=UTC 2020-01-31T23:00:00+0000", "0 0 9 L * ?", "2020-02-29T09:00:00+0000"}, + {"2021-01-31T10:30:00+0000", "0 0 18 L * ?", "2021-01-31T18:00:00+0000"}, + {"2024-01-30T12:00:00+0530", "0 55 23 L * *", "2024-01-31T23:55:00+0530"}, + {"2024-01-31T00:00:00+0530", "0 30 2 L * *", "2024-01-31T02:30:00+0530"}, + {"2024-01-30T23:59:00+0530", "0 0 0 L * *", "2024-01-31T00:00:00+0530"}, + {"2024-01-31T23:59:00+0530", "0 0 0 L * *", "2024-02-29T00:00:00+0530"}, + } + + for _, c := range runs { + sched, err := parser.Parse(c.spec) + if err != nil { + t.Error(err) + continue + } + actual := sched.Next(getTime(c.time)) + expected := getTime(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } +} \ No newline at end of file diff --git a/spec.go b/spec.go index fa1e241e..f6832669 100644 --- a/spec.go +++ b/spec.go @@ -1,6 +1,9 @@ package cron -import "time" +import ( + "strings" + "time" +) // SpecSchedule specifies a duty cycle (to the second granularity), based on a // traditional crontab specification. It is computed initially and stored as bit sets. @@ -9,6 +12,9 @@ type SpecSchedule struct { // Override location for this schedule. Location *time.Location + + // Cron Expression for this schedule + CronExpr string } // bounds provides a range of acceptable values (plus a map of name to value). @@ -87,6 +93,18 @@ func (s *SpecSchedule) Next(t time.Time) time.Time { // If no time is found within five years, return zero. yearLimit := t.Year() + 5 + // check if last day of month is present in cron spec, if so, update the bits as per current month + edom := false + fields := strings.Fields(s.CronExpr) + + for idx := range fields { + if fields[idx] == "L" { + edom = true + s.Dom = getBits(1, uint(ldom(t)), 1) + break + } + } + WRAP: if t.Year() > yearLimit { return time.Time{} @@ -171,6 +189,13 @@ WRAP: } } + if edom { + year, month, _ := t.In(origLocation).Date() + h, m, s := t.In(origLocation).Clock() + + return time.Date(year, month+1, 0, h, m, s, 0, origLocation) + } + return t.In(origLocation) } @@ -186,3 +211,28 @@ func dayMatches(s *SpecSchedule, t time.Time) bool { } return domMatch || dowMatch } + +// ldom returns the last day of the month in the specified time object +func ldom(t time.Time) int { + + var ( + eom int + leapYear int + ) + + year := t.Year() + if (year%4 == 0 && year%100 != 0) || year%400 == 0 { + leapYear = 1 + } + + switch t.Month() { + case time.April, time.June, time.September, time.November: + eom = 30 + case time.February: + eom = 28 + leapYear + default: + eom = 31 + } + + return eom +}