Skip to content

Commit 0673c26

Browse files
committed
fix: attempt to fix race condition when counting
1 parent 8dc69c5 commit 0673c26

File tree

1 file changed

+33
-42
lines changed

1 file changed

+33
-42
lines changed

services/misc.go

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package services
33
import (
44
"github.com/emvi/logbuch"
55
"github.com/muety/wakapi/config"
6-
"go.uber.org/atomic"
76
"runtime"
87
"strconv"
98
"time"
@@ -17,7 +16,6 @@ type MiscService struct {
1716
userService IUserService
1817
summaryService ISummaryService
1918
keyValueService IKeyValueService
20-
jobCount atomic.Uint32
2119
}
2220

2321
func NewMiscService(userService IUserService, summaryService ISummaryService, keyValueService IKeyValueService) *MiscService {
@@ -51,56 +49,34 @@ func (srv *MiscService) ScheduleCountTotalTime() {
5149
}
5250

5351
func (srv *MiscService) runCountTotalTime() error {
54-
jobs := make(chan *CountTotalTimeJob)
55-
results := make(chan *CountTotalTimeResult)
56-
57-
defer close(jobs)
58-
59-
for i := 0; i < runtime.NumCPU(); i++ {
60-
go srv.countTotalTimeWorker(jobs, results)
52+
users, err := srv.userService.GetAll()
53+
if err != nil {
54+
return err
6155
}
6256

63-
go srv.persistTotalTimeWorker(results)
57+
jobs := make(chan *CountTotalTimeJob, len(users))
58+
results := make(chan *CountTotalTimeResult, len(users))
6459

65-
// generate the jobs
66-
if users, err := srv.userService.GetAll(); err == nil {
67-
for _, u := range users {
68-
jobs <- &CountTotalTimeJob{
69-
UserID: u.ID,
70-
NumJobs: len(users),
71-
}
60+
for _, u := range users {
61+
jobs <- &CountTotalTimeJob{
62+
UserID: u.ID,
63+
NumJobs: len(users),
7264
}
73-
} else {
74-
return err
7565
}
66+
close(jobs)
7667

77-
return nil
78-
}
79-
80-
func (srv *MiscService) countTotalTimeWorker(jobs <-chan *CountTotalTimeJob, results chan<- *CountTotalTimeResult) {
81-
for job := range jobs {
82-
if result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: job.UserID}, srv.summaryService.Retrieve, false); err != nil {
83-
config.Log().Error("failed to count total for user %s: %v", job.UserID, err)
84-
} else {
85-
results <- &CountTotalTimeResult{
86-
UserId: job.UserID,
87-
Total: result.TotalTime(),
88-
}
89-
}
90-
if srv.jobCount.Inc() == uint32(job.NumJobs) {
91-
srv.jobCount.Store(0)
92-
close(results)
93-
}
68+
for i := 0; i < runtime.NumCPU(); i++ {
69+
go srv.countTotalTimeWorker(jobs, results)
9470
}
95-
}
9671

97-
func (srv *MiscService) persistTotalTimeWorker(results <-chan *CountTotalTimeResult) {
98-
var c int
72+
// persist
73+
var i int
9974
var total time.Duration
100-
for result := range results {
75+
for i = 0; i < len(users); i++ {
76+
result := <-results
10177
total += result.Total
102-
c++
10378
}
79+
close(results)
10480

10581
if err := srv.keyValueService.PutString(&models.KeyStringValue{
10682
Key: config.KeyLatestTotalTime,
@@ -111,8 +87,23 @@ func (srv *MiscService) persistTotalTimeWorker(results <-chan *CountTotalTimeRes
11187

11288
if err := srv.keyValueService.PutString(&models.KeyStringValue{
11389
Key: config.KeyLatestTotalUsers,
114-
Value: strconv.Itoa(c),
90+
Value: strconv.Itoa(i),
11591
}); err != nil {
11692
logbuch.Error("failed to save total users count: %v", err)
11793
}
94+
95+
return nil
96+
}
97+
98+
func (srv *MiscService) countTotalTimeWorker(jobs <-chan *CountTotalTimeJob, results chan<- *CountTotalTimeResult) {
99+
for job := range jobs {
100+
if result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: job.UserID}, srv.summaryService.Retrieve, false); err != nil {
101+
config.Log().Error("failed to count total for user %s: %v", job.UserID, err)
102+
} else {
103+
results <- &CountTotalTimeResult{
104+
UserId: job.UserID,
105+
Total: result.TotalTime(),
106+
}
107+
}
108+
}
118109
}

0 commit comments

Comments
 (0)