Dynamic Bucket for Flink streaming with Partitioned RLI #18514
Replies: 3 comments 3 replies
-
Overall looks good. Can you clarify these items:
-
Currently, we already have
For partitioned RLI, mappings are organized by data partition and are not mixed with mappings from other data partitions, so there is no cross-partition read amplification. Concretely, the file group ID naming for partitioned RLI is:
-
Thanks @cshuo for the detailed writeup. The problem statement is clear and the motivation around the limitations of existing bucket indexes is well articulated. I had a question about the design choice that I'd like to understand better.
The core of this proposal is: use partitioned RLI as the persistent key → bucket mapping, lazily load it into an in-memory cache, and look up every key against that cache for routing. The bucket assignment is immutable once written. But if we're already paying the cost of maintaining partitioned RLI and doing per-key lookups against it, I'm wondering: what does the bucket index abstraction add on top of just using partitioned RLI directly?
Consider the standard write path with partitioned RLI (no bucket index):
The main difference I see is that this proposal makes bucket assignment immutable forever, which is presented as a benefit (no data relocation). But this also means:
With plain partitioned RLI, clustering can merge small file groups, split large ones, re-sort data, and simply update the RLI. The layout remains fully optimizable over time, which seems strictly more flexible.
I'd also like to flag the workload profile assumption here. The lazy bootstrap + partition-granularity cache eviction works well for fact table workloads where only recent partitions are actively written to: older partitions go cold, their caches get evicted, and memory stays bounded. But for dimension table workloads, where updates arrive across all partitions randomly and continuously, most partitions stay hot. In that scenario, the cache effectively needs to hold key → bucket mappings for the entire table in memory, and partition-level eviction provides little relief. How would this design handle such workloads without running into memory pressure?
So the question is: is there a specific capability or property that the bucket index framing provides, beyond what partitioned RLI with the existing small file handling already gives us? If the answer is primarily the file naming convention and compatibility with bucket index readers, that might not justify the immutability constraint and the workload limitations.
Would love to hear your thoughts.
-
Background
Hudi currently provides multiple bucket-based indexing options, but all have practical limitations for continuously growing production workloads.
The main limitation of the simple bucket index is its weak rescaling capability: the number of buckets is fixed once configured, so as data keeps growing, each bucket accumulates more and more records, which can eventually hurt query performance.
The partition-level bucket index improves flexibility by allowing different partitions to use different bucket counts. However, once configured for a partition, the bucket count cannot be scaled online; it can only be changed through an offline rewrite process.
The consistent bucket index supports bucket split and merge, but it also has limitations: 1) the overall bucket resizing lifecycle is coupled with clustering; 2) before clustering completes, writes during the transition rely on dual-write semantics, which introduces extra write overhead and hurts write performance. For Flink streaming workloads, this model is too heavy.
This proposal takes a different direction:
Reuse Hudi's partitioned RLI as the source of truth for bucket assigning.
Dynamic bucket growth only affects new keys; existing keys always go to their original bucket, so original buckets are never split or merged.
With this approach, dynamic bucket growth can be performed online during streaming ingestion and is lightweight: it is not coupled with any background table service and requires no heavy dual-write.
Goals
Support dynamic bucket assigning built on top of partitioned RLI.
Keep the bucket assignment of a record key immutable once assigned, so that bucket growth never relocates historical data.
Support lazy bootstrap of the `key -> bucket` cache from partitioned RLI.
Keep memory usage bounded through partition-granularity cache lifecycle management.
Reuse existing MDT / RLI infrastructure as much as possible.
Non-Goals
Introducing a new hash-based or consistent-hashing bucket index.
Rebalancing historical keys after bucket growth.
Solving hot-key skew caused by a small set of existing keys.
Multiple writers scenario.
The Design
The high-level ideas:
Use partitioned RLI as the persistent backend for dynamic bucket assigning.
Set the initial bucket/file-group count to the number of bucket assigners.
Maintain a per-partition mapping cache in the bucket assigner, lazily bootstrapped from partitioned RLI:
`Partition -> { recordKey -> fileId }`
The memory usage of the cache is bounded, and the cache can spill to disk.
Support partition-granularity cache eviction after commit and inactivity
Support MDT lookup for the index data of a specific data partition
The Impl
The Initial Bucket Count
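The initial bucket/file-group count is set to the number of bucket assigners, i.e., the bucket assigner parallelism. This gives a natural initial routing space and aligns the initial bucket layout with write parallelism.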
The Bucket Assigning Strategy
Bucket ids are no longer calculated with a static hash strategy. Instead, a `recordKey -> fileId` mapping cache is maintained for each data partition.
Assigning behavior:

If the record key already exists in the mapping, use the existing bucket id / file group id.
If the record key does not exist:
Select a bucket that is not 'full' and assign it to the record key.
If all existing buckets are 'full', create a new bucket and assign it to the record key.
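The assigning behavior above can be sketched as follows. This is a minimal, single-writer Python sketch under stated assumptions, not the Hudi implementation. It assumes that 'full' means a bucket has reached a hypothetical `max_records_per_bucket` threshold. It also assumes the initial bucket count equals the bucket assigner parallelism. `PartitionBucketAssigner` and the `fg-<n>` file group IDs are illustrative names only.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class PartitionBucketAssigner:
    """Hypothetical per-partition recordKey -> fileId router (single writer)."""

    initial_buckets: int         # assumed: bucket assigner parallelism
    max_records_per_bucket: int  # assumed definition of a 'full' bucket
    key_to_file_id: Dict[str, str] = field(default_factory=dict)
    bucket_sizes: Dict[str, int] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Start with one empty bucket (file group) per bucket assigner.
        for i in range(self.initial_buckets):
            self.bucket_sizes.setdefault(f"fg-{i}", 0)

    def assign(self, record_key: str) -> Tuple[str, bool]:
        """Return (file_id, is_new_key) for one record key."""
        # Existing key: always reuse its original bucket (assignment is immutable).
        if record_key in self.key_to_file_id:
            return self.key_to_file_id[record_key], False
        # New key: take the first bucket that is not full.
        target = next(
            (fid for fid, size in self.bucket_sizes.items()
             if size < self.max_records_per_bucket),
            None,
        )
        # Every bucket is full: grow by creating a new bucket for this key.
        if target is None:
            target = f"fg-{len(self.bucket_sizes)}"
            self.bucket_sizes[target] = 0
        self.bucket_sizes[target] += 1
        self.key_to_file_id[record_key] = target
        return target, True
```

With `initial_buckets=2` and `max_records_per_bucket=2`, five new keys fill `fg-0` and `fg-1`, and the fifth key creates `fg-2`. Re-assigning an earlier key returns its original bucket with `is_new_key=False`. Picking the first non-full bucket is only one possible policy. After a bootstrap from RLI, both dictionaries would be pre-populated from the loaded entries.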
Lazy Bootstrap for Bucket Assign Cache
The `recordKey -> fileId` cache is bootstrapped lazily from partitioned RLI.
Behavior:
No eager preload for all partitions
When a partition becomes active, load its routing mapping from partitioned RLI
Once loaded, serve the bucket assigning from the cache.
Set a total memory cap for the cache; when usage exceeds the cap, the cache spills to disk/RocksDB.
This keeps memory proportional to active partitions instead of total table size.
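A minimal sketch of the lazy, per-partition bootstrap follows. It assumes a hypothetical `load_partition_index` callable that stands in for the MDT lookup of one partition's RLI entries. For brevity, it omits the memory cap and disk/RocksDB spill.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in for "read this partition's entries from partitioned RLI".
IndexLoader = Callable[[str], Dict[str, str]]


class LazyBucketCache:
    """Partition -> {recordKey -> fileId}, each partition loaded on first access."""

    def __init__(self, load_partition_index: IndexLoader) -> None:
        self._load = load_partition_index
        self._partitions: Dict[str, Dict[str, str]] = {}

    def mapping_for(self, partition: str) -> Dict[str, str]:
        # No eager preload: a partition is bootstrapped the first time a record
        # for it arrives, and is served from memory afterwards.
        if partition not in self._partitions:
            self._partitions[partition] = dict(self._load(partition))
        return self._partitions[partition]

    def loaded_partitions(self) -> List[str]:
        return sorted(self._partitions)
```

Only partitions that actually receive records are ever loaded. Resident memory therefore tracks the set of active partitions rather than the whole table.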
Cache Eviction
The bucket assigning cache is managed at partition granularity, with one flag per partition bucket cache:
`lastUpdatedCheckpoint`: the last checkpoint interval during which the partition's bucket cache was updated.
Eviction flow:
When the bucket assigner assigns a bucket for a record key:
Set `lastUpdatedCheckpoint` to the current checkpoint id.
When the bucket assigner operator receives a checkpoint-complete notification:
Get the latest successful checkpoint id, `latestSuccessfulCheckpoint`, corresponding to the latest completed instant.
Save `latestSuccessfulCheckpoint` in the bucket assign operator; it is used to decide whether a partition's bucket assign cache is evictable.
The bucket assign cache is evicted lazily:
The eviction strategy can avoid unbounded cache growth while keeping hot partitions resident.
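The checkpoint flags above could drive eviction as in the following sketch. The exact lazy-eviction condition is not spelled out in this section, so the sketch rests on two assumptions. First, a partition cache is evictable once its last-updated checkpoint is no newer than the latest successful checkpoint, meaning all of its entries are already committed to RLI and can be re-bootstrapped. Second, eviction only runs while the cache holds more than a hypothetical `max_partitions`, which stands in for the memory cap.

```python
from typing import Dict, List


class PartitionCacheEntry:
    """One partition's routing cache plus its eviction flag."""

    def __init__(self) -> None:
        self.mapping: Dict[str, str] = {}  # recordKey -> fileId
        self.last_updated_checkpoint = -1  # mirrors lastUpdatedCheckpoint


class EvictingBucketCache:
    """Partition-granularity cache with checkpoint-based lazy eviction (sketch)."""

    def __init__(self, max_partitions: int) -> None:
        self.max_partitions = max_partitions  # hypothetical stand-in for a memory cap
        self.entries: Dict[str, PartitionCacheEntry] = {}
        self.latest_successful_checkpoint = -1

    def on_assign(self, partition: str, record_key: str, file_id: str,
                  current_checkpoint: int) -> None:
        entry = self.entries.setdefault(partition, PartitionCacheEntry())
        entry.mapping[record_key] = file_id
        # Record the checkpoint interval in which this partition was last updated.
        entry.last_updated_checkpoint = current_checkpoint

    def on_checkpoint_complete(self, latest_successful_checkpoint: int) -> None:
        # Checkpoint id whose instant is assumed committed, RLI inserts included.
        self.latest_successful_checkpoint = max(
            self.latest_successful_checkpoint, latest_successful_checkpoint)

    def is_evictable(self, partition: str) -> bool:
        # Assumed rule: all updates of this partition are durable in RLI.
        entry = self.entries[partition]
        return entry.last_updated_checkpoint <= self.latest_successful_checkpoint

    def evict_lazily(self) -> List[str]:
        """Evict committed partitions, least recently updated first, while over the cap."""
        candidates = sorted(
            (entry.last_updated_checkpoint, partition)
            for partition, entry in self.entries.items()
            if self.is_evictable(partition)
        )
        evicted: List[str] = []
        for _, partition in candidates:
            if len(self.entries) <= self.max_partitions:
                break
            del self.entries[partition]
            evicted.append(partition)
        return evicted
```

Under these assumptions, a partition that is written in every checkpoint interval keeps a last-updated id newer than the latest successful checkpoint, so it stays resident. A partition that goes quiet becomes evictable once its last checkpoint commits.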
The Write of Partitioned RLI
The index metadata is stored in the partitioned RLI in the MDT. Since the index write pipeline for global RLI is already supported, the same pipeline can be reused for partitioned RLI.
Index write rules:
Only insert index entries for new record keys
Existing keys never update their bucket assignment
This keeps index maintenance simple and the partitioned RLI data updated incrementally.
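The write rules above amount to an insert-only stream of index entries. The following is a minimal sketch under two assumptions. The assigner reports whether each key was new, through a hypothetical `is_new_key` flag. Pending entries are handed to the existing RLI write pipeline once per checkpoint, through a hypothetical `drain()`.

```python
from typing import Dict, List, Tuple

IndexRecord = Tuple[str, str, str]  # (partition, record_key, file_id)


class NewKeyIndexBuffer:
    """Collects partitioned-RLI inserts for new keys only (hypothetical sketch)."""

    def __init__(self) -> None:
        self._pending: Dict[Tuple[str, str], str] = {}

    def on_assigned(self, partition: str, record_key: str, file_id: str,
                    is_new_key: bool) -> None:
        # Existing keys never change bucket, so they produce no index write.
        if not is_new_key:
            return
        # A new key is assigned exactly once; keep that first assignment.
        self._pending.setdefault((partition, record_key), file_id)

    def drain(self) -> List[IndexRecord]:
        # Hand the buffered inserts to the RLI write path and reset the buffer.
        records = [(p, k, fid) for (p, k), fid in sorted(self._pending.items())]
        self._pending.clear()
        return records
```

Updates to existing keys never reach the index, so the partitioned RLI only grows by new keys between commits.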
The Lookup Path
The system should support looking up the partitioned RLI data of a specific data partition from the MDT.
Concurrent Writers
This design currently does not work well with concurrent writers. The main risks are:
Conflicting bucket assignment for the same new key:
two writers may assign the same new key to different buckets
this breaks the correctness of the `key -> bucket` mapping
Conflicting bucket creation during bucket expansion:
two writers may generate the same bucket id but bind it to different file groups
this creates bucket ownership conflicts, similar to the problem in simple bucket index.
Because of these risks, concurrent writers are not supported without additional coordination.
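The two risks above can be made concrete with a sketch in which two writers start from the same one-bucket snapshot and assign keys with no coordination. The capacity-based 'full' check is hypothetical, and so are the writer-specific file group names (`A-fg-1`, `B-fg-1`). In practice, a new file group would get a freshly generated ID.

```python
from typing import Dict, List, Tuple


def assign_without_coordination(
    writer: str, keys: List[str], initial_buckets: int, capacity: int
) -> Dict[str, Tuple[int, str]]:
    """One writer's local view: key -> (bucket id, file group id)."""
    sizes = [0] * initial_buckets
    owners = [f"fg-{b}" for b in range(initial_buckets)]  # shared initial file groups
    result: Dict[str, Tuple[int, str]] = {}
    for key in keys:
        bucket = next((b for b, n in enumerate(sizes) if n < capacity), None)
        if bucket is None:
            # Expansion: the next bucket id is chosen locally and bound to a
            # file group that only this writer knows about.
            bucket = len(sizes)
            sizes.append(0)
            owners.append(f"{writer}-fg-{bucket}")
        sizes[bucket] += 1
        result[key] = (bucket, owners[bucket])
    return result


a = assign_without_coordination("A", ["a1", "a2", "k"], initial_buckets=1, capacity=2)
b = assign_without_coordination("B", ["k", "b1", "b2"], initial_buckets=1, capacity=2)
```

Writer A routes the new key `k` to bucket 1, while writer B routes it to bucket 0. Both writers also create bucket 1 but bind it to different file groups. These are the two conflicts that rule out concurrent writers without extra coordination.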
Benefits
No need to move/rewrite historical data when bucket count grows.
No bucket lineage or transitional read semantics are required.
Update routing remains simple because the same key always maps to the same bucket.
Reuses Hudi's existing partitioned RLI / MDT infrastructure.
More natural for workloads where new keys continuously arrive over time.
Tradeoffs / Risks
The cache can become large for hot, high-cardinality partitions.
First access to a large partition may incur bootstrap latency.
Bucket growth only helps future new keys, not existing hot keys.
Summary
This proposal introduces a Partitioned RLI-based Dynamic Bucket Index for Hudi.
The key idea is to use partitioned RLI as the persistent routing backend for a stable per-key bucket assignment:
initial bucket count is set by bucket assigner parallelism
the cache is bootstrapped lazily per partition, and is evicted gradually to avoid OOM.
only index entries for new keys are written into RLI
In short, this design treats dynamic bucket routing as an explicit metadata indexing problem, using Hudi's own partitioned RLI as the source of truth.