diff --git a/pkg/snapshot/snapshotter/garbagecollector.go b/pkg/snapshot/snapshotter/garbagecollector.go index 6cee53405..28c4523b6 100644 --- a/pkg/snapshot/snapshotter/garbagecollector.go +++ b/pkg/snapshot/snapshotter/garbagecollector.go @@ -5,6 +5,7 @@ package snapshotter import ( + "errors" "math" "path" "time" @@ -16,6 +17,9 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// DeltaSnapshotGCErrorThreshold represents the threshold value for the number of individual errors that can occur while deleting delta snapshots. +const DeltaSnapshotGCErrorThreshold = 5 + // RunGarbageCollector basically consider the older backups as garbage and deletes it func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { if ssr.config.GarbageCollectionPeriod.Duration <= time.Second { @@ -68,8 +72,9 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { ssr.logger.Infof("GC: Total number garbage collected chunks: %d", chunksDeleted) } - snapStreamIndexList := getSnapStreamIndexList(snapList) - + fullSnapshotIndexList := getFullSnapshotIndexList(snapList) + // snapStream indicates a list of snapshots, where the first snapshot is base/full snapshot followed by a list of incremental snapshots based on it. + // Garbage collection is performed on one snapStream at a time. switch ssr.config.GarbageCollectionPolicy { case brtypes.GarbageCollectionPolicyExponential: // Overall policy: @@ -87,13 +92,14 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { ) // Here we start processing from second last snapstream, because we want to keep last snapstream // including delta snapshots in it. - for snapStreamIndex := len(snapStreamIndexList) - 1; snapStreamIndex > 0; snapStreamIndex-- { - snap := snapList[snapStreamIndexList[snapStreamIndex]] - nextSnap := snapList[snapStreamIndexList[snapStreamIndex-1]] + for fullSnapshotIndex := len(fullSnapshotIndexList) - 1; fullSnapshotIndex > 0; fullSnapshotIndex-- { + snap := snapList[fullSnapshotIndexList[fullSnapshotIndex]] + nextSnap := snapList[fullSnapshotIndexList[fullSnapshotIndex-1]] // garbage collect delta snapshots. - deletedSnap, err := ssr.GarbageCollectDeltaSnapshots(snapList[snapStreamIndexList[snapStreamIndex-1]:snapStreamIndexList[snapStreamIndex]]) - total += deletedSnap + snapStream := snapList[fullSnapshotIndexList[fullSnapshotIndex-1]:fullSnapshotIndexList[fullSnapshotIndex]] + numDeletedSnapshots, err := ssr.GarbageCollectDeltaSnapshots(snapStream) + total += numDeletedSnapshots if err != nil { continue } @@ -161,14 +167,15 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { case brtypes.GarbageCollectionPolicyLimitBased: // Delete delta snapshots in all snapStream but the latest one. // Delete all snapshots beyond limit set by ssr.maxBackups. - for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-1; snapStreamIndex++ { - deletedSnap, err := ssr.GarbageCollectDeltaSnapshots(snapList[snapStreamIndexList[snapStreamIndex]:snapStreamIndexList[snapStreamIndex+1]]) - total += deletedSnap + for fullSnapshotIndex := 0; fullSnapshotIndex < len(fullSnapshotIndexList)-1; fullSnapshotIndex++ { + snapStream := snapList[fullSnapshotIndexList[fullSnapshotIndex]:fullSnapshotIndexList[fullSnapshotIndex+1]] + numDeletedSnapshots, err := ssr.GarbageCollectDeltaSnapshots(snapStream) + total += numDeletedSnapshots if err != nil { continue } - if snapStreamIndex < len(snapStreamIndexList)-int(ssr.config.MaxBackups) { - snap := snapList[snapStreamIndexList[snapStreamIndex]] + if fullSnapshotIndex < len(fullSnapshotIndexList)-int(ssr.config.MaxBackups) { + snap := snapList[fullSnapshotIndexList[fullSnapshotIndex]] snapPath := path.Join(snap.SnapDir, snap.SnapName) ssr.logger.Infof("GC: Deleting old full snapshot: %s", snapPath) if err := ssr.store.Delete(*snap); err != nil { @@ -187,21 +194,19 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) { } } -// getSnapStreamIndexList lists the index of snapStreams in snapList which consist of collection of snapStream. -// snapStream indicates the list of snapshot, where first snapshot is base/full snapshot followed by -// list of incremental snapshots based on it. -func getSnapStreamIndexList(snapList brtypes.SnapList) []int { +// getFullSnapshotIndexList returns the indices of Full snapshots in the snapList. +func getFullSnapshotIndexList(snapList brtypes.SnapList) []int { // At this stage, we assume the snapList is sorted in increasing order of last revision number, i.e. snapshot with lower // last revision at lower index and snapshot with higher last revision at higher index in list. snapLen := len(snapList) - var snapStreamIndexList []int - snapStreamIndexList = append(snapStreamIndexList, 0) + var fullSnapshotIndexList []int + fullSnapshotIndexList = append(fullSnapshotIndexList, 0) for index := 1; index < snapLen; index++ { if snapList[index].Kind == brtypes.SnapshotKindFull && !snapList[index].IsChunk { - snapStreamIndexList = append(snapStreamIndexList, index) + fullSnapshotIndexList = append(fullSnapshotIndexList, index) } } - return snapStreamIndexList + return fullSnapshotIndexList } // GarbageCollectChunks removes obsolete chunks based on the latest recorded snapshot. @@ -254,7 +259,8 @@ Returns: func (ssr *Snapshotter) GarbageCollectDeltaSnapshots(snapStream brtypes.SnapList) (int, error) { totalDeleted := 0 cutoffTime := time.Now().UTC().Add(-ssr.config.DeltaSnapshotRetentionPeriod.Duration) - for i := len(snapStream) - 1; i >= 0; i-- { + var finalError error + for i, errorCount := len(snapStream)-1, 0; i >= 0; i-- { if (*snapStream[i]).Kind == brtypes.SnapshotKindDelta && snapStream[i].CreatedOn.Before(cutoffTime) { snapPath := path.Join(snapStream[i].SnapDir, snapStream[i].SnapName) @@ -264,17 +270,20 @@ func (ssr *Snapshotter) GarbageCollectDeltaSnapshots(snapStream brtypes.SnapList continue } if err := ssr.store.Delete(*snapStream[i]); err != nil { + errorCount++ ssr.logger.Warnf("GC: Failed to delete snapshot %s: %v", snapPath, err) metrics.SnapshotterOperationFailure.With(prometheus.Labels{metrics.LabelError: err.Error()}).Inc() metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Inc() - - return totalDeleted, err + finalError = errors.Join(finalError, err) + if errorCount == DeltaSnapshotGCErrorThreshold { + return totalDeleted, finalError + } + } else { + metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc() + totalDeleted++ } - - metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc() - totalDeleted++ } } - return totalDeleted, nil + return totalDeleted, finalError } diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index 7788087c9..1bbfa5645 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -528,6 +528,67 @@ var _ = Describe("Snapshotter", func() { Expect(len(list)).Should(Equal(3)) }) }) + Context("When no error occurs while deletion of delta snapshots", func() { + It("Should have no errors and all the snapshots should get deleted", func() { + store := prepareStoreWithDeltaSnapshots(testDir, 10) + list, err := store.List(false) + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(list)).Should(Equal(10)) + + ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig) + Expect(err).ShouldNot(HaveOccurred()) + + deleted, err := ssr.GarbageCollectDeltaSnapshots(list) + Expect(deleted).Should(Equal(10)) + Expect(err).ShouldNot(HaveOccurred()) + }) + }) + Context("When an error occurs while deletion of delta snapshot and number of errors are lesser than the threshold(5)", func() { + It("should continue with the deletion while joining the errors", func() { + store := prepareStoreWithDeltaSnapshots(testDir, 10) + list, err := store.List(false) + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(list)).Should(Equal(10)) + + ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig) + Expect(err).ShouldNot(HaveOccurred()) + + // delete a few snapshots in between to induce an error + snapshotsToBeDeleted := []int{7, 5, 3, 2} + for _, i := range snapshotsToBeDeleted { + err := os.Remove(path.Join(list[i].Prefix, list[i].SnapName)) + Expect(err).ShouldNot(HaveOccurred()) + } + + deleted, err := ssr.GarbageCollectDeltaSnapshots(list) + Expect(deleted).Should(Equal(6)) + Expect(err).Should(HaveOccurred()) + }) + }) + Context("When the number of errors while deleting are greater than or equal to the threshold", func() { + It("Should halt the process and return", func() { + store := prepareStoreWithDeltaSnapshots(testDir, 15) + list, err := store.List(false) + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(list)).Should(Equal(15)) + + ssr, err := NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig, healthConfig, snapstoreConfig) + Expect(err).ShouldNot(HaveOccurred()) + + // This below loop deletes snapshots until the number of deletions reaches the threshold. + // This will cause an error when passed into ssr.GarbageCollectDeltaSnapshots for deletion. + // Once the count of these individual errors is greater than or equal to the threshold, it errors out. + for i := len(list) - 3; i > len(list)-3-DeltaSnapshotGCErrorThreshold-1; i-- { + err = os.Remove(path.Join(list[i].Prefix, list[i].SnapName)) + Expect(err).ShouldNot(HaveOccurred()) + } + + deleted, err := ssr.GarbageCollectDeltaSnapshots(list) + Expect(deleted).Should(Equal(2)) + Expect(err).Should(HaveOccurred()) + + }) + }) }) Describe("###GarbageCollectChunkSnapshots", func() { const (