Skip to content

PBM-1511: Configuration for Fallback dbpath feature #1139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c940f3d
Add --fallback-enabled option for cli
boris-ilijic May 26, 2025
eb15872
Add --allow-partly-done option for cli
boris-ilijic May 26, 2025
8d34c16
Inject fallback options into phys restore
boris-ilijic May 26, 2025
489486b
Expand backup meta with backup's size on RS
boris-ilijic May 26, 2025
13a32f2
Add free disk space validation for fallback sync
boris-ilijic May 26, 2025
cfab52c
Add logic for --fallback-enabled and
boris-ilijic May 27, 2025
92df5b1
Improve logging and error messages for fallback
boris-ilijic May 28, 2025
71752a3
Expand cleanup strategy for all use cases
boris-ilijic May 29, 2025
345b400
Apply reviewdog fixes
boris-ilijic May 29, 2025
d79da30
Merge remote-tracking branch 'origin/dev' into PBM-1511-fallback-conf…
boris-ilijic Jun 3, 2025
41e275c
Add error handling and logging when shard failed
boris-ilijic Jun 3, 2025
8210b36
Switch fallback strategy for older backups
boris-ilijic Jun 3, 2025
cdd4e0c
Add config params for fallback-enabled & ...
boris-ilijic Jun 4, 2025
2c141ea
Expand backup meta with uncompressed size
boris-ilijic Jun 4, 2025
a0f6724
Enable RS backup size calculation for inc restore
boris-ilijic Jun 5, 2025
12861d2
Improve formula for checking free disk space
boris-ilijic Jun 5, 2025
61551ee
Add validation for fallback & allowPartlyDone opts
boris-ilijic Jun 5, 2025
e41a8c4
Log more info about storage size metrics
boris-ilijic Jun 6, 2025
358c353
Update restore meta and log in case of fallback
boris-ilijic Jun 6, 2025
13f5b32
Add meta creation in case of full RS error
boris-ilijic Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cmd/pbm-agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,25 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
lck = nil
}

fallbackOpt := cfg.Restore.GetFallbackEnabled()
if r.Fallback != nil {
fallbackOpt = *r.Fallback
}
allowPartlyDoneOpt := cfg.Restore.GetAllowPartlyDone()
if r.AllowPartlyDone != nil {
allowPartlyDoneOpt = *r.AllowPartlyDone
}

