Skip to content

Commit

Permalink
feat(scheduler)!: support custom mutex for job queue operations (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed May 4, 2024
1 parent b3ad0a9 commit 8017e08
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 28 deletions.
2 changes: 1 addition & 1 deletion examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
jobQueue := newJobQueue()
scheduler := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{
OutdatedThreshold: time.Second, // considering file system I/O latency
}, jobQueue)
}, jobQueue, nil)
scheduler.Start(ctx)

jobQueueSize, err := jobQueue.Size()
Expand Down
58 changes: 33 additions & 25 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type StdScheduler struct {
mtx sync.Mutex
wg sync.WaitGroup
queue JobQueue
queueMtx sync.Locker
interrupt chan struct{}
cancel context.CancelFunc
feeder chan ScheduledJob
Expand Down Expand Up @@ -125,30 +126,37 @@ type StdSchedulerOptions struct {
// Verify StdScheduler satisfies the Scheduler interface.
var _ Scheduler = (*StdScheduler)(nil)

// NewStdScheduler returns a new StdScheduler with the default
// configuration.
// NewStdScheduler returns a new StdScheduler with the default configuration.
func NewStdScheduler() Scheduler {
return NewStdSchedulerWithOptions(StdSchedulerOptions{
OutdatedThreshold: 100 * time.Millisecond,
RetryInterval: 100 * time.Millisecond,
}, nil)
}, nil, nil)
}

// NewStdSchedulerWithOptions returns a new StdScheduler configured
// as specified.
// A custom JobQueue implementation can be provided to manage scheduled
// jobs. This can be useful when distributed mode is required, so that
// jobs can be stored in persistent storage.
// Pass in nil to use the internal in-memory implementation.
// NewStdSchedulerWithOptions returns a new StdScheduler configured as specified.
//
// A custom [JobQueue] implementation may be provided to manage scheduled jobs.
// This is useful when distributed mode is required, so that jobs can be stored
// in persistent storage. Pass in nil to use the internal in-memory implementation.
//
// A custom [sync.Locker] may also be provided to ensure that scheduler operations
// on the job queue are atomic when used in distributed mode. Pass in nil to use
// the default [sync.Mutex].
func NewStdSchedulerWithOptions(
opts StdSchedulerOptions,
jobQueue JobQueue,
jobQueueMtx sync.Locker,
) *StdScheduler {
if jobQueue == nil {
jobQueue = newJobQueue()
}
if jobQueueMtx == nil {
jobQueueMtx = &sync.Mutex{}
}
return &StdScheduler{
queue: jobQueue,
queueMtx: jobQueueMtx,
interrupt: make(chan struct{}, 1),
feeder: make(chan ScheduledJob),
dispatch: make(chan ScheduledJob),
Expand All @@ -161,8 +169,8 @@ func (sched *StdScheduler) ScheduleJob(
jobDetail *JobDetail,
trigger Trigger,
) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobDetail == nil {
return illegalArgumentError("jobDetail is nil")
Expand Down Expand Up @@ -244,8 +252,8 @@ func (sched *StdScheduler) IsStarted() bool {
// For a job key to be returned, the job must satisfy all of the matchers specified.
// Given no matchers, it returns the keys of all scheduled jobs.
func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*JobKey, error) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

scheduledJobs, err := sched.queue.ScheduledJobs(matchers)
if err != nil {
Expand All @@ -260,8 +268,8 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) ([]*Job

// GetScheduledJob returns the ScheduledJob with the specified key.
func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return nil, illegalArgumentError("jobKey is nil")
Expand All @@ -271,8 +279,8 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)

// DeleteJob removes the Job with the specified key if present.
func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand All @@ -290,8 +298,8 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand Down Expand Up @@ -324,8 +332,8 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {

// ResumeJob restarts the suspended job with the specified key.
func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
Expand Down Expand Up @@ -362,8 +370,8 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {

// Clear removes all of the scheduled jobs.
func (sched *StdScheduler) Clear() error {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

// reset the job queue
err := sched.queue.Clear()
Expand Down Expand Up @@ -545,8 +553,8 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
}

func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) {
sched.mtx.Lock()
defer sched.mtx.Unlock()
sched.queueMtx.Lock()
defer sched.queueMtx.Unlock()

// fetch a job for processing
job, err := sched.queue.Pop()
Expand Down
4 changes: 2 additions & 2 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestScheduler_BlockingSemantics(t *testing.T) {

opts.OutdatedThreshold = 10 * time.Millisecond

sched := quartz.NewStdSchedulerWithOptions(opts, nil)
sched := quartz.NewStdSchedulerWithOptions(opts, nil, nil)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
sched.Start(ctx)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestScheduler_MisfiredJob(t *testing.T) {
OutdatedThreshold: time.Millisecond,
RetryInterval: time.Millisecond,
MisfiredChan: misfiredChan,
}, nil)
}, nil, nil)

jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(2*time.Millisecond))
Expand Down

0 comments on commit 8017e08

Please sign in to comment.