@@ -483,11 +483,11 @@ func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error {
483
483
return nil
484
484
}
485
485
486
- type RestoreGcFunc func (string ) error
486
+ type RestoreGCFunc func (string ) error
487
487
488
488
// DisableGC disables and returns a function that can enable gc back.
489
489
// gc.ratio-threshold = "-1.0", which represents disable gc in TiKV.
490
- func DisableGC (g glue.Glue , store kv.Storage ) (RestoreGcFunc , string , error ) {
490
+ func DisableGC (g glue.Glue , store kv.Storage ) (RestoreGCFunc , string , error ) {
491
491
se , err := g .CreateSession (store )
492
492
if err != nil {
493
493
return nil , "" , errors .Trace (err )
@@ -1328,6 +1328,8 @@ func RunStreamRestore(
1328
1328
failpoint .Return (errors .New ("failpoint: failed before full restore" ))
1329
1329
})
1330
1330
1331
+ // restore log.
1332
+ cfg .adjustRestoreConfigForStreamRestore ()
1331
1333
cfg .tiflashRecorder = tiflashrec .New ()
1332
1334
logClient , err := createLogClient (ctx , g , cfg , mgr )
1333
1335
if err != nil {
@@ -1385,8 +1387,6 @@ func RunStreamRestore(
1385
1387
cfg .tiflashRecorder .Load (taskInfo .CheckpointInfo .Metadata .TiFlashItems )
1386
1388
}
1387
1389
}
1388
- // restore log.
1389
- cfg .adjustRestoreConfigForStreamRestore ()
1390
1390
logRestoreConfig := & LogRestoreConfig {
1391
1391
RestoreConfig : cfg ,
1392
1392
checkpointTaskInfo : taskInfo .CheckpointInfo ,
@@ -1495,7 +1495,7 @@ func restoreStream(
1495
1495
1496
1496
// It need disable GC in TiKV when PiTR.
1497
1497
// because the process of PITR is concurrent and kv events isn't sorted by tso.
1498
- restoreGcFunc , oldGcRatio , err := DisableGC (g , mgr .GetStorage ())
1498
+ restoreGCFunc , oldGCRatio , err := DisableGC (g , mgr .GetStorage ())
1499
1499
if err != nil {
1500
1500
return errors .Trace (err )
1501
1501
}
@@ -1509,24 +1509,24 @@ func restoreStream(
1509
1509
1510
1510
// If the oldGcRatio is negative, which is not normal status.
1511
1511
// It should set default value "1.1" after PiTR finished.
1512
- if strings .HasPrefix (oldGcRatio , "-" ) {
1513
- log .Warn ("the original gc-ratio is negative, reset by default value 1.1" , zap .String ("old-gc-ratio" , oldGcRatio ))
1514
- oldGcRatio = utils .DefaultGcRatioVal
1512
+ if strings .HasPrefix (oldGCRatio , "-" ) {
1513
+ log .Warn ("the original gc-ratio is negative, reset by default value 1.1" , zap .String ("old-gc-ratio" , oldGCRatio ))
1514
+ oldGCRatio = utils .DefaultGcRatioVal
1515
1515
}
1516
- log .Info ("start to restore gc" , zap .String ("ratio" , oldGcRatio ))
1517
- if err := restoreGcFunc ( oldGcRatio ); err != nil {
1516
+ log .Info ("start to restore gc" , zap .String ("ratio" , oldGCRatio ))
1517
+ if err := restoreGCFunc ( oldGCRatio ); err != nil {
1518
1518
log .Error ("failed to restore gc" , zap .Error (err ))
1519
1519
}
1520
1520
log .Info ("finish restoring gc" )
1521
1521
}()
1522
1522
1523
1523
var sstCheckpointSets map [string ]struct {}
1524
1524
if cfg .UseCheckpoint {
1525
- gcRatioFromCheckpoint , err := client .LoadOrCreateCheckpointMetadataForLogRestore (ctx , cfg .StartTS , cfg .RestoreTS , oldGcRatio , cfg .tiflashRecorder )
1525
+ gcRatioFromCheckpoint , err := client .LoadOrCreateCheckpointMetadataForLogRestore (ctx , cfg .StartTS , cfg .RestoreTS , oldGCRatio , cfg .tiflashRecorder )
1526
1526
if err != nil {
1527
1527
return errors .Trace (err )
1528
1528
}
1529
- oldGcRatio = gcRatioFromCheckpoint
1529
+ oldGCRatio = gcRatioFromCheckpoint
1530
1530
sstCheckpointSets , err = client .InitCheckpointMetadataForCompactedSstRestore (ctx )
1531
1531
if err != nil {
1532
1532
return errors .Trace (err )
@@ -1734,7 +1734,8 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
1734
1734
}
1735
1735
return nil , nil
1736
1736
}
1737
- err = client .InitClients (ctx , u , createCheckpointSessionFn , uint (cfg .Concurrency ), cfg .ConcurrencyPerStore .Value )
1737
+
1738
+ err = client .InitClients (ctx , u , createCheckpointSessionFn , uint (cfg .PitrConcurrency ), cfg .ConcurrencyPerStore .Value )
1738
1739
if err != nil {
1739
1740
return nil , errors .Trace (err )
1740
1741
}
0 commit comments