
Commit 23063eb

fix(scheduler): data race when pushing new tasks
The problem here is that the scheduler can be closed in two ways:
- canceling the context given as argument to scheduler.RunScheduler()
- calling scheduler.Shutdown()

Because of this, shutdown could trigger a data race between the check in scheduler.inShutdown() and actually pushing tasks to the pool workers. Solved by keeping a quit channel and listening on both the quit channel and ctx.Done(), and only then closing the worker channel and shutting the scheduler down.

Signed-off-by: Petu Eusebiu <[email protected]>
1 parent e3bd9a8 commit 23063eb
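The pattern behind the fix is small: one quit channel closed at most once through sync.Once, and a single producer goroutine that is the only writer of (and therefore the only closer of) the worker channel. Below is a minimal, generic Go sketch of that pattern; it is not the zot scheduler code itself, and every name in it is illustrative.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pusher is an illustrative stand-in for the scheduler's task-pushing goroutine.
type pusher struct {
	stop sync.Once     // guarantees the quit channel is closed at most once
	quit chan struct{} // closed to tell the producer to stop
	work chan int      // worker-pool input; closed only by the producer
}

func newPusher() *pusher {
	return &pusher{quit: make(chan struct{}), work: make(chan int, 4)}
}

func (p *pusher) run(ctx context.Context) {
	go func() {
		// only the producer closes work, and only after it has stopped pushing,
		// so a send can never race with the close
		defer close(p.work)

		for i := 0; ; i++ {
			select {
			case <-ctx.Done(): // stop path 1: the caller cancels the context
				return
			case <-p.quit: // stop path 2: somebody called Shutdown()
				return
			default:
				p.work <- i
			}
		}
	}()
}

// Shutdown is safe to call concurrently with context cancellation.
func (p *pusher) Shutdown() {
	p.stop.Do(func() { close(p.quit) })
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p := newPusher()
	p.run(ctx)

	go func() {
		for range p.work { // drain until the producer closes work
		}
	}()

	time.Sleep(10 * time.Millisecond)
	p.Shutdown() // or cancel(); both paths go through the same select
	fmt.Println("stopped cleanly")
}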

8 files changed, +178 -49 lines changed

Makefile

Lines changed: 1 addition & 1 deletion
@@ -194,7 +194,7 @@ test-prereq: check-skopeo $(TESTDATA) $(ORAS)
 .PHONY: test-extended
 test-extended: $(if $(findstring ui,$(BUILD_LABELS)), ui)
 test-extended: test-prereq
-	go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
+	go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 20m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
 	rm -rf /tmp/getter*; rm -rf /tmp/trivy*

 .PHONY: test-minimal

pkg/extensions/extension_image_trust_test.go

Lines changed: 5 additions & 5 deletions
@@ -257,7 +257,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
 		[]string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())})
 	So(err, ShouldBeNil)

-	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
+	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

@@ -369,7 +369,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
 	err = signature.SignWithNotation(certName, imageURL, rootDir, true)
 	So(err, ShouldBeNil)

-	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
+	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

@@ -502,7 +502,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
 	err = signature.SignWithNotation(certName, imageURL, rootDir, false)
 	So(err, ShouldBeNil)

-	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
+	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

@@ -672,7 +672,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
 		[]string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())})
 	So(err, ShouldBeNil)

-	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
+	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

@@ -883,7 +883,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

-	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
+	found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
 	So(err, ShouldBeNil)
 	So(found, ShouldBeTrue)

pkg/extensions/scrub/scrub_test.go

Lines changed: 6 additions & 12 deletions
@@ -70,13 +70,11 @@ func TestScrubExtension(t *testing.T) {

 		cm := test.NewControllerManager(ctlr)
 		cm.StartAndWait(port)
-		time.Sleep(6 * time.Second)
-
 		defer cm.StopServer()

-		data, err := os.ReadFile(logFile.Name())
+		found, err := test.ReadLogFileAndSearchString(logFile.Name(), "scrub: blobs/manifest ok", 60*time.Second)
+		So(found, ShouldBeTrue)
 		So(err, ShouldBeNil)
-		So(string(data), ShouldContainSubstring, "scrub: blobs/manifest ok")
 	})

 	Convey("Blobs integrity affected", t, func(c C) {
@@ -122,13 +120,11 @@ func TestScrubExtension(t *testing.T) {

 		cm := test.NewControllerManager(ctlr)
 		cm.StartAndWait(port)
-		time.Sleep(6 * time.Second)
-
 		defer cm.StopServer()

-		data, err := os.ReadFile(logFile.Name())
+		found, err := test.ReadLogFileAndSearchString(logFile.Name(), "scrub: blobs/manifest affected", 60*time.Second)
+		So(found, ShouldBeTrue)
 		So(err, ShouldBeNil)
-		So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected")
 	})

 	Convey("Generator error - not enough permissions to access root directory", t, func(c C) {
@@ -170,13 +166,11 @@ func TestScrubExtension(t *testing.T) {

 		cm := test.NewControllerManager(ctlr)
 		cm.StartAndWait(port)
-		time.Sleep(6 * time.Second)
-
 		defer cm.StopServer()

-		data, err := os.ReadFile(logFile.Name())
+		found, err := test.ReadLogFileAndSearchString(logFile.Name(), "error while executing generator", 60*time.Second)
+		So(found, ShouldBeTrue)
 		So(err, ShouldBeNil)
-		So(string(data), ShouldContainSubstring, "error while executing generator")

 		So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil)
 	})
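These tests now block on test.ReadLogFileAndSearchString instead of a fixed sleep, so they return as soon as the expected log line shows up and only give up after the timeout. The helper's body is not part of this commit; the sketch below is a rough poll-with-deadline implementation of what such a function can look like, where only the name and signature are taken from the call sites above and everything else is assumed.

package test

import (
	"context"
	"os"
	"strings"
	"time"
)

// ReadLogFileAndSearchString polls logPath until substr shows up or timeout elapses.
// Sketch only; the real helper lives in zot's test utilities.
func ReadLogFileAndSearchString(logPath, substr string, timeout time.Duration) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return false, nil // timed out without finding the string
		default:
			content, err := os.ReadFile(logPath)
			if err != nil {
				return false, err
			}

			if strings.Contains(string(content), substr) {
				return true, nil
			}

			time.Sleep(100 * time.Millisecond) // small poll interval
		}
	}
}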

pkg/extensions/sync/sync_test.go

Lines changed: 4 additions & 4 deletions
@@ -5238,15 +5238,15 @@ func TestError(t *testing.T) {

 	dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig)

+	localRepoPath := path.Join(destDir, testImage, "blobs")
+	err := os.MkdirAll(localRepoPath, 0o755)
+	So(err, ShouldBeNil)
+
 	dcm := test.NewControllerManager(dctlr)
 	dcm.StartAndWait(dctlr.Config.HTTP.Port)
 	defer dcm.StopServer()

 	// give permission denied on pushSyncedLocalImage()
-	localRepoPath := path.Join(destDir, testImage, "blobs")
-	err := os.MkdirAll(localRepoPath, 0o755)
-	So(err, ShouldBeNil)
-
 	err = os.Chmod(localRepoPath, 0o000)
 	So(err, ShouldBeNil)

pkg/meta/meta_test.go

Lines changed: 2 additions & 2 deletions
@@ -413,7 +413,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
 	})

 	Convey("Test API keys with short expiration date", func() {
-		expirationDate := time.Now().Add(500 * time.Millisecond).Local().Round(time.Millisecond)
+		expirationDate := time.Now().Add(1 * time.Second)
 		apiKeyDetails.ExpirationDate = expirationDate

 		userAc := reqCtx.NewUserAccessControl()
@@ -435,7 +435,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
 		So(isExpired, ShouldBeFalse)
 		So(err, ShouldBeNil)

-		time.Sleep(600 * time.Millisecond)
+		time.Sleep(1 * time.Second)

 		Convey("GetUserAPIKeys detects api key expired", func() {
 			storedAPIKeys, err = metaDB.GetUserAPIKeys(ctx)

pkg/scheduler/scheduler.go

Lines changed: 44 additions & 20 deletions
@@ -83,6 +83,12 @@ type Scheduler struct {
 	workerWg       *sync.WaitGroup
 	isShuttingDown atomic.Bool
 	metricServer   monitoring.MetricServer
+
+	// ensure the scheduler can only be stopped once
+	stop sync.Once
+
+	// close to signal the workers to stop working
+	quit chan struct{}
 }

 func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen
@@ -107,12 +113,14 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge
 		generatorsLock: new(sync.Mutex),
 		log:            log.Logger{Logger: sublogger},
 		// default value
+		metricServer: ms,
 		RateLimit:   rateLimit,
 		NumWorkers:  numWorkers,
 		workerChan:  make(chan Task, numWorkers),
 		metricsChan: make(chan struct{}, 1),
 		workerWg:    new(sync.WaitGroup),
-		metricServer: ms,
+		stop: sync.Once{},
+		quit: make(chan struct{}),
 	}
 }

@@ -210,22 +218,30 @@ func (scheduler *Scheduler) metricsWorker() {
 	}
 }

+/*
+	Scheduler can be stopped either by stopping the context provided in scheduler.RunScheduler(ctx context.Context)
+
+	or by calling this function.
+*/
 func (scheduler *Scheduler) Shutdown() {
+	defer scheduler.workerWg.Wait()
+
 	if !scheduler.inShutdown() {
 		scheduler.shutdown()
 	}
-
-	scheduler.workerWg.Wait()
 }

 func (scheduler *Scheduler) inShutdown() bool {
 	return scheduler.isShuttingDown.Load()
 }

 func (scheduler *Scheduler) shutdown() {
-	close(scheduler.workerChan)
-	close(scheduler.metricsChan)
-	scheduler.isShuttingDown.Store(true)
+	scheduler.stop.Do(func() {
+		scheduler.isShuttingDown.Store(true)
+
+		close(scheduler.metricsChan)
+		close(scheduler.quit)
+	})
 }

 func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
@@ -243,33 +259,41 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
 	go scheduler.metricsWorker()

 	go func() {
+		// will close workers chan when either ctx is canceled or scheduler.Shutdown()
+		defer close(scheduler.workerChan)
+
 		for {
 			select {
+			// can be stopped either by closing from outside the ctx given to RunScheduler()
 			case <-ctx.Done():
 				if !scheduler.inShutdown() {
 					scheduler.shutdown()
 				}

 				scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")

+				return
+			// or by running scheduler.Shutdown()
+			case <-scheduler.quit:
+				scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")
+
 				return
 			default:
-				i := 0
-				for i < numWorkers {
-					task := scheduler.getTask()
-
-					if task != nil {
-						// push tasks into worker pool
-						if !scheduler.inShutdown() {
-							scheduler.log.Debug().Str("task", task.String()).Msg("scheduler: pushing task into worker pool")
-							scheduler.workerChan <- task
-						}
-					}
-					i++
+				// we don't want to block on sending task in workerChan.
+				if len(scheduler.workerChan) == scheduler.NumWorkers {
+					<-throttle
+
+					continue
 				}
-			}

-			<-throttle
+				task := scheduler.getTask()
+
+				if task != nil {
+					// push tasks into worker pool
+					scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
+					scheduler.workerChan <- task
+				}
+			}
 		}
 	}()
 }
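After this rework the producer goroutine is the only one closing workerChan (via the deferred close), it exits on either ctx.Done() or the quit channel, and it throttles on len(workerChan) instead of blocking when the pool is full, so Shutdown() can no longer race with an in-flight send. The sketch below shows the two equivalent stop paths from a caller's point of view; the import paths and the cfg/metrics/logger values are assumptions, and only NewScheduler, RunScheduler and Shutdown are taken from the diff above.

package schedulerusage

import (
	"context"

	"zotregistry.io/zot/pkg/api/config"
	"zotregistry.io/zot/pkg/extensions/monitoring"
	"zotregistry.io/zot/pkg/log"
	"zotregistry.io/zot/pkg/scheduler"
)

// runAndStop shows the two ways of stopping the scheduler after this fix.
// cfg, metrics and logger stand for whatever the surrounding controller already has.
func runAndStop(cfg *config.Config, metrics monitoring.MetricServer, logger log.Logger) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sch := scheduler.NewScheduler(cfg, metrics, logger)
	sch.RunScheduler(ctx)

	// ... tasks and generators would be submitted here ...

	// Either call is now race-free: both funnel into the same quit channel,
	// and sync.Once guarantees the shutdown work runs exactly once.
	cancel()       // path 1: cancel the context given to RunScheduler
	sch.Shutdown() // path 2: explicit shutdown, also waits for in-flight workers
}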
