Skip to content

Commit

Permalink
statistics: speed up the backgroud stats update worker
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Dec 23, 2024
1 parent 70caa9e commit 115146e
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 54 deletions.
33 changes: 15 additions & 18 deletions pkg/executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,27 +146,24 @@ func (e *ShowExec) appendTableForStatsMeta(dbName, tblName, partitionName string
if statsTbl.Pseudo {
return
}
row := make([]any, 8)
row[0] = dbName
row[1] = tblName
row[2] = partitionName
row[3] = e.versionToTime(statsTbl.Version)
row[4] = statsTbl.ModifyCount
row[5] = statsTbl.RealtimeCount
if !statsTbl.IsAnalyzed() {
e.appendRow([]any{
dbName,
tblName,
partitionName,
e.versionToTime(statsTbl.Version),
statsTbl.ModifyCount,
statsTbl.RealtimeCount,
nil,
})
row[6] = nil
} else {
e.appendRow([]any{
dbName,
tblName,
partitionName,
e.versionToTime(statsTbl.Version),
statsTbl.ModifyCount,
statsTbl.RealtimeCount,
e.versionToTime(statsTbl.LastAnalyzeVersion),
})
row[6] = e.versionToTime(statsTbl.LastAnalyzeVersion)
}
if statsTbl.LastStatsFullUpdateVersion == 0 {
row[7] = nil
} else {
row[7] = e.versionToTime(statsTbl.LastStatsFullUpdateVersion)
}
e.appendRow(row)
}

func (e *ShowExec) appendTableForStatsLocked(dbName, tblName, partitionName string) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5646,8 +5646,8 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowStatsMeta:
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count", "Last_analyze_time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDatetime}
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count", "Last_analyze_time", "Last_stats_full_update_time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDatetime, mysql.TypeDatetime}
case ast.ShowStatsExtended:
names = []string{"Db_name", "Table_name", "Stats_name", "Column_names", "Stats_type", "Stats_val", "Last_update_version"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong}
Expand Down
29 changes: 23 additions & 6 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,16 @@ const (

// CreateStatsMetaTable stores the meta of table statistics.
CreateStatsMetaTable = `CREATE TABLE IF NOT EXISTS mysql.stats_meta (
version BIGINT(64) UNSIGNED NOT NULL,
table_id BIGINT(64) NOT NULL,
modify_count BIGINT(64) NOT NULL DEFAULT 0,
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
version BIGINT(64) UNSIGNED NOT NULL,
table_id BIGINT(64) NOT NULL,
modify_count BIGINT(64) NOT NULL DEFAULT 0,
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
last_analyze_version BIGINT(64) UNSIGNED DEFAULT NULL,
last_affected_ddl_version BIGINT(64) UNSIGNED DEFAULT NULL,
INDEX idx_ver(version),
INDEX idx_analyze_version(last_analyze_version),
INDEX idx_last_affected_ddl_version(last_affected_ddl_version),
UNIQUE INDEX tbl(table_id)
);`

Expand Down Expand Up @@ -1234,11 +1238,15 @@ const (
// version 240
// Add indexes to mysql.analyze_jobs to speed up the query.
version240 = 240

// version 241
// Add last_affected_ddl_version to mysql.stats_meta.
version241 = 241
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version240
var currentBootstrapVersion int64 = version241

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1414,6 +1422,7 @@ var (
upgradeToVer218,
upgradeToVer239,
upgradeToVer240,
upgradeToVer241,
}
)

Expand Down Expand Up @@ -3343,6 +3352,14 @@ func upgradeToVer240(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
}

func upgradeToVer241(s sessiontypes.Session, ver int64) {
if ver >= version241 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_analyze_version bigint(20) unsigned DEFAULT NULL")
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_affected_ddl_version bigint(20) unsigned DEFAULT NULL")
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down
18 changes: 9 additions & 9 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (*Handle) initStatsMeta4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
// it will stay at 0 and auto-analyze won't be able to detect that the table has been analyzed.
// But in the future, we maybe will create some records for _row_id, see:
// https://github.com/pingcap/tidb/issues/51098
LastAnalyzeVersion: snapshot,
LastAnalyzeVersion: snapshot,
LastStatsFullUpdateVersion: snapshot,
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
Expand Down Expand Up @@ -149,17 +150,14 @@ func (*Handle) initStatsHistograms4ChunkLite(cache statstypes.StatsCache, iter *
}
if isIndex > 0 {
table.ColAndIdxExistenceMap.InsertIndex(id, statsVer != statistics.Version0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
} else {
table.ColAndIdxExistenceMap.InsertCol(id, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
}
// The LastXXXVersion can be added by ALTER table so its value might be 0.
if statsVer != statistics.Version0 {
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, row.GetUint64(4))
}
table.LastStatsFullUpdateVersion = max(table.LastStatsFullUpdateVersion, row.GetUint64(4))
}
if table != nil {
cache.Put(table.PhysicalID, table) // put this table in the cache because all statstics of the table have been read.
Expand Down Expand Up @@ -239,6 +237,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
}
table.LastStatsFullUpdateVersion = max(table.LastStatsFullUpdateVersion, version)
lastAnalyzePos.Copy(&index.LastAnalyzePos)
table.SetIdx(idxInfo.ID, index)
table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, statsVer != statistics.Version0)
Expand Down Expand Up @@ -277,6 +276,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
// Otherwise the column's stats is not initialized.
table.LastStatsFullUpdateVersion = max(table.LastStatsFullUpdateVersion, version)
}
}
if table != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
err error
)
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
query := "SELECT version, table_id, modify_count, count, snapshot, last_analyze_version, last_affected_ddl_version from mysql.stats_meta where version > %? "
args := []any{lastVersion}

