@@ -20,8 +20,7 @@ package job
2020import (
2121 "context"
2222 "fmt"
23-
24- "github.com/robfig/cron/v3"
23+ "time"
2524
2625 "github.com/polarismesh/polaris/cache"
2726 commonlog "github.com/polarismesh/polaris/common/log"
@@ -36,8 +35,8 @@ var log = commonlog.GetScopeOrDefaultByName(commonlog.DefaultLoggerName)
3635type MaintainJobs struct {
3736 jobs map [string ]maintainJob
3837 startedJobs map [string ]maintainJob
39- scheduler * cron.Cron
4038 storage store.Store
39+ cancel context.CancelFunc
4140}
4241
4342// NewMaintainJobs
@@ -53,13 +52,14 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
5352 storage : storage },
5453 },
5554 startedJobs : map [string ]maintainJob {},
56- scheduler : newCron (),
5755 storage : storage ,
5856 }
5957}
6058
6159// StartMaintainJobs
6260func (mj * MaintainJobs ) StartMaintianJobs (configs []JobConfig ) error {
61+ ctx , cancel := context .WithCancel (context .Background ())
62+ mj .cancel = cancel
6363 for _ , cfg := range configs {
6464 if ! cfg .Enable {
6565 log .Infof ("[Maintain][Job] job (%s) not enable" , cfg .Name )
@@ -83,33 +83,27 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
8383 log .Errorf ("[Maintain][Job][%s] start leader election err: %v" , cfg .Name , err )
8484 return err
8585 }
86- _ , err = mj . scheduler . AddFunc (cfg .CronSpec , newCronCmd ( cfg . Name , job , mj . storage ) )
86+ dur , err := time . ParseDuration (cfg .Interval )
8787 if err != nil {
88- log .Errorf ("[Maintain][Job] job (%s) fail to start, err: %v" , cfg .Name , err )
89- return fmt . Errorf ( "[Maintain][Job] job (%s) fail to start" , cfg . Name )
88+ log .Errorf ("[Maintain][Job][%s] parse job exec interval err: %v" , cfg .Name , err )
89+ return err
9090 }
91+ runAdminJob (ctx , cfg .Name , dur , job , mj .storage )
9192 mj .startedJobs [cfg .Name ] = job
9293 }
93- mj .scheduler .Start ()
9494 return nil
9595}
9696
9797// StopMaintainJobs
9898func (mj * MaintainJobs ) StopMaintainJobs () {
99- ctx := mj .scheduler .Stop ()
100- <- ctx .Done ()
99+ if mj .cancel != nil {
100+ mj .cancel ()
101+ }
101102 mj .startedJobs = map [string ]maintainJob {}
102103}
103104
104- func newCron () * cron.Cron {
105- return cron .New (cron .WithChain (
106- cron .Recover (cron .DefaultLogger )),
107- cron .WithParser (cron .NewParser (
108- cron .Minute | cron .Hour | cron .Dom | cron .Month | cron .Dow | cron .Descriptor )))
109- }
110-
111- func newCronCmd (name string , job maintainJob , storage store.Store ) func () {
112- return func () {
105+ func runAdminJob (ctx context.Context , name string , interval time.Duration , job maintainJob , storage store.Store ) {
106+ f := func () {
113107 if ! storage .IsLeader (store .ElectionKeyMaintainJobPrefix + name ) {
114108 log .Infof ("[Maintain][Job][%s] I am follower" , name )
115109 job .clear ()
@@ -118,8 +112,19 @@ func newCronCmd(name string, job maintainJob, storage store.Store) func() {
118112 log .Infof ("[Maintain][Job][%s] I am leader, job start" , name )
119113 job .execute ()
120114 log .Infof ("[Maintain][Job][%s] I am leader, job end" , name )
121-
122115 }
116+
117+ ticker := time .NewTicker (interval )
118+ go func (ctx context.Context ) {
119+ for {
120+ select {
121+ case <- ctx .Done ():
122+ return
123+ case <- ticker .C :
124+ f ()
125+ }
126+ }
127+ }(ctx )
123128}
124129
125130type maintainJob interface {
0 commit comments