-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
scanner: Scan buckets asynchronously
- Scan buckets in all erasure sets asynchrounously - No data format is changed - Cycle concept moved to be bucket centric, the cycle is incremented ecah time a bucket is successfully or unsuccesfully scanned - Next bucket to scan is chosen based on the lowest possible cycle number and managed by the scan manager code
- Loading branch information
Anis Elleuch
committed
Sep 30, 2023
1 parent
24c7e73
commit 904bdd7
Showing
12 changed files
with
392 additions
and
373 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright (c) 2015-2023 MinIO, Inc. | ||
// | ||
// This file is part of MinIO Object Storage stack | ||
// | ||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Affero General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
// | ||
// This program is distributed in the hope that it will be useful | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Affero General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
package cmd | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sync" | ||
"time" | ||
) | ||
|
||
const ( | ||
// the interval to discover if there are new buckets created in the cluster | ||
bucketsListInterval = time.Minute | ||
) | ||
|
||
type bucketScanStat struct { | ||
ongoing bool // this bucket is currently being scanned | ||
cycle uint32 // the last cycle of this scan | ||
} | ||
|
||
type bucketsScanMgr struct { | ||
ctx context.Context | ||
|
||
// A registered function which knows how to list buckets | ||
bucketsLister func(context.Context, BucketOptions) ([]BucketInfo, error) | ||
|
||
mu sync.RWMutex | ||
bucketsCh map[int]chan string // A map of an erasure set identifier and a channel of buckets to scan | ||
internal map[int]map[string]bucketScanStat // A map of an erasure set identifier and bucket scan stats | ||
} | ||
|
||
func newBucketsScanMgr(s3 ObjectLayer) *bucketsScanMgr { | ||
mgr := &bucketsScanMgr{ | ||
ctx: GlobalContext, | ||
bucketsLister: s3.ListBuckets, | ||
internal: make(map[int]map[string]bucketScanStat), | ||
bucketsCh: make(map[int]chan string), | ||
} | ||
return mgr | ||
} | ||
|
||
func (mgr *bucketsScanMgr) start() { | ||
// A routine that discovers new buckets and initialize scan stats for each new bucket | ||
go func() { | ||
t := time.NewTimer(bucketsListInterval) | ||
defer t.Stop() | ||
|
||
for { | ||
select { | ||
case <-t.C: | ||
allBuckets, err := mgr.bucketsLister(mgr.ctx, BucketOptions{}) | ||
if err == nil { | ||
mgr.mu.Lock() | ||
for _, bucket := range allBuckets { | ||
for _, set := range mgr.internal { | ||
_, ok := set[bucket.Name] | ||
if !ok { | ||
set[bucket.Name] = bucketScanStat{} | ||
} | ||
} | ||
} | ||
mgr.mu.Unlock() | ||
} | ||
t.Reset(bucketsListInterval) | ||
case <-mgr.ctx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
|
||
// A routine that sends the next bucket to scan for each erasure set listener | ||
go func() { | ||
tick := 10 * time.Second | ||
|
||
t := time.NewTimer(tick) | ||
defer t.Stop() | ||
|
||
for { | ||
select { | ||
case <-t.C: | ||
mgr.mu.RLock() | ||
for id, ch := range mgr.bucketsCh { | ||
if len(ch) == 0 { | ||
b := mgr.unsafeGetNextBucket(id) | ||
if b != "" { | ||
select { | ||
case ch <- b: | ||
default: | ||
} | ||
} | ||
} | ||
} | ||
mgr.mu.RUnlock() | ||
|
||
t.Reset(tick) | ||
case <-mgr.ctx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// Return a channel of buckets names to scan a given erasure set identifier | ||
func (mgr *bucketsScanMgr) getBucketCh(id int) chan string { | ||
mgr.mu.Lock() | ||
defer mgr.mu.Unlock() | ||
|
||
mgr.internal[id] = make(map[string]bucketScanStat) | ||
mgr.bucketsCh[id] = make(chan string, 1) | ||
|
||
return mgr.bucketsCh[id] | ||
} | ||
|
||
// Return the next bucket name to scan of a given erasure set identifier | ||
func (mgr *bucketsScanMgr) unsafeGetNextBucket(id int) string { | ||
var ( | ||
leastCycle = uint32(math.MaxUint32) | ||
nextBucket = "" | ||
) | ||
|
||
for bucket, stat := range mgr.internal[id] { | ||
if stat.ongoing { | ||
continue | ||
} | ||
if stat.cycle == 0 { | ||
return bucket | ||
} | ||
if stat.cycle < leastCycle { | ||
leastCycle = stat.cycle | ||
nextBucket = bucket | ||
} | ||
} | ||
|
||
return nextBucket | ||
} | ||
|
||
// Mark a bucket as done in a specific erasure set - returns true if successful, | ||
// false if the bucket is already in a scanning phase | ||
func (mgr *bucketsScanMgr) markBucketScanStarted(id int, bucket string, cycle uint32) bool { | ||
mgr.mu.Lock() | ||
defer mgr.mu.Unlock() | ||
|
||
m, _ := mgr.internal[id][bucket] | ||
if m.ongoing { | ||
return false | ||
} | ||
|
||
m.ongoing = true | ||
m.cycle = cycle | ||
mgr.internal[id][bucket] = m | ||
return true | ||
} | ||
|
||
// Mark a bucket as done in a specific erasure set | ||
func (mgr *bucketsScanMgr) markBucketScanDone(id int, bucket string) { | ||
mgr.mu.Lock() | ||
defer mgr.mu.Unlock() | ||
|
||
m, _ := mgr.internal[id][bucket] | ||
m.ongoing = false | ||
mgr.internal[id][bucket] = m | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.