Skip to content

Commit

Permalink
add another out channel so we can properly report lastRun (#700)
Browse files Browse the repository at this point in the history
* add another out channel so we can properly report lastRun

* don't block on channel send

* add tests
  • Loading branch information
JohnRoesler committed Mar 26, 2024
1 parent 5b1cf9c commit 9ae7545
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 49 deletions.
55 changes: 30 additions & 25 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ import (
)

type executor struct {
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIn chan jobIn
jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
monitor Monitor
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIn chan jobIn
jobsOutForRescheduling chan uuid.UUID
jobsOutCompleted chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
monitor Monitor
}

type jobIn struct {
Expand Down Expand Up @@ -122,7 +123,7 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait
Expand All @@ -131,7 +132,7 @@ func (e *executor) start() {
// at which point this call would block.
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// no limit mode, so we're either running a regular job or
Expand Down Expand Up @@ -167,17 +168,17 @@ func (e *executor) start() {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
}
} else {
select {
Expand Down Expand Up @@ -206,10 +207,10 @@ func (e *executor) start() {
}
}

func (e *executor) sendOutToScheduler(jIn *jobIn) {
func (e *executor) sendOutForRescheduling(jIn *jobIn) {
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
case e.jobsOutForRescheduling <- jIn.id:
case <-e.ctx.Done():
return
}
Expand Down Expand Up @@ -250,7 +251,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
case e.jobsOutForRescheduling <- j.id:
}
}
// remove the limiter block, as this particular job
Expand Down Expand Up @@ -331,20 +332,24 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

e.sendOutToScheduler(&jIn)
e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}

startTime := time.Now()
err := callJobFuncWithParams(j.function, j.parameters...)
Expand Down
24 changes: 13 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type internalJob struct {
name string
tags []string
jobSchedule
lastRun, nextRun time.Time
lastScheduledRun time.Time
nextScheduled time.Time
lastRun time.Time
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -681,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time {

func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time {
for _, at := range d.atTimes {
// sub the at time hour/min/sec onto the lastRun's values
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next day, it's ok to consider
// the same at time that was last run (as lastRun has been incremented)
// the same at time that was last run (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -727,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim
// weekDayDiff is used to add the correct amount to the atDate day below
weekDayDiff := wd - lastRun.Weekday()
for _, at := range w.atTimes {
// sub the at time hour/min/sec onto the lastRun's values
// sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next week, it's ok to consider
// the same at time that was last run (as lastRun has been incremented)
// the same at time that was last run (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -795,7 +797,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
for _, day := range days {
if day >= lastRun.Day() {
for _, at := range m.atTimes {
// sub the day, and the at time hour/min/sec onto the lastRun's values
// sub the day, and the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())

Expand All @@ -807,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass

if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time
// and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again
return atDate
} else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next month, it's ok to consider
// the same at time that was lastRun (as lastRun has been incremented)
// the same at time that was lastScheduledRun (as lastScheduledRun has been incremented)
return atDate
}
}
Expand Down Expand Up @@ -892,7 +894,7 @@ func (j job) NextRun() (time.Time, error) {
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
return ij.nextRun, nil
return ij.nextScheduled, nil
}

func (j job) Tags() []string {
Expand Down
37 changes: 25 additions & 12 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
singletonRunners: nil,
logger: &noOpLogger{},

jobsIn: make(chan jobIn),
jobIDsOut: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
}

s := &scheduler{
Expand Down Expand Up @@ -147,8 +148,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s.logger.Info("gocron: new scheduler created")
for {
select {
case id := <-s.exec.jobIDsOut:
s.selectExecJobIDsOut(id)
case id := <-s.exec.jobsOutForRescheduling:
s.selectExecJobsOutForRescheduling(id)

case id := <-s.exec.jobsOutCompleted:
s.selectExecJobsOutCompleted(id)

case in := <-s.newJobCh:
s.selectNewJob(in)
Expand Down Expand Up @@ -287,14 +291,14 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {

// Jobs coming back from the executor to the scheduler that
// need to evaluated for rescheduling.
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
// the job was removed while it was running, and
// so we don't need to reschedule it.
return
}
j.lastRun = j.nextRun
j.lastScheduledRun = j.nextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
Expand All @@ -313,7 +317,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
}
}

next := j.next(j.lastRun)
next := j.next(j.lastScheduledRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
Expand All @@ -329,7 +333,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
next = j.next(next)
}
}
j.nextRun = next
j.nextScheduled = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
Expand All @@ -347,6 +351,15 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
s.jobs[id] = j
}

func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
return
}
j.lastRun = s.now()
s.jobs[id] = j
}

func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
if j, ok := s.jobs[out.id]; ok {
select {
Expand Down Expand Up @@ -386,7 +399,7 @@ func (s *scheduler) selectNewJob(in newJobIn) {
}
})
}
j.nextRun = next
j.nextScheduled = next
}

s.jobs[j.id] = j
Expand Down Expand Up @@ -437,7 +450,7 @@ func (s *scheduler) selectStart() {
}
})
}
j.nextRun = next
j.nextScheduled = next
s.jobs[id] = j
}
select {
Expand Down
Loading

0 comments on commit 9ae7545

Please sign in to comment.