Skip to content

Commit

Permalink
1.schedule代码优化,2.examples新增调用示例
Browse files Browse the repository at this point in the history
  • Loading branch information
keepchen committed Dec 28, 2023
1 parent 68ad55b commit 5b0dc48
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 26 deletions.
11 changes: 8 additions & 3 deletions examples/pkg/app/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,15 @@ func StartServer(wg *sync.WaitGroup) {
}
after = func() {
fmt.Println("call user function [after] to do something...")
cancel := schedule.Job("print now datetime", func() {
cancel0 := schedule.Job("print now datetime", func() {
fmt.Println("now: ", utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN))
}).RunAt("* * * * *")
time.AfterFunc(time.Minute*3, cancel)
}).RunAt(schedule.EveryMinute)
time.AfterFunc(time.Minute*3, cancel0)

cancel1 := schedule.Job("print hello", func() {
fmt.Println(utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN), "hello")
}).EverySecond()
time.AfterFunc(time.Second*5, cancel1)
}
)

Expand Down
1 change: 1 addition & 0 deletions schedule/crontabexpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package schedule

// 一些常用的crontab表达式
const (
EveryMinute = "* * * * *" //每分钟的开始第0秒
FirstDayOfMonth = "0 0 1 * *" //每月的第一天
LastDayOfMonth = "0 0 L * *" //每月的最后一天
FirstDayOfWeek = "0 0 * * 1" //每周的第一天(周一)
Expand Down
62 changes: 39 additions & 23 deletions schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,26 @@ func Job(name string, task func()) *TaskJob {

// WithoutOverlapping 禁止并发执行
//
// 同一时刻仅允许一个任务线程执行
// 一个任务仅允许存在一个运行态
//
// Note: 该方法使用redis锁来保证唯一性
// Note: 该方法使用redis锁来保证唯一性,
//
// 因此请确保先使用 redis.InitRedis 或
//
// redis.InitRedisCluster 实例化redis连接
func (j *TaskJob) WithoutOverlapping() *TaskJob {
j.withoutOverlapping = true

return j
}

// Every 每隔多久执行一次
//
// Note: interval至少需要大于等于1毫秒,否则将被设置为1毫秒
func (j *TaskJob) Every(interval time.Duration) (cancel func()) {
if interval.Milliseconds() < 1 {
interval = time.Millisecond
}
j.interval = interval
j.run()

Expand Down Expand Up @@ -205,24 +214,30 @@ func (j *TaskJob) Yearly() (cancel func()) {
func (j *TaskJob) run() {
go func() {
ticker := time.NewTicker(j.interval)
defer ticker.Stop()
wrappedFunc := func() {
if !j.withoutOverlapping {
j.task()
return
}
if utils.RedisLock(j.lockerKey) {
func() {
defer utils.RedisUnlock(j.lockerKey)
j.task()
}()
}
}
LISTEN:
for {
select {
case <-ticker.C:
go wrappedFunc()
//收到退出信号,终止任务
case <-j.cancelTaskChan:
if j.withoutOverlapping {
if utils.RedisLock(j.lockerKey) {
go func() {
defer utils.RedisUnlock(j.lockerKey)
j.task()
}()
}
} else {
go j.task()
utils.RedisUnlock(j.lockerKey)
}
//shutdown gracefully.
case <-j.cancelTaskChan:
utils.RedisUnlock(j.lockerKey)
ticker.Stop()
break
break LISTEN
}
}
}()
Expand Down Expand Up @@ -253,16 +268,17 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) {
cronJob.Start()
})

//因为AddFunc内部是协程启动,因此这里的方法使用同步方式调用
wrappedTaskFunc := func() {
if j.withoutOverlapping {
if utils.RedisLock(j.lockerKey) {
go func() {
defer utils.RedisUnlock(j.lockerKey)
j.task()
}()
}
} else {
if !j.withoutOverlapping {
j.task()
return
}
if utils.RedisLock(j.lockerKey) {
func() {
defer utils.RedisUnlock(j.lockerKey)
j.task()
}()
}
}

Expand Down

0 comments on commit 5b0dc48

Please sign in to comment.