
Conversation

@WillemKauf
Contributor

We recently changed the reconciler to only multiplex together partitions from the same topic.

This is overly pessimistic: to make garbage collection of L1 objects efficient, we should be able to reconcile together partitions that have similar expected data lifetimes. The easiest way to do this at present is to use time-based retention properties like retention.ms (for delete topics) or max.compaction.lag.ms (for compact topics).

In the worst case (or for topics that don't have time-based retention enabled), this approach falls back to grouping sources together by topic_id.

We are not yet grouping together topics based on their size-based retention properties (retention.bytes, min.cleanable.dirty.ratio), as this requires assumptions about ingress rates & behaviors that are not explicitly defined by these properties alone.
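
For illustration only, here is a minimal sketch of how such an effective lifetime could be derived from the time-based properties. The struct and field names below are hypothetical simplifications; the actual change reads these values from the topic's ntp_config.

#include <chrono>
#include <optional>

// Hypothetical, simplified stand-in for the per-topic configuration the
// reconciler consults; the real implementation reads storage::ntp_config.
struct topic_retention_props {
    bool is_compacted{false};
    std::optional<std::chrono::milliseconds> retention_ms;          // delete topics
    std::optional<std::chrono::milliseconds> max_compaction_lag_ms; // compact topics
};

// Effective data lifetime used for grouping: retention.ms for delete
// topics, max.compaction.lag.ms for compact topics. An empty result means
// "no time-based bound", and those sources fall back to grouping by
// topic_id as described above.
std::optional<std::chrono::milliseconds>
effective_retention_ms(const topic_retention_props& props) {
    if (props.is_compacted) {
        return props.max_compaction_lag_ms;
    }
    return props.retention_ms;
}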

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x

Release Notes

  • none

A function that returns the effective lifetime of data based on
time-dependent Kafka retention properties.
Contributor

Copilot AI left a comment


Pull request overview

This PR updates the reconciler to group partitions for reconciliation based on time-based retention properties rather than only by topic_id. This enables more efficient garbage collection of L1 objects by grouping partitions with similar data lifetimes together.

Changes:

  • Introduced retention-based bucketing using logarithmic scaling to group sources with similar lifetimes
  • Added effective_retention_ms() method to the source interface to expose time-based retention properties
  • Moved partition_sources_into_sets() from a private reconciler method to a public function for better testability

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

File summary:

  • src/v/cloud_topics/reconciler/reconciliation_source.h: added a virtual effective_retention_ms() method to the source interface
  • src/v/cloud_topics/reconciler/reconciliation_source.cc: implemented effective_retention_ms() for l0_source using ntp_config properties
  • src/v/cloud_topics/reconciler/reconciler.h: moved partition_sources_into_sets() from a private method to a public function
  • src/v/cloud_topics/reconciler/reconciler.cc: rewrote the partitioning logic to use retention-based logarithmic bucketing instead of topic_id grouping
  • src/v/cloud_topics/reconciler/BUILD: added a storage dependency for ntp_config access
  • src/v/cloud_topics/reconciler/tests/test_utils.h: added a fake_l0_source test helper with ntp_config-based retention calculation
  • src/v/cloud_topics/reconciler/tests/partition_sources_into_sets_test.cc: added comprehensive unit tests for the retention-based partitioning logic
  • src/v/cloud_topics/reconciler/tests/BUILD: added a new test target with the required dependencies
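
To make the grouping concrete, here is a minimal sketch of the bucketing idea under the assumptions above, using natural-log bucketing with bucket_width = 0.5 (the value quoted from the code comment later in this thread). The free-standing source struct, the std containers, and the uint64_t topic id are simplified stand-ins for the ss::shared_ptr, chunked_hash_map/chunked_vector, and topic id types used in the actual diff.

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <vector>

// Hypothetical, simplified source interface; the real one lives in
// reconciliation_source.h.
struct source {
    virtual ~source() = default;
    virtual std::optional<std::chrono::milliseconds>
    effective_retention_ms() const = 0;
    virtual uint64_t topic_id() const = 0; // stand-in for the real topic id type
};

// With natural-log scaling and bucket_width = 0.5, each bucket spans
// roughly e^0.5 ~ 1.65x in retention.
constexpr double bucket_width = 0.5;

uint32_t retention_bucket_idx(std::chrono::milliseconds retention) {
    auto ms = std::max<int64_t>(retention.count(), 1);
    return static_cast<uint32_t>(
      std::log(static_cast<double>(ms)) / bucket_width);
}

// Sources with a time-based retention setting are grouped by bucket;
// everything else falls back to per-topic grouping.
std::vector<std::vector<std::shared_ptr<source>>>
partition_sources_into_sets(
  const std::vector<std::shared_ptr<source>>& sources) {
    std::map<uint32_t, std::vector<std::shared_ptr<source>>> by_bucket;
    std::map<uint64_t, std::vector<std::shared_ptr<source>>> by_topic;
    for (const auto& src : sources) {
        if (auto r = src->effective_retention_ms(); r.has_value()) {
            by_bucket[retention_bucket_idx(*r)].push_back(src);
        } else {
            by_topic[src->topic_id()].push_back(src);
        }
    }
    std::vector<std::vector<std::shared_ptr<source>>> sets;
    for (auto& [idx, set] : by_bucket) { sets.push_back(std::move(set)); }
    for (auto& [id, set] : by_topic) { sets.push_back(std::move(set)); }
    return sets;
}

Sources that report a time-based lifetime land in a shared bucket with similarly retained partitions from any topic; everything else degenerates to the previous per-topic grouping.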

Comment on lines +252 to +253
chunked_hash_map<uint8_t, chunked_vector<ss::shared_ptr<source>>>
retention_bucket_idx_to_source;

Copilot AI Jan 15, 2026


Using uint8_t limits the number of buckets to 256, but the comment on line 247 states there can be at most 87 buckets. While 256 is sufficient, the type choice should be documented or verified. More critically, the cast on line 263 could overflow if the computed bucket index exceeds 255, potentially causing silent data corruption by wrapping around to lower buckets. Consider adding a safety check or using a larger integer type like uint16_t.

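A hedged sketch of the guard this comment suggests (not code from the PR): clamping before the narrowing cast keeps a large computed index from wrapping into a lower bucket.

#include <algorithm>
#include <cmath>
#include <cstdint>

// Hypothetical guard: compute the index in floating point, then clamp it to
// the uint8_t key range before the narrowing cast so a large retention value
// cannot wrap around into a lower bucket.
uint8_t clamped_bucket_idx(int64_t retention_ms, double bucket_width) {
    double idx = std::log(static_cast<double>(std::max<int64_t>(retention_ms, 1)))
                 / bucket_width;
    return static_cast<uint8_t>(std::clamp(idx, 0.0, 255.0));
}
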
Comment on lines +169 to +180
// These should all be in different buckets due to log scaling
// 1h, ~2.7h (1h * e), ~7.4h (1h * e^2), ~20h (1h * e^3)
sources.push_back(make_delete_source(1h));
sources.push_back(make_compact_source(
std::chrono::milliseconds(
static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 2.72))));
sources.push_back(make_delete_source(
std::chrono::milliseconds(
static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 7.39))));
sources.push_back(make_compact_source(
std::chrono::milliseconds(
static_cast<int64_t>(std::chrono::milliseconds(1h).count() * 20.09))));

Copilot AI Jan 15, 2026


The magic number 2.72 (approximation of e) is used here and in similar calculations below. Consider defining a named constant like constexpr double e_approx = 2.71828; to make the test's intent clearer and reduce the risk of typos in the approximation.

Suggested change
// Approximate e and its powers for bucket spacing
constexpr double e_approx = 2.71828;
constexpr double e2_approx = e_approx * e_approx;
constexpr double e3_approx = e2_approx * e_approx;
// These should all be in different buckets due to log scaling
// 1h, ~2.7h (1h * e), ~7.4h (1h * e^2), ~20h (1h * e^3)
sources.push_back(make_delete_source(1h));
sources.push_back(make_compact_source(
std::chrono::milliseconds(static_cast<int64_t>(
std::chrono::milliseconds(1h).count() * e_approx))));
sources.push_back(make_delete_source(
std::chrono::milliseconds(static_cast<int64_t>(
std::chrono::milliseconds(1h).count() * e2_approx))));
sources.push_back(make_compact_source(
std::chrono::milliseconds(static_cast<int64_t>(
std::chrono::milliseconds(1h).count() * e3_approx))));

Contributor

@rockwotj rockwotj left a comment


I'll leave my thoughts in Slack for this, but I think we should do more research before making assumptions here.

@WillemKauf force-pushed the log_bucket_reconciler branch from 35390bb to f44a0cb on January 15, 2026 at 19:00
@vbotbuildovich
Collaborator

Retry command for Build#79107

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/random_node_operations_smoke_test.py::RedpandaNodeOperationsSmokeTest.test_node_ops_smoke_test@{"cloud_storage_type":1,"mixed_versions":false}

// Bucket width for logarithmic bucketing based on retention lifetimes. Each
// bucket spans ~1.65x. With natural log scaling and a bucket_width = 0.5,
// we will have at most (when considering the int64_t::max() cap on
// retention properties) 87 buckets.
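
A quick back-of-the-envelope check of that bound, assuming the bucket index is computed as floor(ln(retention_ms) / bucket_width):

#include <cmath>
#include <cstdint>
#include <cstdio>
#include <limits>

int main() {
    constexpr double bucket_width = 0.5;
    // Retention properties are capped at int64_t::max() milliseconds.
    double max_idx = std::log(static_cast<double>(
                       std::numeric_limits<int64_t>::max()))
                     / bucket_width;
    // Prints roughly 87.3, consistent with the "at most 87 buckets" figure.
    std::printf("max bucket index ~ %.1f\n", max_idx);
}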
Contributor


I have a few problems with this approach.

First, all source sets are reconciled sequentially. Every source set generates a staging file, which then gets uploaded, so the latencies of all source sets stack on top of one another. If the reconciler is running non-stop we get some base latency with one bucket; if the number of buckets doubles, the throughput is halved, and so on.

Second, the scheduler adopts the largest number of produced bytes. This means that the L1 object size will always undershoot. Essentially, we're stacking a bunch of upload latencies but the scheduler is not aware of that. It registers that a certain number of bytes was uploaded during some time interval. We lose the information about the upload size in most buckets.

Finally, there is the issue of sharding. The reconciler runs on every shard, so it will make as many L1 uploads per cycle as there are shards. Let's say we want to upload 64MiB objects. With one shard we will keep 64MiB of data in cache. If ingress is 1MiB/s we will be uploading roughly one object per minute.

Now let's say we have 32 shards, with enough partitions to use all of them and the data distributed evenly across all 32 shards. If the ingress is the same, we will have to accumulate data for about 30 minutes so that every shard can have 64MiB, and that will require 2GiB of memory in the record batch cache (of course we will upload smaller objects more frequently to maintain RPO). Now we're multiplying this factor by 3 or 4. I understand that we will be able to reconcile all this data anyway, but we will also have to read all this data back.
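
For concreteness, a small sketch of the arithmetic behind this sharding concern, using the reviewer's illustrative numbers rather than measured values:

#include <cstdio>

int main() {
    // Illustrative figures from the comment above, not measurements.
    const double target_object_mib = 64.0; // desired L1 object size per upload
    const double ingress_mib_s = 1.0;      // total ingress, spread evenly
    const int shards = 32;                 // one reconciler instance per shard

    // Each shard only sees a fraction of the ingress, so filling one
    // 64MiB object per shard takes proportionally longer...
    double per_shard_ingress = ingress_mib_s / shards;
    double fill_time_min = target_object_mib / per_shard_ingress / 60.0;
    // ...and the batch cache holds one object's worth of data per shard.
    double cache_mib = target_object_mib * shards;

    // Roughly 34 minutes to fill and ~2048MiB (2GiB) resident across shards.
    std::printf("fill time ~%.0f min, cache ~%.0f MiB\n", fill_time_min, cache_mib);
}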

Contributor Author


Just a point: this PR will actually do more to group partitions together than what is currently in place, since the current policy only groups together partitions of the same topic.

When we group by retention policy, we will still continue to group together partitions of the same topic, with the added benefit of grouping together partitions from other topics that have similar retention policies.

@vbotbuildovich
Collaborator

CI test results

test results on build#79107
  • test_class: RedpandaNodeOperationsSmokeTest
  • test_method: test_node_ops_smoke_test
  • test_arguments: {"cloud_storage_type": 1, "mixed_versions": false}
  • test_kind: integration
  • job_url: https://buildkite.com/redpanda/redpanda/builds/79107#019bc310-c631-4d71-bc3d-d96d9e53358b
  • test_status: FAIL
  • passed: 0/1
  • test_history: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=RedpandaNodeOperationsSmokeTest&test_method=test_node_ops_smoke_test
