Skip to content

[DNM]*: test import #60147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
github.com/tikv/pd/client v0.0.0-20250311030855-e245c1fb30c9
github.com/timakin/bodyclose v0.0.0-20241017074812-ed6a65f985e3
github.com/twmb/murmur3 v1.1.6
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/vbauerster/mpb/v7 v7.5.3
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553 h1:DRC1ubdb3ZmyyIeCSTxjZIQAnpLPfKVgYrLETQuOPjo=
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553/go.mod h1:Rj7Csq/tZ/egz+Ltc2IVpsA5309AmSMEswjkTZmq2Xc=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
Expand Down
124 changes: 94 additions & 30 deletions pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package external

import (
"bytes"
"container/heap"
"context"
"encoding/hex"
"io"
"sort"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/docker/go-units"
"github.com/jfcg/sorty/v2"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -65,7 +64,67 @@ type memKVsAndBuffers struct {
droppedSizePerFile []int
}

func (b *memKVsAndBuffers) build(ctx context.Context) {
// bufferedKVReader replays an in-memory set of key/value pairs through
// the reader interface used by the merge iterator. keys and values are
// parallel slices (keys[i] pairs with values[i]); cur is the index of
// the next pair to hand out.
type bufferedKVReader struct {
keys [][]byte
values [][]byte
cur int
}

// path implements the reader interface. An in-memory reader has no
// backing file, so the path is always empty.
func (p bufferedKVReader) path() string {
return ""
}

// next returns the next buffered key/value pair and advances the
// cursor, or io.EOF once all pairs have been consumed.
//
// The receiver must be a pointer: with a value receiver (as originally
// written) p.cur++ increments a copy, the cursor never advances, and
// the reader yields the first pair forever.
func (p *bufferedKVReader) next() (*kvPair, error) {
	if p.cur >= len(p.keys) {
		return nil, io.EOF
	}

	pair := &kvPair{key: p.keys[p.cur], value: p.values[p.cur]}
	p.cur++
	return pair, nil
}

// switchConcurrentMode implements the reader interface. Concurrent
// reading is meaningless for an in-memory reader, so this is a no-op.
func (bufferedKVReader) switchConcurrentMode(bool) error {
	return nil
}

// close implements the reader interface. There is no underlying
// resource to release.
func (bufferedKVReader) close() error {
	return nil
}

// HeapItem identifies one key/value pair inside a memKVsAndBuffers:
// fileIndex selects the per-file slice and elemIndex the position
// within it.
// NOTE(review): this type is only used within this package — consider
// unexporting it (heapItem) unless external callers are planned.
type HeapItem struct {
fileIndex int
elemIndex int
}

// MergeHeap is a min-heap over the current heads of the per-file
// sorted key slices held in m, ordered by raw key bytes. It implements
// container/heap.Interface and drives the k-way merge performed by
// memKVsAndBuffers.build.
// NOTE(review): package-internal — consider unexporting (mergeHeap).
type MergeHeap struct {
items []HeapItem
m *memKVsAndBuffers
}

// Len reports the number of pending heap entries.
func (h MergeHeap) Len() int { return len(h.items) }

// Less orders entries by the raw bytes of the keys they reference, so
// the heap root is always the smallest remaining key across all files.
func (h MergeHeap) Less(i, j int) bool {
	a, b := h.items[i], h.items[j]
	return bytes.Compare(
		h.m.keysPerFile[a.fileIndex][a.elemIndex],
		h.m.keysPerFile[b.fileIndex][b.elemIndex],
	) < 0
}

// Swap exchanges two heap entries.
func (h MergeHeap) Swap(i, j int) { h.items[j], h.items[i] = h.items[i], h.items[j] }

// Push appends x to the backing slice. It is invoked by
// container/heap only; do not call it directly.
func (h *MergeHeap) Push(x interface{}) {
	h.items = append(h.items, x.(HeapItem))
}

// Pop removes and returns the last element of the backing slice;
// container/heap has already swapped the minimum into that position.
func (h *MergeHeap) Pop() interface{} {
	last := len(h.items) - 1
	item := h.items[last]
	h.items = h.items[:last]
	return item
}

func (b *memKVsAndBuffers) build(ctx context.Context) error {
sumKVCnt := 0
for _, keys := range b.keysPerFile {
sumKVCnt += len(keys)
Expand All @@ -75,21 +134,46 @@ func (b *memKVsAndBuffers) build(ctx context.Context) {
b.droppedSize += size
}
b.droppedSizePerFile = nil

logutil.Logger(ctx).Info("building memKVsAndBuffers",
zap.Int("sumKVCnt", sumKVCnt),
zap.Int("droppedSize", b.droppedSize))

h := &MergeHeap{
m: b,
}
for i := 0; i < len(b.keysPerFile); i++ {
if len(b.keysPerFile[i]) > 0 {
heap.Push(h, HeapItem{fileIndex: i, elemIndex: 0})
}
}
heap.Init(h)

b.keys = make([][]byte, 0, sumKVCnt)
b.values = make([][]byte, 0, sumKVCnt)

for h.Len() > 0 {
item := heap.Pop(h).(HeapItem)
fileIdx := item.fileIndex
elemIdx := item.elemIndex

b.keys = append(b.keys, b.keysPerFile[fileIdx][elemIdx])
b.values = append(b.values, b.valuesPerFile[fileIdx][elemIdx])

nextElemIdx := elemIdx + 1
if nextElemIdx < len(b.keysPerFile[fileIdx]) {
item.elemIndex = nextElemIdx
heap.Push(h, item)
}
}

for i := range b.keysPerFile {
b.keys = append(b.keys, b.keysPerFile[i]...)
b.keysPerFile[i] = nil
b.values = append(b.values, b.valuesPerFile[i]...)
b.valuesPerFile[i] = nil
}
b.keysPerFile = nil
b.valuesPerFile = nil

return nil
}

// Engine stored sorted key/value pairs in an external storage.
Expand Down Expand Up @@ -285,7 +369,6 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, jobKeys [][]byte, outC
if err != nil {
return err
}
e.memKVsAndBuffers.build(ctx)

readSecond := time.Since(readStart).Seconds()
readDurHist.Observe(readSecond)
Expand All @@ -294,33 +377,14 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, jobKeys [][]byte, outC
zap.Int("droppedSize", e.memKVsAndBuffers.droppedSize))

sortStart := time.Now()
oldSortyGor := sorty.MaxGor
sorty.MaxGor = uint64(e.workerConcurrency * 2)
var dupKey atomic.Pointer[[]byte]
sorty.Sort(len(e.memKVsAndBuffers.keys), func(i, k, r, s int) bool {
cmp := bytes.Compare(e.memKVsAndBuffers.keys[i], e.memKVsAndBuffers.keys[k])
if cmp < 0 { // strict comparator like < or >
if r != s {
e.memKVsAndBuffers.keys[r], e.memKVsAndBuffers.keys[s] = e.memKVsAndBuffers.keys[s], e.memKVsAndBuffers.keys[r]
e.memKVsAndBuffers.values[r], e.memKVsAndBuffers.values[s] = e.memKVsAndBuffers.values[s], e.memKVsAndBuffers.values[r]
}
return true
}
if cmp == 0 && i != k {
cloned := append([]byte(nil), e.memKVsAndBuffers.keys[i]...)
dupKey.Store(&cloned)
}
return false
})
sorty.MaxGor = oldSortyGor

e.memKVsAndBuffers.build(ctx)

sortSecond := time.Since(sortStart).Seconds()
sortDurHist.Observe(sortSecond)
logutil.Logger(ctx).Info("sorting in loadBatchRegionData",
zap.Duration("cost time", time.Since(sortStart)))

if k := dupKey.Load(); k != nil {
return errors.Errorf("duplicate key found: %s", hex.EncodeToString(*k))
}
readAndSortSecond := time.Since(readStart).Seconds()
readAndSortDurHist.Observe(readAndSortSecond)

Expand Down
6 changes: 4 additions & 2 deletions pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func newLimitSizeMergeIter[
nextReaderIdx: end,
limit: limit,
}
// newMergeIter may close readers if the reader has no content, so we need to
// NewMergeIter may close readers if the reader has no content, so we need to
// fill more
for i, rp := range iter.readers {
if rp != nil {
Expand Down Expand Up @@ -571,7 +571,9 @@ func (i *MergeKVIter) Close() error {
return err
}
// memPool should be destroyed after reader's buffer pool.
i.memPool.Destroy()
if i.memPool != nil {
i.memPool.Destroy()
}
return nil
}

Expand Down
36 changes: 27 additions & 9 deletions pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/binary"
"encoding/hex"
"path/filepath"
"slices"
"sort"
"strconv"
"time"
Expand All @@ -38,6 +37,8 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"

"github.com/twotwotwo/sorts"
)

var (
Expand Down Expand Up @@ -321,6 +322,23 @@ func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64 {
return GetMaxOverlapping(points)
}

// location pairs the encoded key of a KV entry with the SliceLocation
// of the full encoded entry inside the writer's kvBuffer. The key is
// stored inline so that Key/Less can compare without resolving the
// buffer location on every comparison.
type location struct {
key []byte
loc membuf.SliceLocation
}

// locations implements sort.Interface plus the Key/Sort methods
// required by github.com/twotwotwo/sorts so the slice can be sorted
// in parallel by key bytes.
type locations []location

// Len reports the number of entries.
func (l locations) Len() int { return len(l) }

// Swap exchanges entries i and j.
func (l locations) Swap(i, j int) { l[j], l[i] = l[i], l[j] }

// Key returns the sort key of entry i, as required by sorts.ByBytes.
func (l locations) Key(i int) []byte { return l[i].key }

// Less orders entries by byte-wise key comparison.
func (l locations) Less(i, j int) bool {
	return bytes.Compare(l[i].key, l[j].key) < 0
}

// Sort sorts the slice with the parallel byte-key radix sorter.
func (l locations) Sort() { sorts.ByBytes(l) }

// init caps the worker parallelism of the twotwotwo/sorts package.
// NOTE(review): sorts.MaxProcs is a package-global knob shared by
// every caller of sorts in this process, and 2 looks like a
// deliberately conservative limit to avoid starving other import
// work — confirm the intended value and whether a global init is the
// right place to set it.
func init() {
sorts.MaxProcs = 2
}

// Writer is used to write data into external storage.
type Writer struct {
store storage.ExternalStorage
Expand All @@ -335,8 +353,9 @@ type Writer struct {
memSizeLimit uint64

kvBuffer *membuf.Buffer
kvLocations []membuf.SliceLocation
kvSize int64
kvLocations []location

kvSize int64

onClose OnCloseFunc
closed bool
Expand Down Expand Up @@ -382,7 +401,7 @@ func (w *Writer) WriteRow(ctx context.Context, key, val []byte, handle tidbkv.Ha
keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], key, rowID)
copy(dataBuf[2*lengthBytes+encodedKeyLen:], val)

w.kvLocations = append(w.kvLocations, loc)
w.kvLocations = append(w.kvLocations, location{dataBuf[2*lengthBytes : 2*lengthBytes+encodedKeyLen], loc})
w.kvSize += int64(encodedKeyLen + len(val))
w.batchSize += uint64(length)
w.totalCnt += 1
Expand Down Expand Up @@ -452,9 +471,8 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
zap.Int("sequence-number", w.currentSeq),
)
sortStart := time.Now()
slices.SortFunc(w.kvLocations, func(i, j membuf.SliceLocation) int {
return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
})
locations(w.kvLocations).Sort()

sortDuration := time.Since(sortStart)
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(sortDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(w.batchSize) / 1024.0 / 1024.0 / sortDuration.Seconds())
Expand Down Expand Up @@ -488,7 +506,7 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(totalDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(w.batchSize) / 1024.0 / 1024.0 / totalDuration.Seconds())

minKey, maxKey := w.getKeyByLoc(w.kvLocations[0]), w.getKeyByLoc(w.kvLocations[len(w.kvLocations)-1])
minKey, maxKey := w.getKeyByLoc(w.kvLocations[0].loc), w.getKeyByLoc(w.kvLocations[len(w.kvLocations)-1].loc)
w.recordMinMax(minKey, maxKey, uint64(w.kvSize))

// maintain 500-batch statistics
Expand Down Expand Up @@ -542,7 +560,7 @@ func (w *Writer) flushSortedKVs(ctx context.Context) (string, string, error) {
}

for _, pair := range w.kvLocations {
err = kvStore.addEncodedData(w.kvBuffer.GetSlice(pair))
err = kvStore.addEncodedData(w.kvBuffer.GetSlice(pair.loc))
if err != nil {
return "", "", err
}
Expand Down