Skip to content

Commit

Permalink
concurrently repairing indexes
Browse files Browse the repository at this point in the history
Signed-off-by: Jianjun Liao <[email protected]>
  • Loading branch information
Leavrth committed Jan 23, 2025
1 parent 9c37399 commit 94d9efc
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 26 deletions.
3 changes: 3 additions & 0 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ type CheckpointIngestIndexRepairSQL struct {
IndexName string `json:"index-name"`
AddSQL string `json:"add-sql"`
AddArgs []any `json:"add-args"`

OldIndexIDFound bool `json:"-"`
IndexRepaired bool `json:"-"`
}

type CheckpointIngestIndexRepairSQLs struct {
Expand Down
80 changes: 80 additions & 0 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"fmt"
"io"
"os"
"sync/atomic"
"time"

"github.com/fatih/color"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"go.uber.org/zap"
"golang.org/x/term"
)

Expand Down Expand Up @@ -188,3 +191,80 @@ func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
color.RedString("ABORTED"))),
)
}

type ProgressBar interface {
Increment()
Done()
}

type MultiProgress interface {
AddTextBar(string, int64) ProgressBar
Wait()
}

func (ops ConsoleOperations) StartMultiProgress() MultiProgress {
if !ops.OutputIsTTY() {
return &NopMultiProgress{}
}
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
return &TerminalMultiProgress{
progress: pb,
}
}

type NopMultiProgress struct{}

type LogBar struct {
name string
total int64
}

func (nmp *NopMultiProgress) AddTextBar(name string, total int64) ProgressBar {
log.Info("progress start", zap.String("name", name))
return &LogBar{
name: name,
total: total,
}
}

func (nmp *NopMultiProgress) Wait() {}

func (lb *LogBar) Increment() {
if atomic.AddInt64(&lb.total, -1) <= 0 {
log.Info("progress done", zap.String("name", lb.name))
}
}

func (lb *LogBar) Done() {}

type TerminalBar struct {
bar *mpb.Bar
}

func (tb *TerminalBar) Increment() {
tb.bar.Increment()
}

func (tb *TerminalBar) Done() {
tb.bar.Abort(false)
tb.bar.Wait()
}

type TerminalMultiProgress struct {
progress *mpb.Progress
}

func (tmp *TerminalMultiProgress) AddTextBar(name string, total int64) ProgressBar {
bar := tmp.progress.New(total,
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(name)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
color.RedString("ABORTED"),
)),
)
return &TerminalBar{bar: bar}
}

func (tmp *TerminalMultiProgress) Wait() {
tmp.progress.Wait()
}
106 changes: 80 additions & 26 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const maxSplitKeysOnce = 10240
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
const rawKVBatchCount = 64

// session count for repairing ingest indexes
const defaultRepairIndexSessionCount uint = 10

// LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration,
// including concurrency management, checkpoint handling, and file importing for efficient log processing.
type LogRestoreManager struct {
Expand Down Expand Up @@ -452,16 +455,48 @@ func (rc *LogClient) CleanUpKVFiles(
return rc.logRestoreManager.fileImporter.ClearFiles(ctx, rc.pdClient, "v1")
}

// Init create db connection and domain for storage.
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
var err error
rc.unsafeSession, err = g.CreateSession(store)
func createSession(ctx context.Context, g glue.Glue, store kv.Storage) (glue.Session, error) {
unsafeSession, err := g.CreateSession(store)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}

// Set SQL mode to None for avoiding SQL compatibility problem
err = rc.unsafeSession.Execute(ctx, "set @@sql_mode=''")
err = unsafeSession.Execute(ctx, "set @@sql_mode=''")
if err != nil {
return nil, errors.Trace(err)
}
return unsafeSession, nil
}

func createSessions(ctx context.Context, g glue.Glue, store kv.Storage, count uint) (unsafeSessions []glue.Session, createErr error) {
unsafeSessions = make([]glue.Session, 0, count)
defer func() {
if createErr != nil {
closeSessions(unsafeSessions)
}
}()
for range count {
unsafeSession, err := createSession(ctx, g, store)
if err != nil {
return nil, errors.Trace(err)
}
unsafeSessions = append(unsafeSessions, unsafeSession)
}
return unsafeSessions, nil
}

func closeSessions(sessions []glue.Session) {
for _, session := range sessions {
if session != nil {
session.Close()
}
}
}

// Init create db connection and domain for storage.
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
var err error
rc.unsafeSession, err = createSession(ctx, g, store)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1761,39 +1796,60 @@ func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *inge

info := rc.dom.InfoSchema()
console := glue.GetConsole(g)
NEXTSQL:
for _, sql := range sqls {
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)

tableInfo, err := info.TableByName(ctx, sql.SchemaName, sql.TableName)
if err != nil {
return errors.Trace(err)
}
oldIndexIDFound := false
sql.OldIndexIDFound = false
sql.IndexRepaired = false
if fromCheckpoint {
for _, idx := range tableInfo.Indices() {
indexInfo := idx.Meta()
if indexInfo.ID == sql.IndexID {
// the original index id is not dropped
oldIndexIDFound = true
sql.OldIndexIDFound = true
break
}
// what if index's state is not public?
if indexInfo.Name.O == sql.IndexName {
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
// find the same name index, but not the same index id,
// which means the repaired index id is created
if _, err := fmt.Fprintf(console.Out(), "%s ... %s\n", progressTitle, color.HiGreenString("SKIPPED DUE TO CHECKPOINT MODE")); err != nil {
return errors.Trace(err)
}
continue NEXTSQL
sql.IndexRepaired = true
break
}
}
}
}

if err := func(sql checkpoint.CheckpointIngestIndexRepairSQL) error {
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
defer w.Close()
sessionCount := defaultRepairIndexSessionCount
unsafeSessions, err := createSessions(ctx, g, rc.dom.Store(), sessionCount)
if err != nil {
return errors.Trace(err)
}
defer func() {
closeSessions(unsafeSessions)
}()
workerpool := tidbutil.NewWorkerPool(sessionCount, "repair ingest index")
eg, ectx := errgroup.WithContext(ctx)
mp := console.StartMultiProgress()
for _, sql := range sqls {
if sql.IndexRepaired {
continue
}
if ectx.Err() != nil {
break
}
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
w := mp.AddTextBar(progressTitle, 1)
workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
defer w.Done()

unsafeSession := unsafeSessions[id]
// TODO: When the TiDB supports the DROP and CREATE the same name index in one SQL,
// the checkpoint for ingest recorder can be removed and directly use the SQL:
// ALTER TABLE db.tbl DROP INDEX `i_1`, ADD IDNEX `i_1` ...
Expand All @@ -1804,8 +1860,8 @@ NEXTSQL:
// restored metakv and then skips repairing it.

// only when first execution or old index id is not dropped
if !fromCheckpoint || oldIndexIDFound {
if err := rc.unsafeSession.ExecuteInternal(ctx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
if !fromCheckpoint || sql.OldIndexIDFound {
if err := unsafeSession.ExecuteInternal(ectx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -1815,17 +1871,15 @@ NEXTSQL:
}
})
// create the repaired index when first execution or not found it
if err := rc.unsafeSession.ExecuteInternal(ctx, sql.AddSQL, sql.AddArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
if err := unsafeSession.ExecuteInternal(ectx, sql.AddSQL, sql.AddArgs...); err != nil {
return errors.Trace(err)
}
w.Increment()
return nil
}(sql); err != nil {
return errors.Trace(err)
}
})
}
if err := eg.Wait(); err != nil {
return errors.Trace(err)
}

return nil
Expand Down

0 comments on commit 94d9efc

Please sign in to comment.