if len(tableAndPartitionIDs) > 0 {
Expand Down Expand Up @@ -168,6 +168,13 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
snapshot := row.GetUint64(4)
var latestHistUpdateVersion uint64 = 0
if !row.IsNull(5) {
latestHistUpdateVersion = row.GetUint64(5)
}
if !row.IsNull(6) {
latestHistUpdateVersion = max(latestHistUpdateVersion, row.GetUint64(6))
}

// Detect the context cancel signal, since it may take a long time for the loop.
// TODO: add context to TableInfoByID and remove this code block?
Expand All @@ -192,7 +199,8 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
oldTbl, ok := s.Get(physicalID)
if ok &&
oldTbl.Version >= version &&
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS &&
latestHistUpdateVersion > 0 && oldTbl.LastStatsFullUpdateVersion >= latestHistUpdateVersion {
continue
}
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statis
table.StatsVer = int(statsVer)
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
}
table.LastStatsFullUpdateVersion = max(table.LastStatsFullUpdateVersion, histVer)
// We will not load buckets, topn and cmsketch if:
// 1. lease > 0, and:
// 2. the index doesn't have any of buckets, topn, cmsketch in memory before, and:
Expand Down Expand Up @@ -415,6 +416,7 @@ func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *stati
table.StatsVer = int(statsVer)
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
}
table.LastStatsFullUpdateVersion = max(table.LastStatsFullUpdateVersion, histVer)
isHandle := tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag())
// We will not load buckets, topn and cmsketch if:
// 1. lease > 0, and:
Expand Down
17 changes: 10 additions & 7 deletions pkg/statistics/handle/storage/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
count = 0
}
if _, err = util.Exec(sctx,
"replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)",
"replace into mysql.stats_meta (version, table_id, count, snapshot, last_analyze_version) values (%?, %?, %?, %?, %?)",
version,
tableID,
count,
snapShot,
version,
); err != nil {
return 0, err
}
Expand All @@ -188,7 +189,8 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
// 1-2. There's already an existing record for this table, and we are handling stats for mv index now.
// In this case, we only update the version. See comments for AnalyzeResults.ForMVIndex for more details.
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%? where table_id=%?",
"update mysql.stats_meta set version=%?, last_analyze_version=%? where table_id=%?",
version,
version,
tableID,
); err != nil {
Expand Down Expand Up @@ -228,11 +230,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
zap.Int64("count", cnt))
}
if _, err = util.Exec(sctx,
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?",
"update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%?, last_analyze_version=%? where table_id=%?",
version,
modifyCnt,
cnt,
results.Snapshot,
version,
tableID,
); err != nil {
return 0, err
Expand Down Expand Up @@ -430,8 +433,8 @@ func InsertColStats2KV(
// First of all, we update the version.
_, err = util.ExecWithCtx(
ctx, sctx,
"update mysql.stats_meta set version = %? where table_id = %?",
startTS, physicalID,
"update mysql.stats_meta set version = %?, last_affected_ddl_version = %? where table_id = %?",
startTS, startTS, physicalID,
)
if err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -521,8 +524,8 @@ func InsertTableStats2KV(
}
if _, err = util.ExecWithCtx(
ctx, sctx,
"insert into mysql.stats_meta (version, table_id) values(%?, %?)",
startTS, physicalID,
"insert into mysql.stats_meta (version, table_id, last_affected_ddl_version) values(%?, %?, %?)",
startTS, physicalID, startTS,
); err != nil {
return 0, errors.Trace(err)
}
Expand Down
30 changes: 20 additions & 10 deletions pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ type Table struct {
// 1. Initialized by snapshot when loading stats_meta.
// 2. Updated by the analysis time of a specific column or index when loading the histogram of the column or index.
LastAnalyzeVersion uint64
// LastStatsFullUpdateVersion is the mvcc version of the last full update of histograms.
// It differs from LastAnalyzeVersion because it can be influenced by some DDL.
// e.g. When we execute ALTER TABLE ADD COLUMN, there'll be new record inserted into mysql.stats_histograms.
// We need to load the corresponding one into memory too.
// It's used to skip redundant loading of stats, i.e, if the cached stats is already update-to-date with mysql.stats_xxx tables,
// and the schema of the table does not change, we don't need to load the stats for this table again.
// Stats' sync load/async load should not change this field since they are not table-level update.
LastStatsFullUpdateVersion uint64
// TblInfoUpdateTS is the UpdateTS of the TableInfo used when filling this struct.
// It is the schema version of the corresponding table. It is used to skip redundant
// loading of stats, i.e, if the cached stats is already update-to-date with mysql.stats_xxx tables,
Expand Down Expand Up @@ -607,10 +615,11 @@ func (t *Table) Copy() *Table {
newHistColl.indices[id] = idx.Copy()
}
nt := &Table{
HistColl: newHistColl,
Version: t.Version,
TblInfoUpdateTS: t.TblInfoUpdateTS,
LastAnalyzeVersion: t.LastAnalyzeVersion,
HistColl: newHistColl,
Version: t.Version,
TblInfoUpdateTS: t.TblInfoUpdateTS,
LastAnalyzeVersion: t.LastAnalyzeVersion,
LastStatsFullUpdateVersion: t.LastStatsFullUpdateVersion,
}
if t.ExtendedStats != nil {
newExtStatsColl := &ExtendedStatsColl{
Expand Down Expand Up @@ -643,12 +652,13 @@ func (t *Table) ShallowCopy() *Table {
StatsVer: t.StatsVer,
}
nt := &Table{
HistColl: newHistColl,
Version: t.Version,
TblInfoUpdateTS: t.TblInfoUpdateTS,
ExtendedStats: t.ExtendedStats,
ColAndIdxExistenceMap: t.ColAndIdxExistenceMap,
LastAnalyzeVersion: t.LastAnalyzeVersion,
HistColl: newHistColl,
Version: t.Version,
TblInfoUpdateTS: t.TblInfoUpdateTS,
ExtendedStats: t.ExtendedStats,
ColAndIdxExistenceMap: t.ColAndIdxExistenceMap,
LastAnalyzeVersion: t.LastAnalyzeVersion,
LastStatsFullUpdateVersion: t.LastStatsFullUpdateVersion,
}
return nt
}
Expand Down

0 comments on commit 115146e

Please sign in to comment.