Skip to content

Commit

Permalink
statistics: speed up the backgroud stats update worker
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Jan 21, 2025
1 parent f5285d8 commit cdc64bd
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 72 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(241), session.CurrentBootstrapVersion)
require.Equal(t, int64(242), session.CurrentBootstrapVersion)
}
3 changes: 3 additions & 0 deletions pkg/executor/hot_regions_history_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ func TestTiDBHotRegionsHistory(t *testing.T) {
// mysql table_id = 21 ,index_id = 1, table_name = STATS_META, index_name = IDX_VER
{"2019-10-10 10:10:19", "MYSQL", "STATS_META", statsMetaTidStr, "IDX_VER", "1", "3", "3", "33333", "0", "1", "READ", "99", "99", "99", "99"},
{"2019-10-10 10:10:20", "MYSQL", "STATS_META", statsMetaTidStr, "IDX_VER", "1", "4", "4", "44444", "0", "0", "WRITE", "99", "99", "99", "99"},
// mysql table_id = 21, index_id = 3, table_name = STATS_META, index_name = INDEX_ANALYZE_VERSION
{"2019-10-10 10:10:21", "MYSQL", "STATS_META", statsMetaTidStr, "IDX_ANALYZE_VERSION", "2", "5", "5", "55555", "0", "1", "READ", "99", "99", "99", "99"},
{"2019-10-10 10:10:22", "MYSQL", "STATS_META", statsMetaTidStr, "IDX_ANALYZE_VERSION", "2", "6", "6", "66666", "0", "0", "WRITE", "99", "99", "99", "99"},
// mysql table_id = 21 ,index_id = 2, table_name = STATS_META, index_name = TBL
{"2019-10-10 10:10:21", "MYSQL", "STATS_META", statsMetaTidStr, "TBL", "2", "5", "5", "55555", "0", "1", "READ", "99", "99", "99", "99"},
{"2019-10-10 10:10:22", "MYSQL", "STATS_META", statsMetaTidStr, "TBL", "2", "6", "6", "66666", "0", "0", "WRITE", "99", "99", "99", "99"},
Expand Down
28 changes: 10 additions & 18 deletions pkg/executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,27 +146,19 @@ func (e *ShowExec) appendTableForStatsMeta(dbName, tblName, partitionName string
if statsTbl.Pseudo {
return
}
row := make([]any, 7)
row[0] = dbName
row[1] = tblName
row[2] = partitionName
row[3] = e.versionToTime(statsTbl.Version)
row[4] = statsTbl.ModifyCount
row[5] = statsTbl.RealtimeCount
if !statsTbl.IsAnalyzed() {
e.appendRow([]any{
dbName,
tblName,
partitionName,
e.versionToTime(statsTbl.Version),
statsTbl.ModifyCount,
statsTbl.RealtimeCount,
nil,
})
row[6] = nil
} else {
e.appendRow([]any{
dbName,
tblName,
partitionName,
e.versionToTime(statsTbl.Version),
statsTbl.ModifyCount,
statsTbl.RealtimeCount,
e.versionToTime(statsTbl.LastAnalyzeVersion),
})
row[6] = e.versionToTime(statsTbl.LastAnalyzeVersion)
}
e.appendRow(row)
}

func (e *ShowExec) appendTableForStatsLocked(dbName, tblName, partitionName string) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5647,7 +5647,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowStatsMeta:
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count", "Last_analyze_time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDatetime}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDatetime, mysql.TypeDatetime}
case ast.ShowStatsExtended:
names = []string{"Db_name", "Table_name", "Stats_name", "Column_names", "Stats_type", "Stats_val", "Last_update_version"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong}
Expand Down
29 changes: 23 additions & 6 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,16 @@ const (

// CreateStatsMetaTable stores the meta of table statistics.
CreateStatsMetaTable = `CREATE TABLE IF NOT EXISTS mysql.stats_meta (
version BIGINT(64) UNSIGNED NOT NULL,
table_id BIGINT(64) NOT NULL,
modify_count BIGINT(64) NOT NULL DEFAULT 0,
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
version BIGINT(64) UNSIGNED NOT NULL,
table_id BIGINT(64) NOT NULL,
modify_count BIGINT(64) NOT NULL DEFAULT 0,
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
last_analyze_version BIGINT(64) UNSIGNED DEFAULT NULL,
last_affected_ddl_version BIGINT(64) UNSIGNED DEFAULT NULL,
INDEX idx_ver(version),
INDEX idx_analyze_version(last_analyze_version),
INDEX idx_last_affected_ddl_version(last_affected_ddl_version),
UNIQUE INDEX tbl(table_id)
);`

Expand Down Expand Up @@ -1242,11 +1246,15 @@ const (

// Add index on user field for some mysql tables.
version241 = 241

// version 241
// Add last_affected_ddl_version to mysql.stats_meta.
version242 = 242
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version241
var currentBootstrapVersion int64 = version242

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1423,6 +1431,7 @@ var (
upgradeToVer239,
upgradeToVer240,
upgradeToVer241,
upgradeToVer242,
}
)

Expand Down Expand Up @@ -3365,6 +3374,14 @@ func upgradeToVer241(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.default_roles ADD INDEX i_user (user)", dbterror.ErrDupKeyName)
}

func upgradeToVer242(s sessiontypes.Session, ver int64) {
if ver >= version242 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_analyze_version bigint(20) unsigned DEFAULT NULL", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_affected_ddl_version bigint(20) unsigned DEFAULT NULL", infoschema.ErrColumnExists)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down
57 changes: 33 additions & 24 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,31 @@ func (*Handle) initStatsMeta4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID = row.GetInt64(1)
maxPhysicalID = max(physicalID, maxPhysicalID)
// During the initialization phase, we need to initialize LastAnalyzeVersion with the snapshot,
// which ensures that we don't duplicate the auto-analyze of a particular type of table.
// When the predicate columns feature is turned on, if a table has neither predicate columns nor indexes,
// then auto-analyze will only analyze the _row_id and refresh stats_meta,
// but since we don't have any histograms or topn's created for _row_id at the moment.
// So if we don't initialize LastAnalyzeVersion with the snapshot here,
// it will stay at 0 and auto-analyze won't be able to detect that the table has been analyzed.
// But in the future, we maybe will create some records for _row_id, see:
// https://github.com/pingcap/tidb/issues/51098
newHistColl := *statistics.NewHistColl(physicalID, row.GetInt64(3), row.GetInt64(2), 4, 4)
snapshot := row.GetUint64(4)
lastAnalyzeVersion, lastStatsFullUpdateVersion := snapshot, snapshot
if !row.IsNull(5) {
lastAnalyzeVersion = max(lastAnalyzeVersion, row.GetUint64(5))
lastStatsFullUpdateVersion = max(lastStatsFullUpdateVersion, row.GetUint64(5))
}
if !row.IsNull(6) {
lastStatsFullUpdateVersion = max(lastStatsFullUpdateVersion, row.GetUint64(6))
}
tbl := &statistics.Table{
HistColl: newHistColl,
Version: row.GetUint64(0),
ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMapWithoutSize(),
// During the initialization phase, we need to initialize LastAnalyzeVersion with the snapshot,
// which ensures that we don't duplicate the auto-analyze of a particular type of table.
// When the predicate columns feature is turned on, if a table has neither predicate columns nor indexes,
// then auto-analyze will only analyze the _row_id and refresh stats_meta,
// but since we don't have any histograms or topn's created for _row_id at the moment.
// So if we don't initialize LastAnalyzeVersion with the snapshot here,
// it will stay at 0 and auto-analyze won't be able to detect that the table has been analyzed.
// But in the future, we maybe will create some records for _row_id, see:
// https://github.com/pingcap/tidb/issues/51098
LastAnalyzeVersion: snapshot,
HistColl: newHistColl,
Version: row.GetUint64(0),
ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMapWithoutSize(),
LastAnalyzeVersion: lastAnalyzeVersion,
LastStatsUpdateVersion: lastStatsFullUpdateVersion,
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
Expand All @@ -95,7 +104,7 @@ func (*Handle) initStatsMeta4Chunk(cache statstypes.StatsCache, iter *chunk.Iter

func (h *Handle) initStatsMeta(ctx context.Context) (statstypes.StatsCache, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
sql := "select HIGH_PRIORITY version, table_id, modify_count, count, snapshot from mysql.stats_meta"
sql := "select HIGH_PRIORITY version, table_id, modify_count, count, snapshot, last_analyze_version, last_affected_ddl_version from mysql.stats_meta"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -146,17 +155,14 @@ func (*Handle) initStatsHistograms4ChunkLite(cache statstypes.StatsCache, iter *
}
if isIndex > 0 {
table.ColAndIdxExistenceMap.InsertIndex(id, statsVer != statistics.Version0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
} else {
table.ColAndIdxExistenceMap.InsertCol(id, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
}
// The LastXXXVersion can be added by ALTER table so its value might be 0, so we also need to update its memory value by the column/index's.
if statsVer != statistics.Version0 {
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
table.LastStatsUpdateVersion = max(table.LastStatsUpdateVersion, row.GetUint64(4))
}
if table != nil {
cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read.
Expand Down Expand Up @@ -228,12 +234,14 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
StatsVer: statsVer,
PhysicalID: tblID,
}
// The LastXXXVersion can be added by ALTER table so its value might be 0, so we also need to update its memory value by the column/index's.
if statsVer != statistics.Version0 {
// We first set the StatsLoadedStatus as AllEvicted. when completing to load bucket, we will set it as ALlLoad.
index.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
}
table.LastStatsUpdateVersion = max(table.LastStatsUpdateVersion, version)

table.SetIdx(idxInfo.ID, index)
table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, statsVer != statistics.Version0)
} else {
Expand All @@ -258,8 +266,8 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
}
table.SetCol(hist.ID, col)
table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
// The LastXXXVersion can be added by ALTER table so its value might be 0, so we also need to update its memory value by the column/index's.
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
// We will also set int primary key's loaded status to evicted.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
Expand All @@ -269,6 +277,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
// Otherwise the column's stats is not initialized.
table.LastStatsUpdateVersion = max(table.LastStatsUpdateVersion, version)
}
}
if table != nil {
Expand Down
25 changes: 21 additions & 4 deletions pkg/statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
err error
)
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
query := "SELECT version, table_id, modify_count, count, snapshot, last_analyze_version, last_affected_ddl_version from mysql.stats_meta where version > %? "
args := []any{lastVersion}

if len(tableAndPartitionIDs) > 0 {
Expand Down Expand Up @@ -168,6 +168,13 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
snapshot := row.GetUint64(4)
var latestHistUpdateVersion uint64
if !row.IsNull(5) {
latestHistUpdateVersion = row.GetUint64(5)
}
if !row.IsNull(6) {
latestHistUpdateVersion = max(latestHistUpdateVersion, row.GetUint64(6))
}

// Detect the context cancel signal, since it may take a long time for the loop.
// TODO: add context to TableInfoByID and remove this code block?
Expand All @@ -186,12 +193,21 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
}
tableInfo := table.Meta()
// If the table is not updated, we can skip it.
if oldTbl, ok := s.Get(physicalID); ok &&
oldTbl.Version >= version &&

oldTbl, ok := s.Get(physicalID)
if ok && oldTbl.Version >= version &&
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
tbl, err := s.statsHandle.TableStatsFromStorage(
var tbl *statistics.Table
// If the column/index stats has not been updated, we can reuse the old table stats.
// Only need to update the count and modify count.
if ok && latestHistUpdateVersion > 0 && oldTbl.LastStatsUpdateVersion >= latestHistUpdateVersion {
tbl = oldTbl.Copy()
// count and modify count is updated in finalProcess
goto finalProcess
}
tbl, err = s.statsHandle.TableStatsFromStorage(
tableInfo,
physicalID,
false,
Expand All @@ -210,6 +226,7 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
tblToUpdateOrDelete.addToDelete(physicalID)
continue
}
finalProcess:
tbl.Version = version
tbl.RealtimeCount = count
tbl.ModifyCount = modifyCount
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/storage/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func DeleteTableStatsFromKV(sctx sessionctx.Context, statsIDs []int64, soft bool
}
for _, statsID := range statsIDs {
// We only update the version so that other tidb will know that this table is deleted.
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil {
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %?, last_affected_ddl_version = %? where table_id = %? ", startTS, startTS, statsID); err != nil {
return err
}
if soft {
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statis
table.StatsVer = int(statsVer)
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
}
table.LastStatsUpdateVersion = max(table.LastStatsUpdateVersion, histVer)
// We will not load buckets, topn and cmsketch if:
// 1. lease > 0, and:
// 2. the index doesn't have any of buckets, topn, cmsketch in memory before, and:
Expand Down Expand Up @@ -406,6 +407,7 @@ func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *stati
table.StatsVer = int(statsVer)
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
}
table.LastStatsUpdateVersion = max(table.LastStatsUpdateVersion, histVer)
isHandle := tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag())
// We will not load buckets, topn and cmsketch if:
// 1. lease > 0, and:
Expand Down
17 changes: 10 additions & 7 deletions pkg/statistics/handle/storage/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
count = 0
}
if _, err = util.Exec(sctx,
"replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)",
"replace into mysql.stats_meta (version, table_id, count, snapshot, last_analyze_version) values (%?, %?, %?, %?, %?)",
version,
tableID,
count,
snapShot,
version,
); err != nil {
return 0, err
}
Expand All @@ -185,7 +186,8 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
// 1-2. There's already an existing record for this table, and we are handling stats for mv index now.
// In this case, we only update the version. See comments for AnalyzeResults.ForMVIndex for more details.
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%? where table_id=%?",
"update mysql.stats_meta set version=%?, last_analyze_version=%? where table_id=%?",
version,
version,
tableID,
); err != nil {
Expand Down Expand Up @@ -225,11 +227,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
zap.Int64("count", cnt))
}
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?",
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%?, last_analyze_version=%? where table_id=%?",
version,
modifyCnt,
cnt,
results.Snapshot,
version,
tableID,
); err != nil {
return 0, err
Expand Down Expand Up @@ -410,8 +413,8 @@ func InsertColStats2KV(
// First of all, we update the version.
_, err = util.ExecWithCtx(
ctx, sctx,
"update mysql.stats_meta set version = %? where table_id = %?",
startTS, physicalID,
"update mysql.stats_meta set version = %?, last_affected_ddl_version = %? where table_id = %?",
startTS, startTS, physicalID,
)
if err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -501,8 +504,8 @@ func InsertTableStats2KV(
}
if _, err = util.ExecWithCtx(
ctx, sctx,
"insert into mysql.stats_meta (version, table_id) values(%?, %?)",
startTS, physicalID,
"insert into mysql.stats_meta (version, table_id, last_affected_ddl_version) values(%?, %?, %?)",
startTS, physicalID, startTS,
); err != nil {
return 0, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit cdc64bd

Please sign in to comment.