var rstr *restore.PhysRestore
rstr, err = restore.NewPhysical(ctx, a.leadConn, a.nodeConn, nodeInfo, r.RSMap)
rstr, err = restore.NewPhysical(
ctx,
a.leadConn,
a.nodeConn,
nodeInfo,
r.RSMap,
fallbackOpt,
allowPartlyDoneOpt,
)
if err != nil {
l.Error("init physical backup: %v", err)
return
Expand Down
19 changes: 19 additions & 0 deletions cmd/pbm/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ type bcpDesc struct {
PBMVersion string `json:"pbm_version" yaml:"pbm_version"`
Status defs.Status `json:"status" yaml:"status"`
Size int64 `json:"size" yaml:"-"`
SizeUncompressed int64 `json:"size_uncompressed" yaml:"-"`
HSize string `json:"size_h" yaml:"size_h"`
HSizeUncompressed string `json:"size_uncompressed_h" yaml:"size_uncompressed_h"`
StorageName string `json:"storage_name,omitempty" yaml:"storage_name,omitempty"`
Err *string `json:"error,omitempty" yaml:"error,omitempty"`
Replsets []bcpReplDesc `json:"replsets" yaml:"replsets"`
Expand All @@ -340,6 +342,10 @@ type bcpReplDesc struct {
Status defs.Status `json:"status" yaml:"status"`
Node string `json:"node" yaml:"node"`
Files []backup.File `json:"files,omitempty" yaml:"-"`
Size int64 `json:"size" yaml:"-"`
SizeUncompressed int64 `json:"size_uncompressed" yaml:"-"`
HSize string `json:"size_h,omitempty" yaml:"size_h,omitempty"`
HSizeUncompressed string `json:"size_uncompressed_h" yaml:"size_uncompressed_h"`
LastWriteTS int64 `json:"last_write_ts" yaml:"-"`
LastTransitionTS int64 `json:"last_transition_ts" yaml:"-"`
LastWriteTime string `json:"last_write_time" yaml:"last_write_time"`
Expand Down Expand Up @@ -417,8 +423,13 @@ func describeBackup(
Status: bcp.Status,
Size: bcp.Size,
HSize: byteCountIEC(bcp.Size),
SizeUncompressed: bcp.SizeUncompressed,
HSizeUncompressed: byteCountIEC(bcp.SizeUncompressed),
StorageName: bcp.Store.Name,
}
if bcp.SizeUncompressed > 0 {
rv.HSizeUncompressed = byteCountIEC(bcp.SizeUncompressed)
}
if bcp.Err != "" {
rv.Err = &bcp.Err
}
Expand All @@ -442,6 +453,8 @@ func describeBackup(
IsConfigSvr: r.IsConfigSvr,
IsConfigShard: r.IsConfigShard,
Status: r.Status,
Size: r.Size,
SizeUncompressed: r.SizeUncompressed,
LastWriteTS: int64(r.LastWriteTS.T),
LastTransitionTS: r.LastTransitionTS,
LastWriteTime: time.Unix(int64(r.LastWriteTS.T), 0).UTC().Format(time.RFC3339),
Expand All @@ -457,6 +470,12 @@ func describeBackup(
if bcp.Type == defs.ExternalBackup {
rv.Replsets[i].Files = r.Files
}
if r.Size > 0 {
rv.Replsets[i].HSize = byteCountIEC(r.Size)
}
if r.SizeUncompressed > 0 {
rv.Replsets[i].HSizeUncompressed = byteCountIEC(r.SizeUncompressed)
}

if !b.coll || bcp.Type != defs.LogicalBackup {
continue
Expand Down
17 changes: 17 additions & 0 deletions cmd/pbm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
follow bool
}

type cliResult interface {

Check failure on line 58 in cmd/pbm/main.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

type `cliResult` is unused (unused)
HasError() bool
}

Expand Down Expand Up @@ -752,6 +752,14 @@
if len(args) == 1 {
restoreOptions.bcp = args[0]
}
if cmd.Flags().Changed("fallback-enabled") {
val, _ := cmd.Flags().GetBool("fallback-enabled")
restoreOptions.fallback = &val
}
if cmd.Flags().Changed("allow-partly-done") {
val, _ := cmd.Flags().GetBool("allow-partly-done")
restoreOptions.allowPartlyDone = &val
}
return runRestore(app.ctx, app.conn, app.pbm, &restoreOptions, app.node, app.pbmOutF)
}),
}
Expand Down Expand Up @@ -808,6 +816,15 @@
&restoreOptions.ts, "ts", "",
"MongoDB cluster time to restore to. In <T,I> format (e.g. 1682093090,9). External backups only!",
)
restoreCmd.Flags().Bool(
"fallback-enabled", false, "Enables/disables fallback strategy when doing physical restore.",
)
// Register --allow-partly-done (bool, flag default false).
restoreCmd.Flags().Bool(
	"allow-partly-done", false,
	"Allows partly done state of the cluster after physical restore. "+
		"If enabled (default), partly-done status for RS will be treated as successful restore. "+
		"If disabled, fallback will be applied when cluster is partly-done.",
)

restoreCmd.Flags().StringVar(&restoreOptions.rsMap, RSMappingFlag, "", RSMappingDoc)
_ = viper.BindPFlag(RSMappingFlag, restoreCmd.Flags().Lookup(RSMappingFlag))
Expand Down
42 changes: 29 additions & 13 deletions cmd/pbm/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ var (
)

type restoreOpts struct {
bcp string
pitr string
pitrBase string
wait bool
waitTime time.Duration
extern bool
ns string
nsFrom string
nsTo string
usersAndRoles bool
rsMap string
conf string
ts string
bcp string
pitr string
pitrBase string
wait bool
waitTime time.Duration
extern bool
ns string
nsFrom string
nsTo string
usersAndRoles bool
rsMap string
conf string
ts string
fallback *bool
allowPartlyDone *bool

numParallelColls int32
numInsertionWorkers int32
Expand Down Expand Up @@ -139,6 +141,9 @@ func runRestore(
if err := validateRestoreUsersAndRoles(o.usersAndRoles, nss); err != nil {
return nil, errors.Wrap(err, "parse --with-users-and-roles option")
}
if err := validateFallbackOpts(o); err != nil {
return nil, err
}

rsMap, err := parseRSNamesMapping(o.rsMap)
if err != nil {
Expand Down Expand Up @@ -415,6 +420,8 @@ func doRestore(
UsersAndRoles: o.usersAndRoles,
RSMap: rsMapping,
External: o.extern,
Fallback: o.fallback,
AllowPartlyDone: o.allowPartlyDone,
},
}
if o.pitr != "" {
Expand Down Expand Up @@ -812,6 +819,15 @@ func validateNSFromNSTo(o *restoreOpts) error {
return nil
}

// validateFallbackOpts rejects the one unsupported combination of restore
// options: both fallback and allowPartlyDone explicitly set to false.
// An option left unset (nil) is never counted as disabled, so any call with
// at least one nil or true option returns nil.
func validateFallbackOpts(o *restoreOpts) error {
	fallbackOff := o.fallback != nil && !*o.fallback
	partlyDoneOff := o.allowPartlyDone != nil && !*o.allowPartlyDone

	if !fallbackOff || !partlyDoneOff {
		return nil
	}

	return errors.New("It's not possible to disable both --allow-partly-done " +
		"and --fallback-enabled at the same time.")
}

func parseCLINumInsertionWorkersOption(value int32) (*int32, error) {
if value < 0 {
return nil, errors.New("Number of insertion workers has to be greater than zero.")
Expand Down
2 changes: 1 addition & 1 deletion pbm/backup/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (b *Backup) doLogical(
}
}

err = IncBackupSize(ctx, b.leadConn, bcp.Name, snapshotSize+oplogSize)
err = IncBackupSize(ctx, b.leadConn, bcp.Name, snapshotSize+oplogSize, nil)
if err != nil {
return errors.Wrap(err, "inc backup size")
}
Expand Down
23 changes: 22 additions & 1 deletion pbm/backup/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,10 @@
filelist = append(filelist, ju...)

size := int64(0)
sizeUncompressed := int64(0)
for _, f := range filelist {
size += f.StgSize
sizeUncompressed += f.Size
}

filelistPath := path.Join(bcp.Name, rsMeta.Name, FilelistName)
Expand All @@ -526,10 +528,29 @@
}
l.Info("uploaded: %q %s", filelistPath, storage.PrettySize(flSize))

err = IncBackupSize(ctx, b.leadConn, bcp.Name, size+flSize)
totalSize := size + flSize
totalUncompressed := sizeUncompressed + flSize
err = IncBackupSize(
ctx,
b.leadConn,
bcp.Name,
totalSize,
&totalUncompressed,
)
if err != nil {
return errors.Wrap(err, "inc backup size")
}
err = SetBackupSizeForRS(
ctx,
b.leadConn,
bcp.Name,
rsMeta.Name,
totalSize,
totalUncompressed,
)
if err != nil {
return errors.Wrap(err, "set RS backup size")
}

return nil
}
Expand All @@ -554,7 +575,7 @@
}

// UUID represents a UUID as saved in MongoDB
type UUID struct{ uuid.UUID }

Check failure on line 578 in pbm/backup/physical.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

the methods of "UUID" use pointer receiver and non-pointer receiver. (recvcheck)

// MarshalBSONValue implements the bson.ValueMarshaler interface.
func (id UUID) MarshalBSONValue() (bsontype.Type, []byte, error) {
Expand Down Expand Up @@ -672,7 +693,7 @@
stg storage.Storage,
compression compress.CompressionType,
compressLevel *int,
l log.LogEvent,

Check failure on line 696 in pbm/backup/physical.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

`writeFile` - `l` is unused (unparam)
) (*File, error) {
fstat, err := os.Stat(src.Name)
if err != nil {
Expand Down
42 changes: 40 additions & 2 deletions pbm/backup/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,52 @@ func ChangeRSState(conn connect.Client, bcpName, rsName string, s defs.Status, m
return err
}

func IncBackupSize(ctx context.Context, conn connect.Client, bcpName string, size int64) error {
// IncBackupSize increments total backup size.
func IncBackupSize(
ctx context.Context,
conn connect.Client,
bcpName string,
size int64,
sizeUncompressed *int64,
) error {
update := bson.D{
{"$inc", bson.M{"size": size}},
}
if sizeUncompressed != nil {
update = append(
update,
bson.E{"$inc", bson.M{"size_uncompressed": sizeUncompressed}},
)
}

_, err := conn.BcpCollection().UpdateOne(ctx,
bson.D{{"name", bcpName}},
bson.D{{"$inc", bson.M{"size": size}}})
update,
)

return err
}

// SetBackupSizeForRS sets size of backup for specified RS.
//
// It issues a single UpdateOne on the backup collection, matching the
// document whose "name" is bcpName and that has a replsets element named
// rsName, and writes size and sizeUncompressed into that element's "size"
// and "size_uncompressed" fields. The error from UpdateOne is returned as is.
func SetBackupSizeForRS(
	ctx context.Context,
	conn connect.Client,
	bcpName,
	rsName string,
	size int64,
	sizeUncompressed int64,
) error {
	_, err := conn.BcpCollection().UpdateOne(
		ctx,
		bson.D{{"name", bcpName}, {"replsets.name", rsName}},
		// Both fields go under one $set: repeating an operator key would put
		// duplicate field names into the update document, and how those are
		// handled is not something to rely on.
		bson.D{
			{"$set", bson.M{
				"replsets.$.size":              size,
				"replsets.$.size_uncompressed": sizeUncompressed,
			}},
		},
	)
	return err
}

func SetRSLastWrite(conn connect.Client, bcpName, rsName string, ts primitive.Timestamp) error {
_, err := conn.BcpCollection().UpdateOne(
context.Background(),
Expand Down
3 changes: 3 additions & 0 deletions pbm/backup/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
Compression compress.CompressionType `bson:"compression" json:"compression"`
Store Storage `bson:"store" json:"store"`
Size int64 `bson:"size" json:"size"`
SizeUncompressed int64 `bson:"size_uncompressed" json:"size_uncompressed"`
MongoVersion string `bson:"mongodb_version" json:"mongodb_version"`
FCV string `bson:"fcv" json:"fcv"`
StartTS int64 `bson:"start_ts" json:"start_ts"`
Expand Down Expand Up @@ -117,6 +118,8 @@
OplogName string `bson:"oplog_name,omitempty" json:"oplog_name,omitempty"`
StartTS int64 `bson:"start_ts" json:"start_ts"`
Status defs.Status `bson:"status" json:"status"`
Size int64 `bson:"size" json:"size"`
SizeUncompressed int64 `bson:"size_uncompressed" json:"size_uncompressed"`
IsConfigSvr *bool `bson:"iscs,omitempty" json:"iscs,omitempty"`
IsConfigShard *bool `bson:"configshard,omitempty" json:"configshard,omitempty"`
LastTransitionTS int64 `bson:"last_transition_ts" json:"last_transition_ts"`
Expand All @@ -142,7 +145,7 @@
Error string `bson:"error,omitempty" json:"error,omitempty"`
}

type File struct {

Check failure on line 148 in pbm/backup/types.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

the methods of "File" use pointer receiver and non-pointer receiver. (recvcheck)
Name string `bson:"filename" json:"filename"`
Off int64 `bson:"offset" json:"offset"` // offset for incremental backups
Len int64 `bson:"length" json:"length"` // length of chunk after the offset
Expand Down
19 changes: 19 additions & 0 deletions pbm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ type RestoreConf struct {
// physical restore. Will try $PATH/mongod if not set.
MongodLocation string `bson:"mongodLocation" json:"mongodLocation,omitempty" yaml:"mongodLocation,omitempty"`
MongodLocationMap map[string]string `bson:"mongodLocationMap" json:"mongodLocationMap,omitempty" yaml:"mongodLocationMap,omitempty"`

FallbackEnabled *bool `bson:"fallbackEnabled,omitempty" json:"fallbackEnabled,omitempty" yaml:"fallbackEnabled,omitempty"`
AllowPartlyDone *bool `bson:"allowPartlyDone,omitempty" json:"allowPartlyDone,omitempty" yaml:"allowPartlyDone,omitempty"`
}

func (cfg *RestoreConf) Clone() *RestoreConf {
Expand All @@ -388,6 +391,22 @@ func (cfg *RestoreConf) Clone() *RestoreConf {
return &rv
}

// GetFallbackEnabled returns the configured fallbackEnabled value.
// When the receiver is nil or FallbackEnabled is not set, it returns the
// default, which is true.
func (cfg *RestoreConf) GetFallbackEnabled() bool {
	if cfg == nil || cfg.FallbackEnabled == nil {
		return true
	}
	return *cfg.FallbackEnabled
}

// GetAllowPartlyDone returns the configured allowPartlyDone value.
// When the receiver is nil or AllowPartlyDone is not set, it returns the
// default, which is true.
func (cfg *RestoreConf) GetAllowPartlyDone() bool {
	if cfg == nil || cfg.AllowPartlyDone == nil {
		return true
	}
	return *cfg.AllowPartlyDone
}

//nolint:lll
type BackupConf struct {
OplogSpanMin float64 `bson:"oplogSpanMin" json:"oplogSpanMin" yaml:"oplogSpanMin"`
Expand Down
16 changes: 9 additions & 7 deletions pbm/ctrl/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ func (b BackupCmd) String() string {
}

type RestoreCmd struct {
Name string `bson:"name"`
BackupName string `bson:"backupName"`
Namespaces []string `bson:"nss,omitempty"`
NamespaceFrom string `bson:"nsFrom,omitempty"`
NamespaceTo string `bson:"nsTo,omitempty"`
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
RSMap map[string]string `bson:"rsMap,omitempty"`
Name string `bson:"name"`
BackupName string `bson:"backupName"`
Namespaces []string `bson:"nss,omitempty"`
NamespaceFrom string `bson:"nsFrom,omitempty"`
NamespaceTo string `bson:"nsTo,omitempty"`
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
RSMap map[string]string `bson:"rsMap,omitempty"`
Fallback *bool `bson:"fallbackEnabled"`
AllowPartlyDone *bool `bson:"allowPartlyDone"`

NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
NumInsertionWorkers *int32 `bson:"numInsertionWorkers,omitempty"`
Expand Down
Loading
Loading