Skip to content

Commit c86227b

Browse files
committed
sstable: add adaptive compression
Add adaptive compression with zstd and snappy. zstd compression is probed with exponential backoff when compression ratios are not better than 50%.
1 parent 567a8d3 commit c86227b

File tree

6 files changed

+91
-6
lines changed

6 files changed

+91
-6
lines changed

db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,6 +2072,7 @@ func (d *DB) Metrics() *Metrics {
20722072
metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
20732073
metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
20742074
metrics.Table.CompressedCountMinlz += int64(compressionTypes.minlz)
2075+
metrics.Table.CompressedCountAdaptive += int64(compressionTypes.adaptive)
20752076
metrics.Table.CompressedCountNone += int64(compressionTypes.none)
20762077
}
20772078

metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ type Metrics struct {
294294
CompressedCountMinlz int64
295295
// The number of sstables that are uncompressed.
296296
CompressedCountNone int64
297+
// The number of sstables that are compressed with adaptive compression.
298+
CompressedCountAdaptive int64
297299

298300
// Local file sizes.
299301
Local struct {
@@ -640,6 +642,9 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
640642
if count := m.Table.CompressedCountMinlz; count > 0 {
641643
w.Printf(" minlz: %d", redact.Safe(count))
642644
}
645+
if count := m.Table.CompressedCountAdaptive; count > 0 {
646+
w.Printf(" adaptive: %d", redact.Safe(count))
647+
}
643648
if count := m.Table.CompressedCountNone; count > 0 {
644649
w.Printf(" none: %d", redact.Safe(count))
645650
}

options.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ type Compression = block.Compression
4444

4545
// Exported Compression constants.
4646
const (
47-
DefaultCompression = block.DefaultCompression
48-
NoCompression = block.NoCompression
49-
SnappyCompression = block.SnappyCompression
50-
ZstdCompression = block.ZstdCompression
51-
MinlzCompression = block.MinlzCompression
47+
DefaultCompression = block.DefaultCompression
48+
NoCompression = block.NoCompression
49+
SnappyCompression = block.SnappyCompression
50+
ZstdCompression = block.ZstdCompression
51+
MinlzCompression = block.MinlzCompression
52+
AdaptiveCompression = block.AdaptiveCompression
5253
)
5354

5455
// FilterType exports the base.FilterType type.
@@ -1948,6 +1949,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
19481949
l.Compression = func() Compression { return ZstdCompression }
19491950
case "Minlz":
19501951
l.Compression = func() Compression { return MinlzCompression }
1952+
case "Adaptive":
1953+
l.Compression = func() Compression { return AdaptiveCompression }
19511954
default:
19521955
return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
19531956
}

sstable/block/compression.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ const (
2626
SnappyCompression
2727
ZstdCompression
2828
MinlzCompression
29+
// AdaptiveCompression dynamically chooses between snappy and zstd
30+
// based on recent compression effectiveness. See comment for adaptiveCompressor
31+
// for more details.
32+
AdaptiveCompression
2933
NCompression
3034
)
3135

@@ -43,6 +47,8 @@ func (c Compression) String() string {
4347
return "ZSTD"
4448
case MinlzCompression:
4549
return "Minlz"
50+
case AdaptiveCompression:
51+
return "Adaptive"
4652
default:
4753
return "Unknown"
4854
}
@@ -62,6 +68,8 @@ func CompressionFromString(s string) Compression {
6268
return ZstdCompression
6369
case "Minlz":
6470
return MinlzCompression
71+
case "Adaptive":
72+
return AdaptiveCompression
6573
default:
6674
return DefaultCompression
6775
}

sstable/block/compressor.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package block
22

33
import (
44
"encoding/binary"
5+
"sync"
56

67
"github.com/cockroachdb/errors"
78
"github.com/cockroachdb/pebble/internal/base"
@@ -21,9 +22,28 @@ type noopCompressor struct{}
2122
type snappyCompressor struct{}
2223
type minlzCompressor struct{}
2324

25+
// adaptiveCompressor dynamically switches between snappy and zstd
26+
// compression. It prefers using zstd because of its potential for
27+
// high compression ratios. However, if zstd does not achieve a good
28+
// compression ratio, we apply exponential backoff before trying zstd again.
29+
// If zstd compresses a block to less than 50% of its original size, we continue using zstd.
30+
type adaptiveCompressor struct {
31+
// timeTilTry is the number of operations to wait before
32+
// attempting zstd compression again after a poor result.
33+
timeTilTry int
34+
35+
// zstdBackoffStep is how much we increase timeTilTry
36+
// each time zstd compression fails to achieve at least
37+
// a 50% compression ratio.
38+
zstdBackoffStep int
39+
40+
zstdCompressor Compressor
41+
}
42+
2443
var _ Compressor = noopCompressor{}
2544
var _ Compressor = snappyCompressor{}
2645
var _ Compressor = minlzCompressor{}
46+
var _ Compressor = (*adaptiveCompressor)(nil)
2747

2848
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
2949
panic("NoCompressionCompressor.Compress() should not be called.")
@@ -62,11 +82,56 @@ func GetCompressor(c Compression) Compressor {
6282
return getZstdCompressor()
6383
case MinlzCompression:
6484
return minlzCompressor{}
85+
case AdaptiveCompression:
86+
return adaptiveCompressorPool.Get().(*adaptiveCompressor)
6587
default:
6688
panic("Invalid compression type.")
6789
}
6890
}
6991

92+
var adaptiveCompressorPool = sync.Pool{
93+
New: func() any {
94+
return &adaptiveCompressor{zstdBackoffStep: 1, zstdCompressor: getZstdCompressor()}
95+
},
96+
}
97+
98+
func (a *adaptiveCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
99+
var algo CompressionIndicator
100+
var compressedBuf []byte
101+
if a.timeTilTry == 0 {
102+
z := a.zstdCompressor
103+
algo, compressedBuf = z.Compress(dst, src)
104+
// Perform a backoff if zstd compression ratio wasn't better than 50%.
105+
if 10*len(compressedBuf) >= 5*len(src) {
106+
a.increaseBackoff()
107+
} else {
108+
a.resetBackoff()
109+
}
110+
} else {
111+
// Use Snappy
112+
algo, compressedBuf = (snappyCompressor{}).Compress(dst, src)
113+
}
114+
a.timeTilTry--
115+
return algo, compressedBuf
116+
}
117+
118+
func (a *adaptiveCompressor) Close() {
119+
a.timeTilTry = 0
120+
a.zstdBackoffStep = 1
121+
adaptiveCompressorPool.Put(a)
122+
}
123+
124+
// increaseBackoff implements exponential backoff for zstd: it doubles zstdBackoffStep and adds the new step to timeTilTry.
125+
func (a *adaptiveCompressor) increaseBackoff() {
126+
a.zstdBackoffStep *= 2
127+
a.timeTilTry += a.zstdBackoffStep
128+
}
129+
130+
func (a *adaptiveCompressor) resetBackoff() {
131+
a.zstdBackoffStep = 1
132+
a.timeTilTry = 1
133+
}
134+
70135
type Decompressor interface {
71136
// DecompressInto decompresses compressed into buf. The buf slice must have the
72137
// exact size as the decompressed value. Callers may use DecompressedLen to

table_stats.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,7 @@ var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
10341034
type compressionTypeAggregator struct{}
10351035

10361036
type compressionTypes struct {
1037-
snappy, zstd, minlz, none, unknown uint64
1037+
snappy, zstd, minlz, adaptive, none, unknown uint64
10381038
}
10391039

10401040
func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
@@ -1055,6 +1055,8 @@ func (a compressionTypeAggregator) Accumulate(
10551055
dst.zstd++
10561056
case MinlzCompression:
10571057
dst.minlz++
1058+
case AdaptiveCompression:
1059+
dst.adaptive++
10581060
case NoCompression:
10591061
dst.none++
10601062
default:
@@ -1069,6 +1071,7 @@ func (a compressionTypeAggregator) Merge(
10691071
dst.snappy += src.snappy
10701072
dst.zstd += src.zstd
10711073
dst.minlz += src.minlz
1074+
dst.adaptive += src.adaptive
10721075
dst.none += src.none
10731076
dst.unknown += src.unknown
10741077
return dst

0 commit comments

Comments
 (0)