Skip to content

Commit

Permalink
Ensuring the integrity of full snapshot before uploading it to the ob…
Browse files Browse the repository at this point in the history
…ject store. (#779)

* Verify the integrity of full snapshot before uploading it to the object store.

* Fix compact sub-command while checking integrity of full-snapshot

Fix unit tests.

* Address review comments.

* Address review comments 2.

* Address review comments 3.

* Address review comments 4.

* Add few logs to debug.

Rename few variables.

* Address review comments.

* Refactoring the code.

* Reverted one change.

* Add unit tests.

* Use io.CopyBuffer instead of io.Copy to improve performance

* Fix small things.

* Added 1 more unit test.
  • Loading branch information
ishan16696 authored Jan 30, 2025
1 parent a5d5f56 commit 4703c60
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewCompactCommand(ctx context.Context) *cobra.Command {
compactOptions := &brtypes.CompactOptions{
RestoreOptions: options,
CompactorConfig: opts.compactorConfig,
TempDir: opts.snapstoreConfig.TempDir,
}

snapshot, err := cp.Compact(ctx, compactOptions)
Expand Down
2 changes: 1 addition & 1 deletion example/01-etcd-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ initial-cluster: etcd=http://0.0.0.0:2380
initial-cluster-token: new
initial-cluster-state: new
auto-compaction-mode: periodic
auto-compaction-retention: 30m
auto-compaction-retention: 30m
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
isFinal := compactorRestoreOptions.BaseSnapshot.IsFinal

cc := &compressor.CompressionConfig{Enabled: isCompressed, CompressionPolicy: compressionPolicy}
snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, etcdRevision, cc, suffix, isFinal, cp.logger)
snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, opts.TempDir, etcdRevision, cc, suffix, isFinal, cp.logger)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ var _ = Describe("Running Compactor", func() {
var compactorConfig *brtypes.CompactorConfig
var compactOptions *brtypes.CompactOptions
var compactedSnapshot *brtypes.Snapshot
var snapstoreConfig *brtypes.SnapstoreConfig
var tempRestoreDir string
var tempDataDir string

BeforeEach(func() {
dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir)
store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: dir, Provider: "Local"})
snapstoreConfig = &brtypes.SnapstoreConfig{
Container: dir,
Provider: "Local",
}

store, err = snapstore.GetSnapstore(snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())
fmt.Println("The store where compaction will save snapshot is: ", store)

Expand Down Expand Up @@ -104,6 +110,7 @@ var _ = Describe("Running Compactor", func() {
compactOptions = &brtypes.CompactOptions{
RestoreOptions: restoreOpts,
CompactorConfig: compactorConfig,
TempDir: snapstoreConfig.TempDir,
}
})

Expand Down
150 changes: 135 additions & 15 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
package etcdutil

import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/gardener/etcd-backup-restore/pkg/compressor"
Expand All @@ -23,6 +28,10 @@ import (
"go.etcd.io/etcd/pkg/transport"
)

const (
hashBufferSize = 4 * 1024 * 1024 // 4 MB
)

// NewFactory returns a Factory that constructs new clients using the supplied ETCD client configuration.
func NewFactory(cfg brtypes.EtcdConnectionConfig, opts ...client.Option) client.Factory {
options := &client.Options{}
Expand Down Expand Up @@ -232,7 +241,7 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
endPoint = etcdEndpoints[0]
} else {
return nil, nil, &errors.EtcdError{
Message: fmt.Sprintf("etcd endpoints are not passed correctly"),
Message: "etcd endpoints are not passed correctly",
}
}

Expand All @@ -253,44 +262,155 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
return leaderEtcdEndpoints, followerEtcdEndpoints, nil
}

// TakeAndSaveFullSnapshot takes full snapshot and save it to store
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
// TakeAndSaveFullSnapshot does the following operations:
// 1. takes the full snapshot of etcd database
// 2. verify the full snapshot's integrity check
// 3. compress the full snapshot(if compression is enabled)
// 4. finally, save the full snapshot to object store(if configured).
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
startTime := time.Now()
rc, err := client.Snapshot(ctx)
if err != nil {
return nil, &errors.EtcdError{
Message: fmt.Sprintf("failed to create etcd snapshot: %v", err),
}
}
defer rc.Close()
timeTaken := time.Since(startTime)
logger.Infof("Total time taken by Snapshot API: %f seconds.", timeTaken.Seconds())

