Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lightning: Make sure we are using default block size of 16KB if user does not specify one. (#60097) #60184

Merged
3 changes: 3 additions & 0 deletions lightning/pkg/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ func (tr *TableImporter) preprocessEngine(
logTask := tr.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write")
dataEngineCfg := &backend.EngineConfig{
TableInfo: tr.tableInfo,
Local: backend.LocalEngineConfig{
BlockSize: int(rc.cfg.TikvImporter.BlockSize),
},
}
if !tr.tableMeta.IsRowOrdered {
dataEngineCfg.Local.Compact = true
Expand Down
10 changes: 10 additions & 0 deletions pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,16 @@ func newSSTWriter(path string, blockSize int) (*sstable.Writer, error) {
if err != nil {
return nil, errors.Trace(err)
}

// Logic to ensure the default block size is set to 16KB.
// If a smaller block size is used (e.g., 4KB, the default for Pebble),
// a single large SST file may generate a disproportionately large index block,
// potentially causing a memory spike and leading to an Out of Memory (OOM) scenario.
// If the user specifies a smaller block size, respect their choice.
if blockSize <= 0 {
blockSize = config.DefaultBlockSize
}

writable := objstorageprovider.NewFileWritable(f)
writer := sstable.NewWriter(writable, sstable.WriterOptions{
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
Expand Down
35 changes: 35 additions & 0 deletions pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -240,3 +242,36 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) {
// after iter closed, the memory buffer of iter goes to pool
require.Greater(t, pool.TotalSize(), int64(0))
}

// TestCreateSSTWriterDefaultBlockSize tests that createSSTWriter will use the default block size of 16KB if the block size is not set.
// TestCreateSSTWriterDefaultBlockSize tests that createSSTWriter will use the default block size of 16KB if the block size is not set.
func TestCreateSSTWriterDefaultBlockSize(t *testing.T) {
	db, dir := makePebbleDB(t, nil)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	// Deliberately leave BlockSize at its zero value so the writer must
	// fall back to config.DefaultBlockSize.
	eng := &Engine{
		config: backend.LocalEngineConfig{
			BlockSize: 0, // BlockSize is not set
		},
		sstDir: dir,
		logger: log.Logger{},
	}
	w := &Writer{engine: eng}

	sw, err := w.createSSTWriter()
	require.NoError(t, err)
	require.NotNil(t, sw)
	t.Cleanup(func() {
		require.NoError(t, sw.writer.Close())
	})

	// The sstable writer keeps blockSize in an unexported field, so the
	// only way to observe the effective value is via reflection.
	field := reflect.ValueOf(sw.writer).Elem().FieldByName("blockSize")
	require.True(t, field.IsValid(), "blockSize field should be valid")
	require.Equal(t, config.DefaultBlockSize, int(field.Int()))
}
2 changes: 1 addition & 1 deletion pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,7 @@ func NewConfig() *Config {
DiskQuota: ByteSize(math.MaxInt64),
DuplicateResolution: NoneOnDup,
PausePDSchedulerScope: PausePDSchedulerScopeTable,
BlockSize: 16 * 1024,
BlockSize: DefaultBlockSize,
LogicalImportBatchSize: ByteSize(defaultLogicalImportBatchSize),
LogicalImportBatchRows: defaultLogicalImportBatchRows,
LogicalImportPrepStmt: defaultLogicalImportPrepStmt,
Expand Down