ct/reconciler: reconcile topics together based on time-based retention properties
#29278
base: dev
Conversation
A function that returns the effective lifetime of data based on time-dependent Kafka retention properties.
We recently changed the `reconciler` to only multiplex together partitions from the same topic. This is overly pessimistic: to make garbage collection of L1 objects efficient, we should be able to reconcile together partitions that have similar expected data lifetimes. The easiest way, at present, to do this is to use time-based retention properties like `retention.ms` (for `delete` topics) or `max.compaction.lag.ms` (for `compact` topics). In the worst case (or for topics that don't have time-based retention enabled), this approach falls back to grouping together sources by `topic_id`.
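The bucketing idea can be sketched as follows. This is illustrative only, not the PR's actual code: `retention_bucket_for` and `bucket_width` are hypothetical names, with the width value taken from the in-code comment quoted further down in this conversation.

#include <chrono>
#include <cmath>
#include <cstdint>
#include <optional>

// Map an effective retention to a logarithmic bucket index so that retentions
// within roughly one bucket width (e^0.5, ~1.65x) of each other can end up in
// the same reconciliation set.
std::optional<uint32_t>
retention_bucket_for(std::optional<std::chrono::milliseconds> retention) {
    constexpr double bucket_width = 0.5; // natural-log bucket width
    if (!retention.has_value() || retention->count() <= 0) {
        // No time-based retention: the caller falls back to grouping by topic_id.
        return std::nullopt;
    }
    const double log_ms = std::log(static_cast<double>(retention->count()));
    return static_cast<uint32_t>(log_ms / bucket_width);
}

Whether two nearby retentions share a bucket depends on where they fall relative to the bucket boundaries; the tests quoted below sidestep this by spacing retentions a factor of e apart (two bucket widths), which guarantees distinct buckets.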
Pull request overview
This PR updates the reconciler to group partitions for reconciliation based on time-based retention properties rather than only by `topic_id`. This enables more efficient garbage collection of L1 objects by grouping partitions with similar data lifetimes together.
Changes:
- Introduced retention-based bucketing using logarithmic scaling to group sources with similar lifetimes
- Added `effective_retention_ms()` method to the `source` interface to expose time-based retention properties
- Moved `partition_sources_into_sets()` from a private reconciler method to a public function for better testability
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/v/cloud_topics/reconciler/reconciliation_source.h | Added virtual effective_retention_ms() method to source interface |
| src/v/cloud_topics/reconciler/reconciliation_source.cc | Implemented effective_retention_ms() for l0_source using ntp_config properties |
| src/v/cloud_topics/reconciler/reconciler.h | Moved partition_sources_into_sets() from private method to public function |
| src/v/cloud_topics/reconciler/reconciler.cc | Rewrote partitioning logic to use retention-based logarithmic bucketing instead of topic_id grouping |
| src/v/cloud_topics/reconciler/BUILD | Added storage dependency for ntp_config access |
| src/v/cloud_topics/reconciler/tests/test_utils.h | Added fake_l0_source test helper with ntp_config-based retention calculation |
| src/v/cloud_topics/reconciler/tests/partition_sources_into_sets_test.cc | Added comprehensive unit tests for retention-based partitioning logic |
| src/v/cloud_topics/reconciler/tests/BUILD | Added new test target with required dependencies |
chunked_hash_map<uint8_t, chunked_vector<ss::shared_ptr<source>>>
  retention_bucket_idx_to_source;
Copilot (AI), Jan 15, 2026
Using uint8_t limits the number of buckets to 256, but the comment on line 247 states there can be at most 87 buckets. While 256 is sufficient, the type choice should be documented or verified. More critically, the cast on line 263 could overflow if the computed bucket index exceeds 255, potentially causing silent data corruption by wrapping around to lower buckets. Consider adding a safety check or using a larger integer type like uint16_t.
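One way to address the narrowing concern, as a sketch only (`clamp_bucket_index` is a hypothetical helper, not code from this PR): clamp the computed index before casting, or widen the map's key type.

#include <algorithm>
#include <cstdint>
#include <limits>

// Clamp a computed bucket index into uint8_t's range so an unexpectedly large
// value saturates at 255 instead of silently wrapping into a lower bucket.
uint8_t clamp_bucket_index(int64_t computed_idx) {
    constexpr int64_t max_idx = std::numeric_limits<uint8_t>::max(); // 255
    return static_cast<uint8_t>(std::clamp<int64_t>(computed_idx, 0, max_idx));
}

Alternatively, keying the map on a wider type such as uint16_t removes the concern entirely at negligible cost.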
// These should all be in different buckets due to log scaling
// 1h, ~2.7h (1h * e), ~7.4h (1h * e^2), ~20h (1h * e^3)
sources.push_back(make_delete_source(1h));
sources.push_back(make_compact_source(
  std::chrono::milliseconds(
    static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 2.72))));
sources.push_back(make_delete_source(
  std::chrono::milliseconds(
    static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 7.39))));
sources.push_back(make_compact_source(
  std::chrono::milliseconds(
    static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 20.09))));
Copilot (AI), Jan 15, 2026
The magic number 2.72 (an approximation of e) is used here and in similar calculations below. Consider defining a named constant like `constexpr double e_approx = 2.71828;` to make the test's intent clearer and reduce the risk of typos in the approximation.
Suggested change:

// Approximate e and its powers for bucket spacing
constexpr double e_approx = 2.71828;
constexpr double e2_approx = e_approx * e_approx;
constexpr double e3_approx = e2_approx * e_approx;
// These should all be in different buckets due to log scaling
// 1h, ~2.7h (1h * e), ~7.4h (1h * e^2), ~20h (1h * e^3)
sources.push_back(make_delete_source(1h));
sources.push_back(make_compact_source(
  std::chrono::milliseconds(static_cast<int64_t>(
    std::chrono::milliseconds(1h).count() * e_approx))));
sources.push_back(make_delete_source(
  std::chrono::milliseconds(static_cast<int64_t>(
    std::chrono::milliseconds(1h).count() * e2_approx))));
sources.push_back(make_compact_source(
  std::chrono::milliseconds(static_cast<int64_t>(
    std::chrono::milliseconds(1h).count() * e3_approx))));
rockwotj left a comment:
I'll leave my thoughts in Slack for this, but I think we should do more research before making assumptions here.
35390bb to f44a0cb
Retry command for Build #79107: please wait until all jobs are finished before running the slash command.
// Bucket width for logarithmic bucketing based on retention lifetimes. Each
// bucket spans ~1.65x. With natural log scaling and a bucket_width = 0.5,
// we will have at most (when considering the int64_t::max() cap on
// retention properties) 87 buckets.
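As a quick check of the numbers in this comment (assuming retention is capped at int64_t::max() milliseconds):

e^0.5 ≈ 1.65, so each bucket spans ~1.65x.
ln(int64_t::max() ms) = ln(2^63 - 1) ≈ 63 * ln(2) ≈ 43.7
43.7 / 0.5 ≈ 87, which is where the 87-bucket bound comes from.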
I have a few problems with this approach.
First, all source sets are reconciled sequentially. Every source set generates a staging file, and then that file gets uploaded, so the latencies from all source sets stack on top of one another. If the reconciler is running non-stop we get some base latency with one bucket; if the number of buckets doubles, the throughput roughly halves, and so on.
Second, the scheduler adopts the largest number of produced bytes. This means that the L1 object size will always undershoot. Essentially, we're stacking a bunch of upload latencies, but the scheduler is not aware of that: it registers that a certain number of bytes was uploaded during some time interval. We're losing the information about the upload size in most buckets.
Finally, there is the issue of sharding. The reconciler runs on every shard, so it will make as many L1 uploads per cycle as there are shards. Let's say we want to upload 64 MiB objects. With one shard we will keep 64 MiB of data in cache; if ingress is 1 MiB/s we will be uploading roughly one object per minute.
Now let's say we have 32 shards, enough partitions to use all of them, and the data distributed evenly across all of them. If the ingress is the same, we will have to accumulate data for about 30 minutes so that every shard can have 64 MiB, and that will require 2 GiB of memory in the record batch cache (of course, we will upload smaller objects more frequently to maintain RPO). Now we're multiplying this factor by 3 or 4. I understand that we will be able to reconcile all this data anyway, but we will also have to read all this data back.
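Working through the figures quoted in this example:

1 MiB/s of total ingress spread evenly over 32 shards ≈ 32 KiB/s per shard
64 MiB / 32 KiB/s = 2048 s ≈ 34 min to accumulate one 64 MiB object per shard (the "about 30 min" above)
32 shards * 64 MiB = 2 GiB held in the record batch cache before upload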
Just a point - This PR will actually do more to group together partitions than what is currently in place, since the current policy only groups together partitions of the same topic.
When we group by retention policy, we will still continue to group together partitions of the same topic, with the added benefit of grouping together partitions from other topics that have similar retention policies.
CI test results: test results on build #79107
We recently changed the `reconciler` to only multiplex together partitions from the same topic.

This is overly pessimistic: to make garbage collection of L1 objects efficient, we should be able to reconcile together partitions that have similar expected data lifetimes. The easiest way, at present, to do this is to use time-based retention properties like `retention.ms` (for `delete` topics) or `max.compaction.lag.ms` (for `compact` topics).

In the worst case (or for topics that don't have time-based retention enabled), this approach falls back to grouping together sources by `topic_id`.

We are not yet grouping together topics based on their size-based retention properties (`retention.bytes`, `min.cleanable.dirty.ratio`), as this requires assumptions about ingress rates & behaviors that are not explicitly defined by these properties alone.

Backports Required

Release Notes