snapshotTempDBPath := filepath.Join(tempDir, "db")
defer func() {
if err := os.Remove(snapshotTempDBPath); err != nil {
logger.Warnf("failed to remove temporary full snapshot file: %v", err)
}
}()

var snapshotData io.ReadCloser
defer func() {
// to avoid calling Close() on empty ReadCloser,
// it's important to check ReadCloser for nil before calling Close().
if snapshotData != nil {
if err := snapshotData.Close(); err != nil {
logger.Warnf("failed to close snapshot data file: %v", err)
}
}
}()

// check the integrity of full snapshot before compression and upload to object store.
// for more info: https://github.com/gardener/etcd-backup-restore/issues/778
if snapshotData, err = checkFullSnapshotIntegrity(rc, snapshotTempDBPath, logger); err != nil {
logger.Errorf("verification of full snapshot SHA256 hash has failed: %v", err)
return nil, err
}
logger.Info("full snapshot SHA256 hash has been successfully verified.")

if cc.Enabled {
startTimeCompression := time.Now()
rc, err = compressor.CompressSnapshot(rc, cc.CompressionPolicy)
snapshotData, err = compressor.CompressSnapshot(snapshotData, cc.CompressionPolicy)
if err != nil {
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
timeTakenCompression := time.Since(startTimeCompression)
logger.Infof("Total time taken in full snapshot compression: %f seconds.", timeTakenCompression.Seconds())
}
defer rc.Close()

logger.Infof("Successfully opened snapshot reader on etcd")

// Then save the snapshot to the store.
snapshot := snapstore.NewSnapshot(brtypes.SnapshotKindFull, 0, lastRevision, suffix, isFinal)
// save the snapshot to the store.
snapshot, err := saveSnapshotToStore(store, snapshotData, startTime, brtypes.SnapshotKindFull, lastRevision, suffix, isFinal, logger)
if err != nil {
return nil, err
}

return snapshot, nil
}

// checkFullSnapshotIntegrity verifies the integrity of the full snapshot by comparing
// the appended SHA256 hash of the full snapshot with the calculated SHA256 hash of the full snapshot data.
//
// The snapshot stream is first copied to snapTempDBFilePath; on success the opened
// temp file is returned, positioned at the start (including the trailing hash), and
// the caller is responsible for closing it. On failure the temp file is closed here.
func checkFullSnapshotIntegrity(snapshotData io.ReadCloser, snapTempDBFilePath string, logger *logrus.Entry) (io.ReadCloser, error) {
	logger.Info("checking the full snapshot integrity with the help of SHA256")

	// If a previous temp db file already exists then remove it.
	if err := os.Remove(snapTempDBFilePath); err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	// Note: on success the db file will be closed by the caller function.
	db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600) // #nosec G304 -- this is a trusted file written by etcdbr.
	if err != nil {
		return nil, err
	}

	// Close the file ourselves on any error path below, otherwise the
	// descriptor would leak: the caller only closes the returned ReadCloser,
	// which is nil when an error is returned.
	verified := false
	defer func() {
		if !verified {
			if cerr := db.Close(); cerr != nil {
				logger.Warnf("failed to close temporary full snapshot file: %v", cerr)
			}
		}
	}()

	// Reused for both the spool-to-disk copy and the hash computation below.
	buf := make([]byte, hashBufferSize)

	if _, err := io.CopyBuffer(db, snapshotData, buf); err != nil {
		return nil, err
	}

	lastOffset, err := db.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	// 512 is chosen because it's a minimum disk sector size in most systems.
	// A snapshot with an appended SHA256 hash leaves size % 512 == sha256.Size.
	if hasHash := (lastOffset % 512) == sha256.Size; !hasHash {
		return nil, fmt.Errorf("SHA256 hash seems to be missing from snapshot data")
	}

	totalSnapshotBytes, err := db.Seek(-sha256.Size, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	// get the appended snapshot SHA256 hash. io.ReadFull is used (instead of a
	// bare Read) because File.Read may legally return fewer than len(sha) bytes
	// without an error.
	sha := make([]byte, sha256.Size)
	if _, err := io.ReadFull(db, sha); err != nil {
		return nil, fmt.Errorf("failed to read SHA256 from snapshot data %v", err)
	}

	hash := sha256.New()

	logger.Infof("Total no. of bytes received from snapshot api call with SHA: %d", lastOffset)
	logger.Infof("Total no. of bytes received from snapshot api call without SHA: %d", totalSnapshotBytes)

	// reset the file pointer back to the start before hashing the payload.
	if _, err := db.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	// Hash only the snapshot payload, excluding the trailing SHA256 bytes.
	snapshotDataReader := io.LimitReader(db, totalSnapshotBytes)

	if _, err := io.CopyBuffer(hash, snapshotDataReader, buf); err != nil {
		return nil, fmt.Errorf("unable to calculate SHA256 for full snapshot: %v", err)
	}

	dbSha := hash.Sum(nil)
	if !bytes.Equal(sha, dbSha) {
		return nil, fmt.Errorf("expected SHA256 for full snapshot: %x, got %x", sha, dbSha)
	}

	// reset the file pointer back to the start so the caller reads from offset 0.
	if _, err := db.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	// full-snapshot of the database has been successfully verified.
	verified = true
	return db, nil
}

