@@ -2,10 +2,13 @@ package crew
2
2
3
3
import (
4
4
"fmt"
5
+ "log"
5
6
"sort"
6
7
"strings"
7
8
"sync"
8
9
"time"
10
+
11
+ "github.com/go-co-op/gocron"
9
12
)
10
13
11
14
// A ThrottlePushQuery is a request to the throttler to see if there is enough bandwidth for a worker to run.
@@ -28,11 +31,13 @@ type Throttler struct {
28
31
29
32
// TaskGroup represents a group of tasks.
30
33
type TaskController struct {
31
- Storage TaskStorage
32
- Client TaskClient
33
- Feed chan interface {}
34
- Throttler * Throttler
35
- Pending * sync.WaitGroup
34
+ Storage TaskStorage
35
+ Client TaskClient
36
+ Feed chan interface {}
37
+ Throttler * Throttler
38
+ Pending * sync.WaitGroup
39
+ AbandonedCheckScheduler * gocron.Scheduler
40
+ AbandonedCheckMutex * sync.Mutex
36
41
}
37
42
38
43
// NewTaskController returns a new TaskController.
@@ -43,6 +48,8 @@ func NewTaskController(storage TaskStorage, client TaskClient, throttler *Thrott
43
48
Feed : make (chan interface {}, 8 ),
44
49
Throttler : throttler ,
45
50
Pending : & sync.WaitGroup {},
51
+ // AbandonedCheckScheduler is created in startup
52
+ AbandonedCheckMutex : & sync.Mutex {},
46
53
}
47
54
}
48
55
@@ -492,53 +499,75 @@ func (controller *TaskController) UpdateTask(id string, update map[string]interf
492
499
}
493
500
494
501
func (controller * TaskController ) Startup () (err error ) {
495
- // Fire an evaluate for any tasks that are incomplete and unpaused
502
+ // Restart tasks on startup and/or check for tasks that may have been abandoned due to crashes (or power outages) during execution.
503
+ // Note that for this to work for abandonments the storage mechanism must have expirations on task locks.
504
+ // Only the redis storage mechanism currently supports this.
505
+ s := gocron .NewScheduler (time .UTC )
506
+ // TODO - configure interval with an env var?
507
+ s .Every (15 ).Minutes ().Do (func () {
508
+ log .Println ("Abandoned task scan starting" )
509
+
510
+ // Use a mutex to make sure this doesn't run more than once at a time
511
+ locked := controller .AbandonedCheckMutex .TryLock ()
512
+ if ! locked {
513
+ log .Println ("Previous abandoned task scan still running, bailing out." )
514
+ return
515
+ }
516
+ defer controller .AbandonedCheckMutex .Unlock ()
496
517
497
- go func () {
498
- groups , groupsError := controller .Storage .AllTaskGroups ()
499
- if groupsError == nil {
500
- for _ , group := range groups {
501
- fmt .Println ("~~ Bootstrapping tasks for group" , group .Id )
518
+ taskGroups , taskGroupsError := controller .Storage .AllTaskGroups ()
519
+ if taskGroupsError == nil {
520
+ for _ , group := range taskGroups {
502
521
tasks , tasksError := controller .Storage .AllTasksInGroup (group .Id )
503
522
if tasksError == nil {
504
523
for _ , task := range tasks {
505
- if ! task .IsComplete && ! task .IsPaused {
524
+ // Do a couple of quick checks to prevent unecessary evaluates
525
+ if ! task .IsComplete && ! task .IsPaused && task .RunAfter .After (time .Now ()) && task .RemainingAttempts > 0 {
506
526
controller .TriggerTaskEvaluate (task .Id )
507
527
// Slight pause here to prevent a flood of evaluates
508
528
time .Sleep (time .Second / 10 )
509
529
}
510
530
}
511
531
} else {
512
- fmt .Println ("~~ Error bootstrapping tasks" , tasksError )
532
+ log .Println ("Error scanning for abandoned tasks" , tasksError )
513
533
}
514
- // Longer pause between groups to prevent overloading ourselves.
534
+
535
+ // Pause between groups to prevent overloading ourselves.
515
536
time .Sleep (time .Second * 1 )
516
537
}
517
538
} else {
518
- fmt .Println ("~~ Error bootstrapping task groups" , groupsError )
539
+ log .Println ("Error scanning for abandoned tasks (fetch groups) " , taskGroupsError )
519
540
}
520
- }()
541
+
542
+ log .Println ("Abandoned task scan completed" )
543
+ })
544
+ s .StartAsync ()
545
+ controller .AbandonedCheckScheduler = s
521
546
522
547
return nil
523
548
}
524
549
525
550
func (controller * TaskController ) Shutdown () (err error ) {
551
+ if controller .AbandonedCheckScheduler != nil {
552
+ controller .AbandonedCheckScheduler .Stop ()
553
+ }
554
+
526
555
// Wait till all pending task executions are complete
527
556
controller .Pending .Wait ()
528
557
return nil
529
558
}
530
559
531
560
func (controller * TaskController ) Evaluate (task * Task ) {
532
561
parents , _ := controller .Storage .GetTaskParents (task .Id )
533
- fmt .Println ("~~ Evaluating task" , task .Id , len (parents ))
562
+ log .Println ("Evaluating task" , task .Id , len (parents ))
534
563
canExecute := task .CanExecute (parents )
535
564
if canExecute {
536
565
controller .Execute (task )
537
566
}
538
567
}
539
568
540
569
func (controller * TaskController ) Execute (taskToExecute * Task ) {
541
- fmt .Println ("~~ Executing task" , taskToExecute .Id )
570
+ log .Println ("Executing task" , taskToExecute .Id )
542
571
parents , _ := controller .Storage .GetTaskParents (taskToExecute .Id )
543
572
544
573
timer := time .NewTimer (1000 * time .Second )
@@ -548,10 +577,10 @@ func (controller *TaskController) Execute(taskToExecute *Task) {
548
577
go func () {
549
578
defer controller .Pending .Done ()
550
579
551
- fmt .Println ("~~ Waiting for task start time (go routine)" , taskToExecute .Id )
580
+ log .Println ("Waiting for task start time (go routine)" , taskToExecute .Id )
552
581
<- timer .C
553
582
554
- fmt .Println ("~~ Executing task (go routine)" , taskToExecute .Id )
583
+ log .Println ("Executing task (go routine)" , taskToExecute .Id )
555
584
556
585
// Lock is as close to worker request send as possible (in case task delay is longer than lock timeout)
557
586
unlocker , lockError := controller .Storage .TryLockTask (taskToExecute .Id )
@@ -564,7 +593,7 @@ func (controller *TaskController) Execute(taskToExecute *Task) {
564
593
565
594
if lockError != nil {
566
595
// Couldn't lock task, do not execute
567
- fmt .Println ("~~ Executing task (lock fail)" , taskToExecute .Id )
596
+ log .Println ("Executing task (lock fail)" , taskToExecute .Id )
568
597
return
569
598
}
570
599
@@ -619,10 +648,10 @@ func (controller *TaskController) Execute(taskToExecute *Task) {
619
648
task .BusyExecuting = false
620
649
621
650
if err != nil {
622
- fmt .Println ("~~ Got standard error" , err )
651
+ log .Println ("Got standard error" , task . Id , err )
623
652
controller .HandleExecuteError (task , fmt .Sprintf ("%v" , err ))
624
653
} else if workerResponse .Error != nil {
625
- fmt .Println ("~~ Got worker response error" , workerResponse .Error )
654
+ log .Println ("Got worker response error" , task . Id , workerResponse .Error )
626
655
controller .HandleExecuteError (task , fmt .Sprintf ("%v" , workerResponse .Error ))
627
656
} else {
628
657
// No error!
@@ -674,7 +703,7 @@ func (controller *TaskController) Execute(taskToExecute *Task) {
674
703
675
704
// Because children failed we have to fail the task so that users will know something went wrong.
676
705
task .IsComplete = false
677
- fmt .Println ("~~ Got child creation error" , errorCreatingChildren )
706
+ log .Println ("Got child creation error" , task . Id , errorCreatingChildren )
678
707
controller .HandleExecuteError (task , fmt .Sprintf ("Child create failure : %v" , errorCreatingChildren ))
679
708
} else {
680
709
for _ , child := range createdChildren {
0 commit comments