Skip to content

Commit dd1f72d

Browse files
committed
feat(jobscheduler): add jobs after missed tick
1 parent e9003f4 commit dd1f72d

File tree

4 files changed

+57
-24
lines changed

4 files changed

+57
-24
lines changed

basis.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141
timestampFormat = "2006-01-02 15:04:05 -0700"
4242

4343
debounceInterval = 100 * time.Millisecond
44+
maxMissedTime = time.Hour
4445
runInterval = time.Second
4546
scheduleInterval = time.Minute
4647

jobconfig.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,9 @@ func (j JobConfig) QueueName() string {
3333
return j.Queue
3434
}
3535

36-
func (j JobConfig) schedule(runner jobRunner) error {
36+
func (j JobConfig) shouldRun(t time.Time, lastCompleted *CompletedJob) (bool, error) {
3737
if !j.Enabled {
38-
return nil
39-
}
40-
41-
lastCompleted, err := runner.lastCompleted(j.Name)
42-
if err != nil {
43-
return err
38+
return false, nil
4439
}
4540

4641
exitStatus := -1
@@ -52,31 +47,30 @@ func (j JobConfig) schedule(runner jobRunner) error {
5247
started = int(lastCompleted.Started.Unix())
5348
}
5449

55-
now := time.Now()
5650
kvpairs := []starlark.Tuple{
5751
starlark.Tuple{
5852
starlark.String("minute"),
59-
starlark.MakeInt(now.Minute()),
53+
starlark.MakeInt(t.Minute()),
6054
},
6155
starlark.Tuple{
6256
starlark.String("hour"),
63-
starlark.MakeInt(now.Hour()),
57+
starlark.MakeInt(t.Hour()),
6458
},
6559
starlark.Tuple{
6660
starlark.String("day"),
67-
starlark.MakeInt(now.Day()),
61+
starlark.MakeInt(t.Day()),
6862
},
6963
starlark.Tuple{
7064
starlark.String("month"),
71-
starlark.MakeInt(int(now.Month())),
65+
starlark.MakeInt(int(t.Month())),
7266
},
7367
starlark.Tuple{
7468
starlark.String("dow"),
75-
starlark.MakeInt(int(now.Weekday())),
69+
starlark.MakeInt(int(t.Weekday())),
7670
},
7771
starlark.Tuple{
7872
starlark.String("timestamp"),
79-
starlark.MakeInt(int(now.Unix())),
73+
starlark.MakeInt(int(t.Unix())),
8074
},
8175
starlark.Tuple{
8276
starlark.String("exit_status"),
@@ -95,18 +89,35 @@ func (j JobConfig) schedule(runner jobRunner) error {
9589
thread := &starlark.Thread{Name: "schedule"}
9690
result, err := starlark.Call(thread, j.ShouldRun, nil, kvpairs)
9791
if err != nil {
98-
return fmt.Errorf(`failed to call "should_run": %v`, err)
92+
return false, fmt.Errorf(`failed to call "should_run": %v`, err)
9993
}
10094

10195
switch result {
10296

10397
case starlark.False:
98+
return false, nil
10499

105100
case starlark.True:
106-
runner.addJob(j)
101+
return true, nil
107102

108103
default:
109-
return fmt.Errorf(`"should_run" returned bad value: %v`, result)
104+
return false, fmt.Errorf(`"should_run" returned bad value: %v`, result)
105+
}
106+
}
107+
108+
func (j JobConfig) addToQueueIfDue(runner jobRunner, t time.Time) error {
109+
lastCompleted, err := runner.lastCompleted(j.Name)
110+
if err != nil {
111+
return err
112+
}
113+
114+
shouldRun, err := j.shouldRun(t, lastCompleted)
115+
if err != nil {
116+
return err
117+
}
118+
119+
if shouldRun {
120+
runner.addJob(j)
110121
}
111122

112123
return nil

jobscheduler.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ func newJobScheduler() jobScheduler {
3535
}
3636
}
3737

38-
func (jsc jobScheduler) scheduleOnce(runner jobRunner) error {
38+
func (jsc jobScheduler) addDueJobsToQueue(runner jobRunner, t time.Time) error {
3939
jsc.mu.RLock()
4040
defer jsc.mu.RUnlock()
4141

4242
for name, job := range jsc.byName {
43-
err := job.schedule(runner)
43+
err := job.addToQueueIfDue(runner, t)
4444
if err != nil {
4545
return newJobError(name, fmt.Errorf("scheduling error: %w", err))
4646
}
@@ -53,15 +53,34 @@ func (jsc jobScheduler) schedule(runner jobRunner) error {
5353
ticker := time.NewTicker(scheduleInterval)
5454
defer ticker.Stop()
5555

56-
err := jsc.scheduleOnce(runner)
56+
current := time.Now()
57+
var last time.Time
58+
59+
err := jsc.addDueJobsToQueue(runner, current)
5760
if err != nil {
5861
return err
5962
}
6063

6164
for range ticker.C {
62-
err := jsc.scheduleOnce(runner)
63-
if err != nil {
64-
return err
65+
last = current
66+
current = time.Now()
67+
68+
// Account for missed time.
69+
// Do not run missed jobs if more than maxMissedTime has elapsed.
70+
// On an overloaded system, the ticker can miss a minute.
71+
// For example, this may happen because Regular was swapped out.
72+
// It would prevent Regular from running jobs scheduled for that minute and that minute alone.
73+
// The purpose of this approach is to catch up on missed jobs.
74+
// However, we should run days' worth of missed jobs after system hibernation.
75+
if current.Sub(last) > maxMissedTime {
76+
last = current
77+
}
78+
79+
for t := last; t.Before(current); t = t.Add(time.Minute) {
80+
err := jsc.addDueJobsToQueue(runner, t)
81+
if err != nil {
82+
return err
83+
}
6584
}
6685
}
6786

runcmd.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"fmt"
55
"path/filepath"
6+
"time"
67
)
78

89
func (r *RunCmd) Run(config Config) error {
@@ -18,6 +19,7 @@ func (r *RunCmd) Run(config Config) error {
1819
}
1920

2021
jobs := newJobScheduler()
22+
now := time.Now()
2123

2224
for _, jobName := range r.JobNames {
2325
path := filepath.Join(config.ConfigRoot, jobName, jobFileName)
@@ -32,7 +34,7 @@ func (r *RunCmd) Run(config Config) error {
3234
if r.Force {
3335
runner.addJob(*job)
3436
} else {
35-
if err := job.schedule(runner); err != nil {
37+
if err := job.addToQueueIfDue(runner, now); err != nil {
3638
return fmt.Errorf("failed to schedule job %q: %w", job.Name, err)
3739
}
3840
}

0 commit comments

Comments
 (0)