Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensuring the integrity of full snapshot before uploading it to the object store. #779

Merged
merged 18 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewCompactCommand(ctx context.Context) *cobra.Command {
compactOptions := &brtypes.CompactOptions{
RestoreOptions: options,
CompactorConfig: opts.compactorConfig,
TempDir: opts.snapstoreConfig.TempDir,
}

snapshot, err := cp.Compact(ctx, compactOptions)
Expand Down
2 changes: 1 addition & 1 deletion example/01-etcd-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ initial-cluster: etcd=http://0.0.0.0:2380
initial-cluster-token: new
initial-cluster-state: new
auto-compaction-mode: periodic
auto-compaction-retention: 30m
auto-compaction-retention: 30m
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
isFinal := compactorRestoreOptions.BaseSnapshot.IsFinal

cc := &compressor.CompressionConfig{Enabled: isCompressed, CompressionPolicy: compressionPolicy}
snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, etcdRevision, cc, suffix, isFinal, cp.logger)
snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, opts.TempDir, etcdRevision, cc, suffix, isFinal, cp.logger)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ var _ = Describe("Running Compactor", func() {
var compactorConfig *brtypes.CompactorConfig
var compactOptions *brtypes.CompactOptions
var compactedSnapshot *brtypes.Snapshot
var snapstoreConfig *brtypes.SnapstoreConfig
var tempRestoreDir string
var tempDataDir string

BeforeEach(func() {
dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir)
store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: dir, Provider: "Local"})
snapstoreConfig = &brtypes.SnapstoreConfig{
Container: dir,
Provider: "Local",
}

store, err = snapstore.GetSnapstore(snapstoreConfig)
Expect(err).ShouldNot(HaveOccurred())
fmt.Println("The store where compaction will save snapshot is: ", store)

Expand Down Expand Up @@ -104,6 +110,7 @@ var _ = Describe("Running Compactor", func() {
compactOptions = &brtypes.CompactOptions{
RestoreOptions: restoreOpts,
CompactorConfig: compactorConfig,
TempDir: snapstoreConfig.TempDir,
}
})

Expand Down
150 changes: 135 additions & 15 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
package etcdutil

import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/gardener/etcd-backup-restore/pkg/compressor"
Expand All @@ -23,6 +28,10 @@ import (
"go.etcd.io/etcd/pkg/transport"
)

const (
hashBufferSize = 4 * 1024 * 1024 // 4 MB
)

// NewFactory returns a Factory that constructs new clients using the supplied ETCD client configuration.
func NewFactory(cfg brtypes.EtcdConnectionConfig, opts ...client.Option) client.Factory {
options := &client.Options{}
Expand Down Expand Up @@ -232,7 +241,7 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
endPoint = etcdEndpoints[0]
} else {
return nil, nil, &errors.EtcdError{
Message: fmt.Sprintf("etcd endpoints are not passed correctly"),
Message: "etcd endpoints are not passed correctly",
}
}

Expand All @@ -253,44 +262,155 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
return leaderEtcdEndpoints, followerEtcdEndpoints, nil
}

// TakeAndSaveFullSnapshot takes full snapshot and save it to store
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
// TakeAndSaveFullSnapshot does the following operations:
// 1. takes the full snapshot of etcd database
// 2. verify the full snapshot's integrity check
// 3. compress the full snapshot(if compression is enabled)
// 4. finally, save the full snapshot to object store(if configured).
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
startTime := time.Now()
rc, err := client.Snapshot(ctx)
if err != nil {
return nil, &errors.EtcdError{
Message: fmt.Sprintf("failed to create etcd snapshot: %v", err),
}
}
defer rc.Close()
timeTaken := time.Since(startTime)
logger.Infof("Total time taken by Snapshot API: %f seconds.", timeTaken.Seconds())

snapshotTempDBPath := filepath.Join(tempDir, "db")
defer func() {
if err := os.Remove(snapshotTempDBPath); err != nil {
logger.Warnf("failed to remove temporary full snapshot file: %v", err)
}
}()

var snapshotData io.ReadCloser
defer func() {
// to avoid calling Close() on empty ReadCloser,
// it's important to check ReadCloser for nil before calling Close().
if snapshotData != nil {
if err := snapshotData.Close(); err != nil {
logger.Warnf("failed to close snapshot data file: %v", err)
}
}
}()

// check the integrity of full snapshot before compression and upload to object store.
// for more info: https://github.com/gardener/etcd-backup-restore/issues/778
if snapshotData, err = checkFullSnapshotIntegrity(rc, snapshotTempDBPath, logger); err != nil {
logger.Errorf("verification of full snapshot SHA256 hash has failed: %v", err)
return nil, err
}
logger.Info("full snapshot SHA256 hash has been successfully verified.")

