Commit 2e5817c

hawkingrei authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#55943
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 58bb25d commit 2e5817c

1 file changed: 287 additions and 0 deletions

@@ -0,0 +1,287 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
    "context"
    "sync/atomic"
    "time"

    "github.com/pingcap/errors"
    "github.com/pingcap/failpoint"
    "github.com/pingcap/tidb/pkg/config"
    "github.com/pingcap/tidb/pkg/infoschema"
    tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
    "github.com/pingcap/tidb/pkg/sessionctx"
    "github.com/pingcap/tidb/pkg/statistics"
    "github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics"
    statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
    handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
    "github.com/pingcap/tidb/pkg/statistics/handle/types"
    "github.com/pingcap/tidb/pkg/statistics/handle/util"
    "github.com/pingcap/tidb/pkg/util/chunk"
    "github.com/pingcap/tidb/pkg/util/logutil"
    "go.uber.org/zap"
)

// StatsCacheImpl implements util.StatsCache.
type StatsCacheImpl struct {
    atomic.Pointer[StatsCache]

    statsHandle types.StatsHandle
}

// NewStatsCacheImpl creates a new StatsCache.
func NewStatsCacheImpl(statsHandle types.StatsHandle) (types.StatsCache, error) {
    newCache, err := NewStatsCache()
    if err != nil {
        return nil, err
    }

    result := &StatsCacheImpl{
        statsHandle: statsHandle,
    }
    result.Store(newCache)

    return result, nil
}

// NewStatsCacheImplForTest creates a new StatsCache for test.
func NewStatsCacheImplForTest() (types.StatsCache, error) {
    return NewStatsCacheImpl(nil)
}

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
    start := time.Now()
    lastVersion := s.getLastVersion()
    var (
        rows []chunk.Row
        err  error
    )
    if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
        rows, _, err = util.ExecRows(
            sctx,
            "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? order by version",
            lastVersion,
        )
        return err
    }); err != nil {
        return errors.Trace(err)
    }

    tables := make([]*statistics.Table, 0, len(rows))
    deletedTableIDs := make([]int64, 0, len(rows))

    for _, row := range rows {
        version := row.GetUint64(0)
        physicalID := row.GetInt64(1)
        modifyCount := row.GetInt64(2)
        count := row.GetInt64(3)
        snapshot := row.GetUint64(4)

        // Detect the context cancel signal, since the loop may take a long time.
        // TODO: add context to TableInfoByID and remove this code block?
        if ctx.Err() != nil {
            return ctx.Err()
        }

        table, ok := s.statsHandle.TableInfoByID(is, physicalID)
        if !ok {
            logutil.BgLogger().Debug(
                "unknown physical ID in stats meta table, maybe it has been dropped",
                zap.Int64("ID", physicalID),
            )
            deletedTableIDs = append(deletedTableIDs, physicalID)
            continue
        }
        tableInfo := table.Meta()
        // If the table has not been updated, we can skip it.
        if oldTbl, ok := s.Get(physicalID); ok &&
            oldTbl.Version >= version &&
            tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
            continue
        }
        tbl, err := s.statsHandle.TableStatsFromStorage(
            tableInfo,
            physicalID,
            false,
            0,
        )
        // A non-nil error may mean that there are some DDL changes on this table, so we will not update its stats.
        if err != nil {
            statslogutil.StatsLogger().Error(
                "error occurred when read table stats",
                zap.String("table", tableInfo.Name.O),
                zap.Error(err),
            )
            continue
        }
        if tbl == nil {
            deletedTableIDs = append(deletedTableIDs, physicalID)
            continue
        }
        tbl.Version = version
        tbl.RealtimeCount = count
        tbl.ModifyCount = modifyCount
        tbl.TblInfoUpdateTS = tableInfo.UpdateTS
        // This only occurs in the following situations:
        // 1. The table has already been analyzed,
        //    but because the predicate columns feature is turned on and it doesn't have any columns or indexes analyzed,
        //    it only analyzes _row_id and refreshes stats_meta, in which case the snapshot is not zero.
        // 2. LastAnalyzeVersion is 0 because it has never been loaded.
        // In this case, we can initialize LastAnalyzeVersion to the snapshot,
        // otherwise auto-analyze will assume that the table has never been analyzed and try to analyze it again.
        if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
            tbl.LastAnalyzeVersion = snapshot
        }
        tables = append(tables, tbl)
    }

    s.UpdateStatsCache(tables, deletedTableIDs)
    dur := time.Since(start)
    tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
    return nil
}

func (s *StatsCacheImpl) getLastVersion() uint64 {
    // Get the greatest version of the stats meta table.
    lastVersion := s.MaxTableStatsVersion()
    // We need this because, for two tables, the one with the smaller version may be written later than the one with the larger version.
    // Consider two tables A and B whose (version, commit time) pairs are (A0, A1) and (B0, B1),
    // with A0 < B0 < B1 < A1. We first read the stats of B and advance lastVersion to B0, but then we can no longer
    // read A's stats if we only read stats whose version is greater than lastVersion, which is B0.
    // We can still read such stats as long as the gap between commit time and version is less than five leases.
    offset := util.DurationToTS(5 * s.statsHandle.Lease()) // 5 leases is 15s.
    if s.MaxTableStatsVersion() >= offset {
        lastVersion = lastVersion - offset
    } else {
        lastVersion = 0
    }

    return lastVersion
}
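
// To make the race above concrete (an illustrative scenario, assuming the 3s stats
// lease implied by the 15s note above): table A's stats_meta row carries version A0,
// but its transaction only commits a dozen seconds later; in between, table B's row
// with version B0 > A0 commits and pushes MaxTableStatsVersion up to B0. A scan for
// rows with version > B0 would then never see A's row, while rewinding lastVersion
// by the 5-lease offset keeps A0 inside the scanned range on the next Update.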

// Replace replaces this cache.
func (s *StatsCacheImpl) Replace(cache types.StatsCache) {
    x := cache.(*StatsCacheImpl)
    s.replace(x.Load())
}

// replace replaces the cache with the new cache.
func (s *StatsCacheImpl) replace(newCache *StatsCache) {
    old := s.Swap(newCache)
    if old != nil {
        old.Close()
    }
    metrics.CostGauge.Set(float64(newCache.Cost()))
}

// UpdateStatsCache updates the cache with the given tables and deleted table IDs.
func (s *StatsCacheImpl) UpdateStatsCache(tables []*statistics.Table, deletedIDs []int64) {
    if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
        s.Load().Update(tables, deletedIDs)
    } else {
        // TODO: remove this branch because we will always enable quota.
        newCache := s.Load().CopyAndUpdate(tables, deletedIDs)
        s.replace(newCache)
    }
}
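
// Note that the two branches above differ in how the new state becomes visible:
// with the stats cache memory quota enabled, the current cache is updated in place;
// otherwise CopyAndUpdate builds a fresh copy that replace swaps in atomically,
// closing the old cache and refreshing the cost gauge.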

// Close closes this cache.
func (s *StatsCacheImpl) Close() {
    s.Load().Close()
}

// Clear clears this cache.
// It creates an empty cache and replaces the old one.
func (s *StatsCacheImpl) Clear() {
    cache, err := NewStatsCache()
    if err != nil {
        logutil.BgLogger().Warn("create stats cache failed", zap.Error(err))
        return
    }
    s.replace(cache)
}

// MemConsumed returns its memory usage.
func (s *StatsCacheImpl) MemConsumed() (size int64) {
    return s.Load().Cost()
}

// Get returns the specified table's stats.
func (s *StatsCacheImpl) Get(tableID int64) (*statistics.Table, bool) {
    failpoint.Inject("StatsCacheGetNil", func() {
        failpoint.Return(nil, false)
    })
    return s.Load().Get(tableID)
}

// Put puts this table's stats into the cache.
func (s *StatsCacheImpl) Put(id int64, t *statistics.Table) {
    s.Load().put(id, t)
}

// MaxTableStatsVersion returns the version of the current cache, which is defined as
// the max table stats version the cache has seen in its lifecycle.
func (s *StatsCacheImpl) MaxTableStatsVersion() uint64 {
    return s.Load().Version()
}

// Values returns all values in this cache.
func (s *StatsCacheImpl) Values() []*statistics.Table {
    return s.Load().Values()
}

// Len returns the length of this cache.
func (s *StatsCacheImpl) Len() int {
    return s.Load().Len()
}

// SetStatsCacheCapacity sets the cache's capacity.
func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
    s.Load().SetCapacity(c)
}

// UpdateStatsHealthyMetrics updates the stats healthy distribution metrics according to the stats cache.
func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() {
    distribution := make([]int64, 5)
    uneligibleAnalyze := 0
    for _, tbl := range s.Values() {
        distribution[4]++ // total table count
        isEligibleForAnalysis := tbl.IsEligibleForAnalysis()
        if !isEligibleForAnalysis {
            uneligibleAnalyze++
            continue
        }
        healthy, ok := tbl.GetStatsHealthy()
        if !ok {
            continue
        }
        if healthy < 50 {
            distribution[0]++
        } else if healthy < 80 {
            distribution[1]++
        } else if healthy < 100 {
            distribution[2]++
        } else {
            distribution[3]++
        }
    }
    for i, val := range distribution {
        handle_metrics.StatsHealthyGauges[i].Set(float64(val))
    }
    handle_metrics.StatsHealthyGauges[5].Set(float64(uneligibleAnalyze))
}
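
Taken together, the new type keeps the whole stats cache behind an atomic pointer: readers Load a consistent snapshot, and writers either update it in place or swap in a rebuilt copy. A minimal usage sketch follows; the helper name refreshAndRead is hypothetical, and it assumes the types.StatsCache interface exposes the Update and Get methods defined above and that the caller supplies a types.StatsHandle, an infoschema.InfoSchema, and a known physical table ID.

func refreshAndRead(ctx context.Context, statsHandle types.StatsHandle, is infoschema.InfoSchema, tableID int64) error {
    cache, err := NewStatsCacheImpl(statsHandle)
    if err != nil {
        return err
    }
    // Pull the stats_meta rows newer than the cache's last seen version and apply them.
    if err := cache.Update(ctx, is); err != nil {
        return err
    }
    // Read one table's stats; ok is false if the table is not cached.
    if tbl, ok := cache.Get(tableID); ok {
        _ = tbl.RealtimeCount // row count from the latest loaded stats_meta row
        _ = tbl.ModifyCount   // modify count from the same row
    }
    return nil
}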
