Skip to content

Commit 757a071

Browse files
committed
Change the compression to lz4 on main (aptos-labs#12747)
1 parent d43fc86 commit 757a071

File tree

8 files changed

+70
-53
lines changed

8 files changed

+70
-53
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl Worker {
8080
enable_cache_compression: bool,
8181
) -> Result<Self> {
8282
let cache_storage_format = if enable_cache_compression {
83-
StorageFormat::GzipCompressedProto
83+
StorageFormat::Lz4CompressedProto
8484
} else {
8585
StorageFormat::Base64UncompressedProto
8686
};

ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
134134
.map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?;
135135

136136
let cache_storage_format: StorageFormat = if self.enable_cache_compression {
137-
StorageFormat::GzipCompressedProto
137+
StorageFormat::Lz4CompressedProto
138138
} else {
139139
StorageFormat::Base64UncompressedProto
140140
};

ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tracing::debug;
1818
// If the version is ahead of the cache head, retry after a short sleep.
1919
const AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS: u64 = 100;
2020
const SERVICE_TYPE: &str = "file_worker";
21+
const MAX_CONCURRENT_BATCHES: usize = 50;
2122

2223
/// Processor tails the data in cache and stores the data in file store.
2324
pub struct Processor {
@@ -34,7 +35,7 @@ impl Processor {
3435
enable_cache_compression: bool,
3536
) -> Result<Self> {
3637
let cache_storage_format = if enable_cache_compression {
37-
StorageFormat::GzipCompressedProto
38+
StorageFormat::Lz4CompressedProto
3839
} else {
3940
StorageFormat::Base64UncompressedProto
4041
};
@@ -132,8 +133,17 @@ impl Processor {
132133
while start_version + (FILE_ENTRY_TRANSACTION_COUNT) < cache_worker_latest {
133134
batches.push(start_version);
134135
start_version += FILE_ENTRY_TRANSACTION_COUNT;
136+
if batches.len() >= MAX_CONCURRENT_BATCHES {
137+
break;
138+
}
135139
}
136140

141+
tracing::info!(
142+
batch_start_version = batch_start_version,
143+
cache_worker_latest = cache_worker_latest,
144+
batches = ?batches,
145+
"Filestore processor loop"
146+
);
137147
// we're too close to the head
138148
if batches.is_empty() {
139149
debug!(
@@ -150,6 +160,7 @@ impl Processor {
150160

151161
// Create thread and fetch transactions
152162
let mut tasks = vec![];
163+
153164
for start_version in batches {
154165
let mut cache_operator_clone = self.cache_operator.clone();
155166
let mut file_store_operator_clone = self.file_store_operator.clone_box();
@@ -172,7 +183,9 @@ impl Processor {
172183
Some(FILE_ENTRY_TRANSACTION_COUNT as i64),
173184
None,
174185
);
175-
186+
for (i, txn) in transactions.iter().enumerate() {
187+
assert_eq!(txn.version, start_version + i as u64);
188+
}
176189
let upload_start_time = std::time::Instant::now();
177190
let (start, end) = file_store_operator_clone
178191
.upload_transaction_batch(chain_id, transactions)

ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ base64 = { workspace = true }
2222
chrono = { workspace = true }
2323
cloud-storage = { workspace = true }
2424
dashmap = { workspace = true }
25-
flate2 = { workspace = true }
2625
futures = { workspace = true }
2726
itertools = { workspace = true }
27+
lz4 = { workspace = true }
2828
once_cell = { workspace = true }
2929
prometheus = { workspace = true }
3030
prost = { workspace = true }

ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33

44
use crate::default_file_storage_format;
55
use aptos_protos::{indexer::v1::TransactionsInStorage, transaction::v1::Transaction};
6-
use flate2::read::{GzDecoder, GzEncoder};
6+
use lz4::{Decoder, EncoderBuilder};
77
use prost::Message;
88
use ripemd::{Digest, Ripemd128};
99
use serde::{Deserialize, Serialize};
10-
use std::io::Read;
10+
use std::io::{Read, Write};
1111

1212
pub const FILE_ENTRY_TRANSACTION_COUNT: u64 = 1000;
1313

1414
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
1515
pub enum StorageFormat {
16-
GzipCompressedProto,
16+
Lz4CompressedProto,
1717
// Only used for legacy file format.
1818
// Use by cache only.
1919
Base64UncompressedProto,
@@ -66,15 +66,15 @@ impl FileStoreMetadata {
6666
}
6767

6868
pub enum CacheEntry {
69-
GzipCompressionProto(Vec<u8>),
69+
Lz4CompressionProto(Vec<u8>),
7070
// Only used for legacy cache entry.
7171
Base64UncompressedProto(Vec<u8>),
7272
}
7373

7474
impl CacheEntry {
7575
pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
7676
match storage_format {
77-
StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
77+
StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
7878
// Legacy format.
7979
StorageFormat::Base64UncompressedProto => Self::Base64UncompressedProto(bytes),
8080
StorageFormat::JsonBase64UncompressedProto => {
@@ -85,14 +85,14 @@ impl CacheEntry {
8585

8686
pub fn into_inner(self) -> Vec<u8> {
8787
match self {
88-
CacheEntry::GzipCompressionProto(bytes) => bytes,
88+
CacheEntry::Lz4CompressionProto(bytes) => bytes,
8989
CacheEntry::Base64UncompressedProto(bytes) => bytes,
9090
}
9191
}
9292

9393
pub fn size(&self) -> usize {
9494
match self {
95-
CacheEntry::GzipCompressionProto(bytes) => bytes.len(),
95+
CacheEntry::Lz4CompressionProto(bytes) => bytes.len(),
9696
CacheEntry::Base64UncompressedProto(bytes) => bytes.len(),
9797
}
9898
}
@@ -103,13 +103,15 @@ impl CacheEntry {
103103
.encode(&mut bytes)
104104
.expect("proto serialization failed.");
105105
match storage_format {
106-
StorageFormat::GzipCompressedProto => {
107-
let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
108-
let mut result = Vec::new();
106+
StorageFormat::Lz4CompressedProto => {
107+
let mut compressed = EncoderBuilder::new()
108+
.level(4)
109+
.build(Vec::new())
110+
.expect("Lz4 compression failed.");
109111
compressed
110-
.read_to_end(&mut result)
111-
.expect("Gzip compression failed.");
112-
CacheEntry::GzipCompressionProto(result)
112+
.write_all(&bytes)
113+
.expect("Lz4 compression failed.");
114+
CacheEntry::Lz4CompressionProto(compressed.finish().0)
113115
},
114116
StorageFormat::Base64UncompressedProto => {
115117
let base64 = base64::encode(bytes).into_bytes();
@@ -124,8 +126,8 @@ impl CacheEntry {
124126

125127
pub fn build_key(version: u64, storage_format: StorageFormat) -> String {
126128
match storage_format {
127-
StorageFormat::GzipCompressedProto => {
128-
format!("gz:{}", version)
129+
StorageFormat::Lz4CompressedProto => {
130+
format!("l4:{}", version)
129131
},
130132
StorageFormat::Base64UncompressedProto => {
131133
format!("{}", version)
@@ -139,12 +141,12 @@ impl CacheEntry {
139141

140142
pub fn into_transaction(self) -> Transaction {
141143
match self {
142-
CacheEntry::GzipCompressionProto(bytes) => {
143-
let mut decompressor = GzDecoder::new(&bytes[..]);
144+
CacheEntry::Lz4CompressionProto(bytes) => {
145+
let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
144146
let mut decompressed = Vec::new();
145147
decompressor
146148
.read_to_end(&mut decompressed)
147-
.expect("Gzip decompression failed.");
149+
.expect("Lz4 decompression failed.");
148150
Transaction::decode(decompressed.as_slice()).expect("proto deserialization failed.")
149151
},
150152
CacheEntry::Base64UncompressedProto(bytes) => {
@@ -156,15 +158,15 @@ impl CacheEntry {
156158
}
157159

158160
pub enum FileEntry {
159-
GzipCompressionProto(Vec<u8>),
161+
Lz4CompressionProto(Vec<u8>),
160162
// Only used for legacy file format.
161163
JsonBase64UncompressedProto(Vec<u8>),
162164
}
163165

164166
impl FileEntry {
165167
pub fn new(bytes: Vec<u8>, storage_format: StorageFormat) -> Self {
166168
match storage_format {
167-
StorageFormat::GzipCompressedProto => Self::GzipCompressionProto(bytes),
169+
StorageFormat::Lz4CompressedProto => Self::Lz4CompressionProto(bytes),
168170
StorageFormat::Base64UncompressedProto => {
169171
panic!("Base64UncompressedProto is not supported.")
170172
},
@@ -174,14 +176,14 @@ impl FileEntry {
174176

175177
pub fn into_inner(self) -> Vec<u8> {
176178
match self {
177-
FileEntry::GzipCompressionProto(bytes) => bytes,
179+
FileEntry::Lz4CompressionProto(bytes) => bytes,
178180
FileEntry::JsonBase64UncompressedProto(bytes) => bytes,
179181
}
180182
}
181183

182184
pub fn size(&self) -> usize {
183185
match self {
184-
FileEntry::GzipCompressionProto(bytes) => bytes.len(),
186+
FileEntry::Lz4CompressionProto(bytes) => bytes.len(),
185187
FileEntry::JsonBase64UncompressedProto(bytes) => bytes.len(),
186188
}
187189
}
@@ -203,18 +205,20 @@ impl FileEntry {
203205
panic!("Starting version has to be a multiple of FILE_ENTRY_TRANSACTION_COUNT.")
204206
}
205207
match storage_format {
206-
StorageFormat::GzipCompressedProto => {
208+
StorageFormat::Lz4CompressedProto => {
207209
let t = TransactionsInStorage {
208210
starting_version: Some(transactions.first().unwrap().version),
209211
transactions,
210212
};
211213
t.encode(&mut bytes).expect("proto serialization failed.");
212-
let mut compressed = GzEncoder::new(bytes.as_slice(), flate2::Compression::fast());
213-
let mut result = Vec::new();
214+
let mut compressed = EncoderBuilder::new()
215+
.level(4)
216+
.build(Vec::new())
217+
.expect("Lz4 compression failed.");
214218
compressed
215-
.read_to_end(&mut result)
216-
.expect("Gzip compression failed.");
217-
FileEntry::GzipCompressionProto(result)
219+
.write_all(&bytes)
220+
.expect("Lz4 compression failed.");
221+
FileEntry::Lz4CompressionProto(compressed.finish().0)
218222
},
219223
StorageFormat::Base64UncompressedProto => {
220224
panic!("Base64UncompressedProto is not supported.")
@@ -247,9 +251,9 @@ impl FileEntry {
247251
hasher.update(starting_version.to_string());
248252
let file_prefix = format!("{:x}", hasher.finalize());
249253
match storage_format {
250-
StorageFormat::GzipCompressedProto => {
254+
StorageFormat::Lz4CompressedProto => {
251255
format!(
252-
"compressed_files/gzip/{}_{}.bin",
256+
"compressed_files/lz4/{}_{}.bin",
253257
file_prefix, starting_version
254258
)
255259
},
@@ -264,12 +268,12 @@ impl FileEntry {
264268

265269
pub fn into_transactions_in_storage(self) -> TransactionsInStorage {
266270
match self {
267-
FileEntry::GzipCompressionProto(bytes) => {
268-
let mut decompressor = GzDecoder::new(&bytes[..]);
271+
FileEntry::Lz4CompressionProto(bytes) => {
272+
let mut decompressor = Decoder::new(&bytes[..]).expect("Lz4 decompression failed.");
269273
let mut decompressed = Vec::new();
270274
decompressor
271275
.read_to_end(&mut decompressed)
272-
.expect("Gzip decompression failed.");
276+
.expect("Lz4 decompression failed.");
273277
TransactionsInStorage::decode(decompressed.as_slice())
274278
.expect("proto deserialization failed.")
275279
},
@@ -317,7 +321,7 @@ mod tests {
317321
}
318322

319323
#[test]
320-
fn test_cache_entry_builder_gzip_compressed_proto() {
324+
fn test_cache_entry_builder_lz4_compressed_proto() {
321325
let transaction = Transaction {
322326
version: 42,
323327
epoch: 333,
@@ -326,7 +330,7 @@ mod tests {
326330
let transaction_clone = transaction.clone();
327331
let proto_size = transaction.encoded_len();
328332
let cache_entry =
329-
CacheEntry::from_transaction(transaction, StorageFormat::GzipCompressedProto);
333+
CacheEntry::from_transaction(transaction, StorageFormat::Lz4CompressedProto);
330334
let compressed_size = cache_entry.size();
331335
assert!(compressed_size != proto_size);
332336
let deserialized_transaction = cache_entry.into_transaction();
@@ -379,7 +383,7 @@ mod tests {
379383
}
380384

381385
#[test]
382-
fn test_file_entry_builder_gzip_compressed_proto() {
386+
fn test_file_entry_builder_lz4_compressed_proto() {
383387
let transactions = (1000..2000)
384388
.map(|version| Transaction {
385389
version,
@@ -393,7 +397,7 @@ mod tests {
393397
};
394398
let transactions_in_storage_size = transactions_in_storage.encoded_len();
395399
let file_entry =
396-
FileEntry::from_transactions(transactions.clone(), StorageFormat::GzipCompressedProto);
400+
FileEntry::from_transactions(transactions.clone(), StorageFormat::Lz4CompressedProto);
397401
assert_ne!(file_entry.size(), transactions_in_storage_size);
398402
let deserialized_transactions = file_entry.into_transactions_in_storage();
399403
for (i, transaction) in transactions.iter().enumerate() {
@@ -402,10 +406,10 @@ mod tests {
402406
}
403407

404408
#[test]
405-
fn test_cache_entry_key_to_string_gzip_compressed_proto() {
409+
fn test_cache_entry_key_to_string_lz4_compressed_proto() {
406410
assert_eq!(
407-
CacheEntry::build_key(42, StorageFormat::GzipCompressedProto),
408-
"gz:42"
411+
CacheEntry::build_key(42, StorageFormat::Lz4CompressedProto),
412+
"l4:42"
409413
);
410414
}
411415

@@ -424,10 +428,10 @@ mod tests {
424428
}
425429

426430
#[test]
427-
fn test_file_entry_key_to_string_gzip_compressed_proto() {
431+
fn test_file_entry_key_to_string_lz4_compressed_proto() {
428432
assert_eq!(
429-
FileEntry::build_key(42, StorageFormat::GzipCompressedProto),
430-
"compressed_files/gzip/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
433+
FileEntry::build_key(42, StorageFormat::Lz4CompressedProto),
434+
"compressed_files/lz4/3d1bff1ba654ca5fdb6ac1370533d876_0.bin"
431435
);
432436
}
433437

@@ -470,15 +474,15 @@ mod tests {
470474
"chain_id": 1,
471475
"file_folder_size": 1000,
472476
"version": 1,
473-
"storage_format": "GzipCompressedProto"
477+
"storage_format": "Lz4CompressedProto"
474478
}"#;
475479

476480
let file_metadata: FileStoreMetadata = serde_json::from_str(file_metadata_serialized_json)
477481
.expect("FileStoreMetadata deserialization failed.");
478482

479483
assert_eq!(
480484
file_metadata.storage_format,
481-
StorageFormat::GzipCompressedProto
485+
StorageFormat::Lz4CompressedProto
482486
);
483487
assert_eq!(file_metadata.chain_id, 1);
484488
assert_eq!(file_metadata.file_folder_size, 1000);

ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl GcsFileStoreOperator {
3131
) -> Self {
3232
env::set_var(SERVICE_ACCOUNT_ENV_VAR, service_account_path);
3333
let storage_format = if enable_compression {
34-
StorageFormat::GzipCompressedProto
34+
StorageFormat::Lz4CompressedProto
3535
} else {
3636
StorageFormat::JsonBase64UncompressedProto
3737
};

ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct LocalFileStoreOperator {
2323
impl LocalFileStoreOperator {
2424
pub fn new(path: PathBuf, enable_compression: bool) -> Self {
2525
let storage_format = if enable_compression {
26-
StorageFormat::GzipCompressedProto
26+
StorageFormat::Lz4CompressedProto
2727
} else {
2828
StorageFormat::JsonBase64UncompressedProto
2929
};

0 commit comments

Comments (0)