@@ -876,10 +876,7 @@ impl LogServer {
876
876
& self ,
877
877
request : GetAllCollectionInfoToCompactRequest ,
878
878
) -> Result < Response < GetAllCollectionInfoToCompactResponse > , Status > {
879
- // TODO(rescrv): Realistically we could make this configurable.
880
- // TODO(rescrv): Magic constant.
881
- const MAX_COLLECTION_INFO_NUMBER : usize = 10000 ;
882
- let mut selected_rollups = Vec :: with_capacity ( MAX_COLLECTION_INFO_NUMBER ) ;
879
+ let mut selected_rollups = Vec :: with_capacity ( self . config . rollup_max_size ) ;
883
880
// Do a non-allocating pass here.
884
881
{
885
882
let need_to_compact = self . need_to_compact . lock ( ) ;
@@ -889,6 +886,9 @@ impl LogServer {
889
886
>= request. min_compaction_size
890
887
{
891
888
selected_rollups. push ( ( * collection_id, * rollup) ) ;
889
+ if selected_rollups. len ( ) >= self . config . rollup_max_size {
890
+ break ;
891
+ }
892
892
}
893
893
}
894
894
}
@@ -997,17 +997,16 @@ impl LogServer {
997
997
. scan (
998
998
cursor. position ,
999
999
Limits {
1000
- max_files : Some ( 10_000 ) ,
1000
+ max_files : Some ( self . config . rollup_max_size as u64 ) ,
1001
1001
max_bytes : Some ( 1_000_000_000 ) ,
1002
- max_records : Some ( 10_000 ) ,
1002
+ max_records : Some ( self . config . rollup_max_size as u64 ) ,
1003
1003
} ,
1004
1004
)
1005
1005
. await ?;
1006
1006
if dirty_fragments. is_empty ( ) {
1007
1007
return Ok ( ( witness, cursor, vec ! [ ] ) ) ;
1008
1008
}
1009
- // TODO(rescrv): Magic constant.
1010
- if dirty_fragments. len ( ) >= 1_000 {
1009
+ if dirty_fragments. len ( ) >= self . config . rollup_max_size / 10 {
1011
1010
tracing:: error!( "Too many dirty fragments: {}" , dirty_fragments. len( ) ) ;
1012
1011
}
1013
1012
let dirty_futures = dirty_fragments
@@ -2048,6 +2047,8 @@ pub struct LogServerConfig {
2048
2047
pub reinsert_threshold : u64 ,
2049
2048
#[ serde( default = "LogServerConfig::default_rollup_interval" ) ]
2050
2049
pub rollup_interval : Duration ,
2050
+ #[ serde( default = "LogServerConfig::default_rollup_max_size" ) ]
2051
+ pub rollup_max_size : usize ,
2051
2052
#[ serde( default = "LogServerConfig::default_log_keepalive" ) ]
2052
2053
pub log_keep_alive : Duration ,
2053
2054
#[ serde( default = "LogServerConfig::default_effectuate_log_transfer_batch_size" ) ]
@@ -2085,6 +2086,11 @@ impl LogServerConfig {
2085
2086
Duration :: from_secs ( 10 )
2086
2087
}
2087
2088
2089
+ /// return at most 10k collection infos from get all collections to compact
2090
+ fn default_rollup_max_size ( ) -> usize {
2091
+ 10_000
2092
+ }
2093
+
2088
2094
/// keep logs in-memory for 60 seconds
2089
2095
fn default_log_keepalive ( ) -> Duration {
2090
2096
Duration :: from_secs ( 60 )
@@ -2120,6 +2126,7 @@ impl Default for LogServerConfig {
2120
2126
num_records_before_backpressure : Self :: default_num_records_before_backpressure ( ) ,
2121
2127
reinsert_threshold : Self :: default_reinsert_threshold ( ) ,
2122
2128
rollup_interval : Self :: default_rollup_interval ( ) ,
2129
+ rollup_max_size : Self :: default_rollup_max_size ( ) ,
2123
2130
log_keep_alive : Self :: default_log_keepalive ( ) ,
2124
2131
effectuate_log_transfer_batch_size : Self :: default_effectuate_log_transfer_batch_size ( ) ,
2125
2132
effectuate_log_transfer_retries : Self :: default_effectuate_log_transfer_retries ( ) ,
0 commit comments