Skip to content

Commit ec2ecd9

Browse files
committed
sstable: add adaptive compression
Add adaptive compression that switches between zstd and snappy. zstd compression is re-probed with exponential backoff whenever it fails to shrink a block to less than 50% of its original size.
1 parent 6018d0e commit ec2ecd9

File tree

6 files changed

+91
-6
lines changed

6 files changed

+91
-6
lines changed

db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2078,6 +2078,7 @@ func (d *DB) Metrics() *Metrics {
20782078
metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
20792079
metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
20802080
metrics.Table.CompressedCountMinlz += int64(compressionTypes.minlz)
2081+
metrics.Table.CompressedCountAdaptive += int64(compressionTypes.adaptive)
20812082
metrics.Table.CompressedCountNone += int64(compressionTypes.none)
20822083
}
20832084

metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ type Metrics struct {
294294
CompressedCountMinlz int64
295295
// The number of sstables that are uncompressed.
296296
CompressedCountNone int64
297+
// The number of sstables that are compressed with adaptive compression.
298+
CompressedCountAdaptive int64
297299

298300
// Local file sizes.
299301
Local struct {
@@ -696,6 +698,9 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
696698
if count := m.Table.CompressedCountMinlz; count > 0 {
697699
w.Printf(" minlz: %d", redact.Safe(count))
698700
}
701+
if count := m.Table.CompressedCountAdaptive; count > 0 {
702+
w.Printf(" adaptive: %d", redact.Safe(count))
703+
}
699704
if count := m.Table.CompressedCountNone; count > 0 {
700705
w.Printf(" none: %d", redact.Safe(count))
701706
}

options.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ type Compression = block.Compression
4545

4646
// Exported Compression constants.
4747
const (
48-
DefaultCompression = block.DefaultCompression
49-
NoCompression = block.NoCompression
50-
SnappyCompression = block.SnappyCompression
51-
ZstdCompression = block.ZstdCompression
52-
MinlzCompression = block.MinlzCompression
48+
DefaultCompression = block.DefaultCompression
49+
NoCompression = block.NoCompression
50+
SnappyCompression = block.SnappyCompression
51+
ZstdCompression = block.ZstdCompression
52+
MinlzCompression = block.MinlzCompression
53+
AdaptiveCompression = block.AdaptiveCompression
5354
)
5455

5556
// FilterType exports the base.FilterType type.
@@ -1952,6 +1953,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
19521953
l.Compression = func() Compression { return ZstdCompression }
19531954
case "Minlz":
19541955
l.Compression = func() Compression { return MinlzCompression }
1956+
case "Adaptive":
1957+
l.Compression = func() Compression { return AdaptiveCompression }
19551958
default:
19561959
return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
19571960
}

sstable/block/compression.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ const (
2626
SnappyCompression
2727
ZstdCompression
2828
MinlzCompression
29+
// AdaptiveCompression dynamically chooses between snappy and zstd
30+
// based on recent compression effectiveness. See comment for adaptiveCompressor
31+
// for more details.
32+
AdaptiveCompression
2933
NCompression
3034
)
3135

@@ -43,6 +47,8 @@ func (c Compression) String() string {
4347
return "ZSTD"
4448
case MinlzCompression:
4549
return "Minlz"
50+
case AdaptiveCompression:
51+
return "Adaptive"
4652
default:
4753
return "Unknown"
4854
}
@@ -62,6 +68,8 @@ func CompressionFromString(s string) Compression {
6268
return ZstdCompression
6369
case "Minlz":
6470
return MinlzCompression
71+
case "Adaptive":
72+
return AdaptiveCompression
6573
default:
6674
return DefaultCompression
6775
}

sstable/block/compressor.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package block
22

33
import (
44
"encoding/binary"
5+
"sync"
56

67
"github.com/cockroachdb/errors"
78
"github.com/cockroachdb/pebble/internal/base"
@@ -21,9 +22,28 @@ type noopCompressor struct{}
2122
type snappyCompressor struct{}
2223
type minlzCompressor struct{}
2324

25+
// adaptiveCompressor dynamically switches between snappy and zstd
26+
// compression. It prefers using zstd because of its potential for
27+
// high compression ratios. However, if zstd does not achieve a good
28+
// compression ratio, we apply exponential backoff before trying zstd again.
29+
// If zstd shrinks a block to less than half of its original size, we continue using zstd.
30+
type adaptiveCompressor struct {
31+
// timeTilTry is the number of operations to wait before
32+
// attempting zstd compression again after a poor result.
33+
timeTilTry int
34+
35+
// zstdBackoffStep is how much we increase timeTilTry
36+
// each time zstd compression fails to achieve at least
37+
// a 50% compression ratio.
38+
zstdBackoffStep int
39+
40+
zstdCompressor Compressor
41+
}
42+
2443
var _ Compressor = noopCompressor{}
2544
var _ Compressor = snappyCompressor{}
2645
var _ Compressor = minlzCompressor{}
46+
var _ Compressor = (*adaptiveCompressor)(nil)
2747

2848
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
2949
dst = append(dst[:0], src...)
@@ -63,11 +83,56 @@ func GetCompressor(c Compression) Compressor {
6383
return getZstdCompressor()
6484
case MinlzCompression:
6585
return minlzCompressor{}
86+
case AdaptiveCompression:
87+
return adaptiveCompressorPool.Get().(*adaptiveCompressor)
6688
default:
6789
panic("Invalid compression type.")
6890
}
6991
}
7092

93+
var adaptiveCompressorPool = sync.Pool{
94+
New: func() any {
95+
return &adaptiveCompressor{zstdBackoffStep: 1, zstdCompressor: getZstdCompressor()}
96+
},
97+
}
98+
99+
func (a *adaptiveCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
100+
var algo CompressionIndicator
101+
var compressedBuf []byte
102+
if a.timeTilTry == 0 {
103+
z := a.zstdCompressor
104+
algo, compressedBuf = z.Compress(dst, src)
105+
// Perform a backoff if zstd compression ratio wasn't better than 50%.
106+
if 10*len(compressedBuf) >= 5*len(src) {
107+
a.increaseBackoff()
108+
} else {
109+
a.resetBackoff()
110+
}
111+
} else {
112+
// Still backing off from zstd; compress this block with snappy instead.
113+
algo, compressedBuf = (snappyCompressor{}).Compress(dst, src)
114+
}
115+
a.timeTilTry--
116+
return algo, compressedBuf
117+
}
118+
119+
func (a *adaptiveCompressor) Close() {
120+
a.timeTilTry = 0
121+
a.zstdBackoffStep = 1
122+
adaptiveCompressorPool.Put(a)
123+
}
124+
125+
// increaseBackoff doubles the backoff step and adds it to the wait before zstd is tried again.
126+
func (a *adaptiveCompressor) increaseBackoff() {
127+
a.zstdBackoffStep *= 2
128+
a.timeTilTry += a.zstdBackoffStep
129+
}
130+
131+
func (a *adaptiveCompressor) resetBackoff() {
132+
a.zstdBackoffStep = 1
133+
a.timeTilTry = 1
134+
}
135+
71136
type Decompressor interface {
72137
// DecompressInto decompresses compressed into buf. The buf slice must have the
73138
// exact size as the decompressed value. Callers may use DecompressedLen to

table_stats.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
10421042
type compressionTypeAggregator struct{}
10431043

10441044
type compressionTypes struct {
1045-
snappy, zstd, minlz, none, unknown uint64
1045+
snappy, zstd, minlz, adaptive, none, unknown uint64
10461046
}
10471047

10481048
func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
@@ -1063,6 +1063,8 @@ func (a compressionTypeAggregator) Accumulate(
10631063
dst.zstd++
10641064
case MinlzCompression:
10651065
dst.minlz++
1066+
case AdaptiveCompression:
1067+
dst.adaptive++
10661068
case NoCompression:
10671069
dst.none++
10681070
default:
@@ -1077,6 +1079,7 @@ func (a compressionTypeAggregator) Merge(
10771079
dst.snappy += src.snappy
10781080
dst.zstd += src.zstd
10791081
dst.minlz += src.minlz
1082+
dst.adaptive += src.adaptive
10801083
dst.none += src.none
10811084
dst.unknown += src.unknown
10821085
return dst

0 commit comments

Comments
 (0)