From 5b0dc482c77f5ea5c27f41fbbcce75158fc10e38 Mon Sep 17 00:00:00 2001 From: keepchen Date: Thu, 28 Dec 2023 09:46:37 +0800 Subject: [PATCH] =?UTF-8?q?1.schedule=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96,?= =?UTF-8?q?2.examples=E6=96=B0=E5=A2=9E=E8=B0=83=E7=94=A8=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/pkg/app/user/user.go | 11 +++++-- schedule/crontabexpr.go | 1 + schedule/schedule.go | 62 ++++++++++++++++++++++------------- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/examples/pkg/app/user/user.go b/examples/pkg/app/user/user.go index ec748b9..a733678 100644 --- a/examples/pkg/app/user/user.go +++ b/examples/pkg/app/user/user.go @@ -88,10 +88,15 @@ func StartServer(wg *sync.WaitGroup) { } after = func() { fmt.Println("call user function [after] to do something...") - cancel := schedule.Job("print now datetime", func() { + cancel0 := schedule.Job("print now datetime", func() { fmt.Println("now: ", utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN)) - }).RunAt("* * * * *") - time.AfterFunc(time.Minute*3, cancel) + }).RunAt(schedule.EveryMinute) + time.AfterFunc(time.Minute*3, cancel0) + + cancel1 := schedule.Job("print hello", func() { + fmt.Println(utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN), "hello") + }).EverySecond() + time.AfterFunc(time.Second*5, cancel1) } ) diff --git a/schedule/crontabexpr.go b/schedule/crontabexpr.go index 221fdf6..86e581c 100644 --- a/schedule/crontabexpr.go +++ b/schedule/crontabexpr.go @@ -2,6 +2,7 @@ package schedule // 一些常用的crontab表达式 const ( + EveryMinute = "* * * * *" //每分钟的开始第0秒 FirstDayOfMonth = "0 0 1 * *" //每月的第一天 LastDayOfMonth = "0 0 L * *" //每月的最后一天 FirstDayOfWeek = "0 0 * * 1" //每周的第一天(周一) diff --git a/schedule/schedule.go b/schedule/schedule.go index d6de57a..6cf87c6 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -52,9 +52,13 @@ func Job(name string, task func()) *TaskJob { // WithoutOverlapping 禁止并发执行 // -// 同一时刻仅允许一个任务线程执行 +// 一个任务仅允许存在一个运行态 // -// Note: 该方法使用redis锁来保证唯一性 +// Note: 该方法使用redis锁来保证唯一性, +// +// 因此请确保先使用 redis.InitRedis 或 +// +// redis.InitRedisCluster 实例化redis连接 func (j *TaskJob) WithoutOverlapping() *TaskJob { j.withoutOverlapping = true @@ -62,7 +66,12 @@ func (j *TaskJob) WithoutOverlapping() *TaskJob { } // Every 每隔多久执行一次 +// +// Note: interval至少需要大于等于1毫秒,否则将被设置为1毫秒 func (j *TaskJob) Every(interval time.Duration) (cancel func()) { + if interval.Milliseconds() < 1 { + interval = time.Millisecond + } j.interval = interval j.run() @@ -205,24 +214,30 @@ func (j *TaskJob) Yearly() (cancel func()) { func (j *TaskJob) run() { go func() { ticker := time.NewTicker(j.interval) + defer ticker.Stop() + wrappedFunc := func() { + if !j.withoutOverlapping { + j.task() + return + } + if utils.RedisLock(j.lockerKey) { + func() { + defer utils.RedisUnlock(j.lockerKey) + j.task() + }() + } + } + LISTEN: for { select { case <-ticker.C: + go wrappedFunc() + //收到退出信号,终止任务 + case <-j.cancelTaskChan: if j.withoutOverlapping { - if utils.RedisLock(j.lockerKey) { - go func() { - defer utils.RedisUnlock(j.lockerKey) - j.task() - }() - } - } else { - go j.task() + utils.RedisUnlock(j.lockerKey) } - //shutdown gracefully. - case <-j.cancelTaskChan: - utils.RedisUnlock(j.lockerKey) - ticker.Stop() - break + break LISTEN } } }() @@ -253,16 +268,17 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { cronJob.Start() }) + //因为AddFunc内部是协程启动,因此这里的方法使用同步方式调用 wrappedTaskFunc := func() { - if j.withoutOverlapping { - if utils.RedisLock(j.lockerKey) { - go func() { - defer utils.RedisUnlock(j.lockerKey) - j.task() - }() - } - } else { + if !j.withoutOverlapping { j.task() + return + } + if utils.RedisLock(j.lockerKey) { + func() { + defer utils.RedisUnlock(j.lockerKey) + j.task() + }() } }