feat: [HUDI-18735][FLINK] Add bucketassign minibatch cache hit/miss metrics#18761
feat: [HUDI-18735][FLINK] Add bucketassign minibatch cache hit/miss metrics#18761yaoliclshlmch wants to merge 1 commit into
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18761 +/- ##
=========================================
Coverage 68.15% 68.16%
- Complexity 29148 29163 +15
=========================================
Files 2521 2522 +1
Lines 141353 141380 +27
Branches 17549 17550 +1
=========================================
+ Hits 96343 96372 +29
- Misses 37076 37078 +2
+ Partials 7934 7930 -4
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds two simple counter metrics (bucketassign.minibatch.cache.hit.count / miss.count) to the GRLI backend by mirroring the existing RocksDBIndexBackend / FlinkRocksDBIndexMetrics pattern, with a re-registration guard and a null-guard at the call site. The hit/miss arithmetic (recordKeys.size() - missedKeys.size() vs missedKeys.size()) lines up with how the existing miss-collection loop builds missedKeys, and the test coverage exercises both the single-key and batch paths plus the unregistered-metrics path. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and delegation nits below, but overall the code is clean and well-structured.
cc @yihua
| } | ||
|
|
||
| public void markCacheHit() { | ||
| cacheHitCount.inc(); |
There was a problem hiding this comment.
🤖 nit: could you have the no-arg markCacheHit() delegate to markCacheHit(1L) (and same for markCacheMiss())? Right now the two overloads are independent, so if any logic is ever added to the bulk path the single-increment path would silently miss it.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
The Flink bucket-assign operator on the global record-level index (GRLI) path keeps an in-memory
RecordIndexCacheto serve record-key lookups, falling back to the metadata table on a miss. Today the cache is a black box at runtime — operators have no visibility into how effective it is, which makes it impossible to alert on a degraded hit ratio (e.g. due to undersized cache, eviction churn, or key-skew patterns) or to tuneINDEX_RLI_CACHE_SIZEbased on real workload behavior.This PR adds two counter metrics so the in-memory cache hit / miss volume becomes observable and alertable.
Summary and Changelog
Adds two Flink counter metrics on the bucket-assign operator's
MetricGroup, registered automatically whenever the GRLI backend is in use:bucketassign.minibatch.cache.hit.count— record-key lookups served by the in-memoryRecordIndexCache.bucketassign.minibatch.cache.miss.count— record-key lookups that fell back to a metadata-table read.Changelog:
org.apache.hudi.metrics.FlinkBucketAssignMetrics(mirrors the existingFlinkRocksDBIndexMetricspattern):SimpleCounterinstances and registers them under the suppliedMetricGroup.markCacheHit() / markCacheHit(long)andmarkCacheMiss() / markCacheMiss(long)helpers; the(long)overloads short-circuit onn <= 0so a single call site can drain a whole batch's hit/miss counts in one shot.org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend:registerMetrics(MetricGroup)to instantiateFlinkBucketAssignMetrics(re-registration guarded, matchingRocksDBIndexBackend).get(List<String> recordKeys)— which the single-keyget(String)overload also delegates to — bumps the hit/miss counters byrecordKeys.size() - missedKeys.size()andmissedKeys.size()after the existing miss-collection loop. The increment is null-guarded so call sites that never callregisterMetrics(e.g. unit-test harnesses) are unaffected.TestFlinkBucketAssignMetricsand three additional cases inTestGlobalRecordLevelIndexBackend.No code copied from other projects.
Impact
IndexBackend#registerMetricsextension point already existed.Counter.inc(long)calls per batch lookup, after the existing miss-collection loop.Risk Level
low
The change is observability-only: cache lookup semantics are unchanged, no new public APIs, no new config keys, and the increment path is a constant-time pair of counter bumps after the existing miss-collection loop. Behavior in tests that don't register metrics is preserved by the null-guard.
Verification: 46 tests across the bucket-assign + metrics surface pass with
-Pspark3.3,flink1.18(TestFlinkBucketAssignMetrics,TestGlobalRecordLevelIndexBackend,TestMinibatchBucketAssignFunction,TestBucketAssigner,TestRecordIndexCache,TestRocksDBIndexBackend,TestFlinkCompactionMetrics).Documentation Update
none — no new config or user-facing feature beyond two additional metrics emitted by the existing operator. The metric names are self-descriptive and follow the same convention as
FlinkRocksDBIndexMetrics.Contributor's checklist