diff --git a/examples/pkg/app/user/user.go b/examples/pkg/app/user/user.go index ec748b9..a733678 100644 --- a/examples/pkg/app/user/user.go +++ b/examples/pkg/app/user/user.go @@ -88,10 +88,15 @@ func StartServer(wg *sync.WaitGroup) { } after = func() { fmt.Println("call user function [after] to do something...") - cancel := schedule.Job("print now datetime", func() { + cancel0 := schedule.Job("print now datetime", func() { fmt.Println("now: ", utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN)) - }).RunAt("* * * * *") - time.AfterFunc(time.Minute*3, cancel) + }).RunAt(schedule.EveryMinute) + time.AfterFunc(time.Minute*3, cancel0) + + cancel1 := schedule.Job("print hello", func() { + fmt.Println(utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN), "hello") + }).EverySecond() + time.AfterFunc(time.Second*5, cancel1) } ) diff --git a/schedule/crontabexpr.go b/schedule/crontabexpr.go index 221fdf6..86e581c 100644 --- a/schedule/crontabexpr.go +++ b/schedule/crontabexpr.go @@ -2,6 +2,7 @@ package schedule // 一些常用的crontab表达式 const ( + EveryMinute = "* * * * *" //每分钟的开始第0秒 FirstDayOfMonth = "0 0 1 * *" //每月的第一天 LastDayOfMonth = "0 0 L * *" //每月的最后一天 FirstDayOfWeek = "0 0 * * 1" //每周的第一天(周一) diff --git a/schedule/schedule.go b/schedule/schedule.go index d6de57a..6cf87c6 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -52,9 +52,13 @@ func Job(name string, task func()) *TaskJob { // WithoutOverlapping 禁止并发执行 // -// 同一时刻仅允许一个任务线程执行 +// 一个任务仅允许存在一个运行态 // -// Note: 该方法使用redis锁来保证唯一性 +// Note: 该方法使用redis锁来保证唯一性, +// +// 因此请确保先使用 redis.InitRedis 或 +// +// redis.InitRedisCluster 实例化redis连接 func (j *TaskJob) WithoutOverlapping() *TaskJob { j.withoutOverlapping = true @@ -62,7 +66,12 @@ func (j *TaskJob) WithoutOverlapping() *TaskJob { } // Every 每隔多久执行一次 +// +// Note: interval至少需要大于等于1毫秒,否则将被设置为1毫秒 func (j *TaskJob) Every(interval time.Duration) (cancel func()) { + if interval.Milliseconds() < 1 { + interval = time.Millisecond + } j.interval = interval j.run() @@ -205,24 +214,30 @@ func (j *TaskJob) Yearly() (cancel func()) { func (j *TaskJob) run() { go func() { ticker := time.NewTicker(j.interval) + defer ticker.Stop() + wrappedFunc := func() { + if !j.withoutOverlapping { + j.task() + return + } + if utils.RedisLock(j.lockerKey) { + func() { + defer utils.RedisUnlock(j.lockerKey) + j.task() + }() + } + } + LISTEN: for { select { case <-ticker.C: + go wrappedFunc() + //收到退出信号,终止任务 + case <-j.cancelTaskChan: if j.withoutOverlapping { - if utils.RedisLock(j.lockerKey) { - go func() { - defer utils.RedisUnlock(j.lockerKey) - j.task() - }() - } - } else { - go j.task() + utils.RedisUnlock(j.lockerKey) } - //shutdown gracefully. - case <-j.cancelTaskChan: - utils.RedisUnlock(j.lockerKey) - ticker.Stop() - break + break LISTEN } } }() @@ -253,16 +268,17 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { cronJob.Start() }) + //因为AddFunc内部是协程启动,因此这里的方法使用同步方式调用 wrappedTaskFunc := func() { - if j.withoutOverlapping { - if utils.RedisLock(j.lockerKey) { - go func() { - defer utils.RedisUnlock(j.lockerKey) - j.task() - }() - } - } else { + if !j.withoutOverlapping { j.task() + return + } + if utils.RedisLock(j.lockerKey) { + func() { + defer utils.RedisUnlock(j.lockerKey) + j.task() + }() } }