if cc.Enabled {
startTimeCompression := time.Now()
rc, err = compressor.CompressSnapshot(rc, cc.CompressionPolicy)
snapshotData, err = compressor.CompressSnapshot(snapshotData, cc.CompressionPolicy)
if err != nil {
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
timeTakenCompression := time.Since(startTimeCompression)
logger.Infof("Total time taken in full snapshot compression: %f seconds.", timeTakenCompression.Seconds())
}
defer rc.Close()

logger.Infof("Successfully opened snapshot reader on etcd")

// Then save the snapshot to the store.
snapshot := snapstore.NewSnapshot(brtypes.SnapshotKindFull, 0, lastRevision, suffix, isFinal)
// save the snapshot to the store.
snapshot, err := saveSnapshotToStore(store, snapshotData, startTime, brtypes.SnapshotKindFull, lastRevision, suffix, isFinal, logger)
if err != nil {
return nil, err
}

return snapshot, nil
}

// checkFullSnapshotIntegrity verifies the integrity of the full snapshot by comparing
// the appended SHA256 hash of the full snapshot with the calculated SHA256 hash of the full snapshot data.
func checkFullSnapshotIntegrity(snapshotData io.ReadCloser, snapTempDBFilePath string, logger *logrus.Entry) (io.ReadCloser, error) {
logger.Info("checking the full snapshot integrity with the help of SHA256")

// If previous temp db file already exist then remove it.
if err := os.Remove(snapTempDBFilePath); err != nil && !os.IsNotExist(err) {
return nil, err
}

// Note: db file will be closed by caller function.
db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600) // #nosec G304 -- this is a trusted file written by etcdbr.
if err != nil {
return nil, err
}

buf := make([]byte, hashBufferSize)

if _, err := io.CopyBuffer(db, snapshotData, buf); err != nil {
return nil, err
}

lastOffset, err := db.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}

// 512 is chosen because it's a minimum disk sector size in most systems.
if hasHash := (lastOffset % 512) == sha256.Size; !hasHash {
return nil, fmt.Errorf("SHA256 hash seems to be missing from snapshot data")
}

totalSnapshotBytes, err := db.Seek(-sha256.Size, io.SeekEnd)
if err != nil {
return nil, err
}

// get snapshot SHA256 hash
sha := make([]byte, sha256.Size)
if _, err := db.Read(sha); err != nil {
return nil, fmt.Errorf("failed to read SHA256 from snapshot data %v", err)
}

hash := sha256.New()

logger.Infof("Total no. of bytes received from snapshot api call with SHA: %d", lastOffset)
logger.Infof("Total no. of bytes received from snapshot api call without SHA: %d", totalSnapshotBytes)

// reset the file pointer back to starting
if _, err := db.Seek(0, io.SeekStart); err != nil {
return nil, err
}

snapshotDataReader := io.LimitReader(db, totalSnapshotBytes)

if _, err := io.CopyBuffer(hash, snapshotDataReader, buf); err != nil {
return nil, fmt.Errorf("unable to calculate SHA256 for full snapshot: %v", err)
}

dbSha := hash.Sum(nil)
if !bytes.Equal(sha, dbSha) {
return nil, fmt.Errorf("expected SHA256 for full snapshot: %x, got %x", sha, dbSha)
}

// reset the file pointer back to starting
if _, err := db.Seek(0, io.SeekStart); err != nil {
return nil, err
}

// full-snapshot of database has been successfully verified.
return db, nil
}

// saveSnapshotToStore save the snapshot to object store
func saveSnapshotToStore(store brtypes.SnapStore, rc io.ReadCloser, startTime time.Time, snapshotKind string, lastRevision int64, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
snapshot := snapstore.NewSnapshot(snapshotKind, 0, lastRevision, suffix, isFinal)

// save the snapshot to object store
if err := store.Save(*snapshot, rc); err != nil {
timeTaken := time.Since(startTime)
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken.Seconds())
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapshot.Kind, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken.Seconds())
return nil, &errors.SnapstoreError{
Message: fmt.Sprintf("failed to save snapshot: %v", err),
}
}

timeTaken = time.Since(startTime)
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken.Seconds())
logger.Infof("Total time to save full snapshot: %f seconds.", timeTaken.Seconds())

timeTaken := time.Since(startTime)
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapshot.Kind, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken.Seconds())
logger.Infof("Total time to save %s snapshot: %f seconds.", snapshot.Kind, timeTaken.Seconds())
return snapshot, nil
}
52 changes: 52 additions & 0 deletions pkg/etcdutil/etcdutil_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package etcdutil_test

import (
"context"
"os"
"testing"

"github.com/gardener/etcd-backup-restore/test/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/embed"
)

const (
outputDir = "../../test/output"
etcdDir = outputDir + "/default.etcd"
podName = "etcd-test-0"
)

var (
logger = logrus.New().WithField("suite", "etcdutil")
err error
testCtx = context.Background()
etcd *embed.Etcd
)

func TestEtcdUtil(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "EtcdUtil")
}

var _ = SynchronizedBeforeSuite(func() []byte {
err = os.RemoveAll(outputDir)
Expect(err).ShouldNot(HaveOccurred())

etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger, podName, "")
Expect(err).ShouldNot(HaveOccurred())
var data []byte
etcd.Server.Stop()
etcd.Close()
return data
}, func(data []byte) {})

var _ = SynchronizedAfterSuite(func() {}, func() {
err = os.RemoveAll(outputDir)
Expect(err).ShouldNot(HaveOccurred())
})
Loading