Skip to content

Commit

Permalink
Improved error handling in deletion of delta snapshots (#793)
Browse files Browse the repository at this point in the history
* Delta snapshots are deleted during garbage collection until a threshold
  number of errors is reached, at which point deletion is halted.
  • Loading branch information
Shreyas-s14 authored Nov 12, 2024
1 parent d7f94e4 commit d558412
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 27 deletions.
63 changes: 36 additions & 27 deletions pkg/snapshot/snapshotter/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package snapshotter

import (
"errors"
"math"
"path"
"time"
Expand All @@ -16,6 +17,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// DeltaSnapshotGCErrorThreshold is the number of individual delta-snapshot deletion errors at which
// garbage collection of the current snapStream is halted and the accumulated errors are returned.
const DeltaSnapshotGCErrorThreshold = 5

// RunGarbageCollector considers the older backups as garbage and deletes them
func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
if ssr.config.GarbageCollectionPeriod.Duration <= time.Second {
Expand Down Expand Up @@ -68,8 +72,9 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
ssr.logger.Infof("GC: Total number garbage collected chunks: %d", chunksDeleted)
}

snapStreamIndexList := getSnapStreamIndexList(snapList)

fullSnapshotIndexList := getFullSnapshotIndexList(snapList)
// A snapStream is a list of snapshots in which the first snapshot is a base/full snapshot,
// followed by the incremental (delta) snapshots based on it.
// Garbage collection is performed on one snapStream at a time.
switch ssr.config.GarbageCollectionPolicy {
case brtypes.GarbageCollectionPolicyExponential:
// Overall policy:
Expand All @@ -87,13 +92,14 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
)
// Here we start processing from second last snapstream, because we want to keep last snapstream
// including delta snapshots in it.
for snapStreamIndex := len(snapStreamIndexList) - 1; snapStreamIndex > 0; snapStreamIndex-- {
snap := snapList[snapStreamIndexList[snapStreamIndex]]
nextSnap := snapList[snapStreamIndexList[snapStreamIndex-1]]
for fullSnapshotIndex := len(fullSnapshotIndexList) - 1; fullSnapshotIndex > 0; fullSnapshotIndex-- {
snap := snapList[fullSnapshotIndexList[fullSnapshotIndex]]
nextSnap := snapList[fullSnapshotIndexList[fullSnapshotIndex-1]]

// garbage collect delta snapshots.
deletedSnap, err := ssr.GarbageCollectDeltaSnapshots(snapList[snapStreamIndexList[snapStreamIndex-1]:snapStreamIndexList[snapStreamIndex]])
total += deletedSnap
snapStream := snapList[fullSnapshotIndexList[fullSnapshotIndex-1]:fullSnapshotIndexList[fullSnapshotIndex]]
numDeletedSnapshots, err := ssr.GarbageCollectDeltaSnapshots(snapStream)
total += numDeletedSnapshots
if err != nil {
continue
}
Expand Down Expand Up @@ -161,14 +167,15 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
case brtypes.GarbageCollectionPolicyLimitBased:
// Delete delta snapshots in all snapStream but the latest one.
// Delete all snapshots beyond limit set by ssr.maxBackups.
for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-1; snapStreamIndex++ {
deletedSnap, err := ssr.GarbageCollectDeltaSnapshots(snapList[snapStreamIndexList[snapStreamIndex]:snapStreamIndexList[snapStreamIndex+1]])
total += deletedSnap
for fullSnapshotIndex := 0; fullSnapshotIndex < len(fullSnapshotIndexList)-1; fullSnapshotIndex++ {
snapStream := snapList[fullSnapshotIndexList[fullSnapshotIndex]:fullSnapshotIndexList[fullSnapshotIndex+1]]
numDeletedSnapshots, err := ssr.GarbageCollectDeltaSnapshots(snapStream)
total += numDeletedSnapshots
if err != nil {
continue
}
if snapStreamIndex < len(snapStreamIndexList)-int(ssr.config.MaxBackups) {
snap := snapList[snapStreamIndexList[snapStreamIndex]]
if fullSnapshotIndex < len(fullSnapshotIndexList)-int(ssr.config.MaxBackups) {
snap := snapList[fullSnapshotIndexList[fullSnapshotIndex]]
snapPath := path.Join(snap.SnapDir, snap.SnapName)
ssr.logger.Infof("GC: Deleting old full snapshot: %s", snapPath)
if err := ssr.store.Delete(*snap); err != nil {
Expand All @@ -187,21 +194,19 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
}
}

// getSnapStreamIndexList lists the index of snapStreams in snapList which consist of collection of snapStream.
// snapStream indicates the list of snapshot, where first snapshot is base/full snapshot followed by
// list of incremental snapshots based on it.
func getSnapStreamIndexList(snapList brtypes.SnapList) []int {
// getFullSnapshotIndexList returns the indices of Full snapshots in the snapList.
func getFullSnapshotIndexList(snapList brtypes.SnapList) []int {
	// At this stage the snapList is assumed to be sorted in increasing order of
	// last revision number: lower revisions sit at lower indices.
	// Index 0 is appended unconditionally, so the first snapStream always has a
	// starting marker even when the list does not begin with a full snapshot.
	fullSnapshotIndexList := []int{0}
	for index := 1; index < len(snapList); index++ {
		snap := snapList[index]
		if snap.Kind == brtypes.SnapshotKindFull && !snap.IsChunk {
			fullSnapshotIndexList = append(fullSnapshotIndexList, index)
		}
	}
	return fullSnapshotIndexList
}

// GarbageCollectChunks removes obsolete chunks based on the latest recorded snapshot.
Expand Down Expand Up @@ -254,7 +259,8 @@ Returns:
func (ssr *Snapshotter) GarbageCollectDeltaSnapshots(snapStream brtypes.SnapList) (int, error) {
totalDeleted := 0
cutoffTime := time.Now().UTC().Add(-ssr.config.DeltaSnapshotRetentionPeriod.Duration)
for i := len(snapStream) - 1; i >= 0; i-- {
var finalError error
for i, errorCount := len(snapStream)-1, 0; i >= 0; i-- {
if (*snapStream[i]).Kind == brtypes.SnapshotKindDelta && snapStream[i].CreatedOn.Before(cutoffTime) {

snapPath := path.Join(snapStream[i].SnapDir, snapStream[i].SnapName)
Expand All @@ -264,17 +270,20 @@ func (ssr *Snapshotter) GarbageCollectDeltaSnapshots(snapStream brtypes.SnapList
continue
}
if err := ssr.store.Delete(*snapStream[i]); err != nil {
errorCount++
ssr.logger.Warnf("GC: Failed to delete snapshot %s: %v", snapPath, err)
metrics.SnapshotterOperationFailure.With(prometheus.Labels{metrics.LabelError: err.Error()}).Inc()
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Inc()

return totalDeleted, err
finalError = errors.Join(finalError, err)
if errorCount == DeltaSnapshotGCErrorThreshold {
return totalDeleted, finalError
}
} else {
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
totalDeleted++
}

metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
totalDeleted++
}
}

return totalDeleted, nil
return totalDeleted, finalError
}
61 changes: 61 additions & 0 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,67 @@ var _ = Describe("Snapshotter", func() {
Expect(len(list)).Should(Equal(3))
})
})
Context("When no error occurs while deletion of delta snapshots", func() {
It("Should have no errors and all the snapshots should get deleted", func() {
store := prepareStoreWithDeltaSnapshots(testDir, 10)
list, err := store.List(false)
Expect(err).ShouldNot(HaveOccurred())
Expect(len(list)).Should(Equal(10))

ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())

deleted, err := ssr.GarbageCollectDeltaSnapshots(list)
Expect(deleted).Should(Equal(10))
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("When an error occurs while deletion of delta snapshot and number of errors are lesser than the threshold(5)", func() {
It("should continue with the deletion while joining the errors", func() {
store := prepareStoreWithDeltaSnapshots(testDir, 10)
list, err := store.List(false)
Expect(err).ShouldNot(HaveOccurred())
Expect(len(list)).Should(Equal(10))

ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())

// delete a few snapshots in between to induce an error
snapshotsToBeDeleted := []int{7, 5, 3, 2}
for _, i := range snapshotsToBeDeleted {
err := os.Remove(path.Join(list[i].Prefix, list[i].SnapName))
Expect(err).ShouldNot(HaveOccurred())
}

deleted, err := ssr.GarbageCollectDeltaSnapshots(list)
Expect(deleted).Should(Equal(6))
Expect(err).Should(HaveOccurred())
})
})
Context("When the number of errors while deleting are greater than or equal to the threshold", func() {
It("Should halt the process and return", func() {
store := prepareStoreWithDeltaSnapshots(testDir, 15)
list, err := store.List(false)
Expect(err).ShouldNot(HaveOccurred())
Expect(len(list)).Should(Equal(15))

ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())

// The loop below removes snapshot files until the number of removals reaches the threshold.
// Each missing file causes an individual error when ssr.GarbageCollectDeltaSnapshots attempts its deletion.
// Once the count of these individual errors reaches the threshold, the function halts and errors out.
for i := len(list) - 3; i > len(list)-3-DeltaSnapshotGCErrorThreshold-1; i-- {
err = os.Remove(path.Join(list[i].Prefix, list[i].SnapName))
Expect(err).ShouldNot(HaveOccurred())
}

deleted, err := ssr.GarbageCollectDeltaSnapshots(list)
Expect(deleted).Should(Equal(2))
Expect(err).Should(HaveOccurred())

})
})
})
Describe("###GarbageCollectChunkSnapshots", func() {
const (
Expand Down

0 comments on commit d558412

Please sign in to comment.