// saveSnapshotToStore saves the given snapshot data to the object store and
// records the snapshot-duration metric for both the failure and the success case.
// startTime is the moment the overall snapshot operation began, so the observed
// duration covers the whole take-verify-compress-save pipeline.
func saveSnapshotToStore(store brtypes.SnapStore, rc io.ReadCloser, startTime time.Time, snapshotKind string, lastRevision int64, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
	snapshot := snapstore.NewSnapshot(snapshotKind, 0, lastRevision, suffix, isFinal)

	// save the snapshot to the object store
	if err := store.Save(*snapshot, rc); err != nil {
		timeTaken := time.Since(startTime)
		metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapshot.Kind, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken.Seconds())
		return nil, &errors.SnapstoreError{
			Message: fmt.Sprintf("failed to save snapshot: %v", err),
		}
	}

	timeTaken := time.Since(startTime)
	metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapshot.Kind, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken.Seconds())
	logger.Infof("Total time to save %s snapshot: %f seconds.", snapshot.Kind, timeTaken.Seconds())
	return snapshot, nil
}
52 changes: 52 additions & 0 deletions pkg/etcdutil/etcdutil_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package etcdutil_test

import (
"context"
"os"
"testing"

"github.com/gardener/etcd-backup-restore/test/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/embed"
)

const (
outputDir = "../../test/output"
etcdDir = outputDir + "/default.etcd"
podName = "etcd-test-0"
)

var (
logger = logrus.New().WithField("suite", "etcdutil")
err error
testCtx = context.Background()
etcd *embed.Etcd
)

// TestEtcdUtil registers the Gomega fail handler with Ginkgo and runs
// the "EtcdUtil" test suite.
func TestEtcdUtil(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "EtcdUtil")
}

// Suite-level setup, run once on the first Ginkgo process: wipe any stale test
// output, start an embedded etcd and immediately shut it down again —
// presumably to pre-create the etcd data directory for later specs (TODO:
// confirm against the specs that consume etcdDir). The returned byte slice is
// always nil; the per-process setup func intentionally does nothing with it.
var _ = SynchronizedBeforeSuite(func() []byte {
	err = os.RemoveAll(outputDir)
	Expect(err).ShouldNot(HaveOccurred())

	etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger, podName, "")
	Expect(err).ShouldNot(HaveOccurred())
	var data []byte
	// Stop the server first, then release the remaining embedded-etcd resources.
	etcd.Server.Stop()
	etcd.Close()
	return data
}, func(data []byte) {})

// Suite-level teardown: once all Ginkgo processes have finished, remove the
// test output directory created during the suite run.
var _ = SynchronizedAfterSuite(func() {}, func() {
	err = os.RemoveAll(outputDir)
	Expect(err).ShouldNot(HaveOccurred())
})
Loading

0 comments on commit 4703c60

Please sign in to comment.