dl/translation: fixes for OOM handling #25423


Merged
merged 10 commits into from
Mar 21, 2025
Merged

Conversation

bharathv
Contributor

@bharathv bharathv commented Mar 18, 2025

Main changes in the PR

  • Adds exception handling to all abortable paths in the file writer and propagates failures all the way into the translation context via appropriate error codes.
  • In abortable paths (e.g. scheduler time limit exceeded, OOM), the output is gracefully flushed instead of discarded, so translation makes forward progress.
  • Handles OOM in pathological cases by adding a fixed memory reservation per writer.
  • Adds tests that exercise these exceptional paths; they surfaced a few relevant bugs.
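The per-writer reservation idea in the bullets above can be sketched in plain C++ as follows. This is a hypothetical simplification: the real tracker is shared across Seastar writers and takes an abort source, and all names except the 10 KiB overhead constant (visible in a diff hunk below) are assumptions.

```cpp
#include <cstddef>

// Hypothetical sketch of a memory tracker shared by all parquet writers of
// one translator. reserve_bytes() fails with an error code instead of
// throwing, so hitting the memory limit becomes a recoverable, flushable
// condition rather than a hard failure.
enum class reserve_errc { ok, limit_exceeded };

class mem_tracker {
public:
    explicit mem_tracker(size_t limit)
      : _limit(limit) {}

    reserve_errc reserve_bytes(size_t bytes) {
        if (_used + bytes > _limit) {
            return reserve_errc::limit_exceeded; // caller flushes and retries
        }
        _used += bytes;
        return reserve_errc::ok;
    }

    void release(size_t bytes) { _used -= bytes; }

    // O(1): usage is rolled up here, no loop over writers needed.
    size_t buffered_bytes() const { return _used; }

private:
    size_t _limit;
    size_t _used = 0;
};

// Fixed cost charged when a writer is created, mirroring the PR's
// WRITER_RESERVATION_OVERHEAD constant (sizing is empirical).
constexpr size_t writer_reservation_overhead = 10 * 1024;
```

With a 20 KiB limit, two writers fit; a third reservation fails until one releases.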

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

  • none

@bharathv
Contributor Author

/dt

@bharathv
Contributor Author

/dt

@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 19, 2025

Retry command for Build#63324

please wait until all jobs are finished before running the slash command


/ci-repeat 1
tests/rptest/tests/datalake/translation_interruption_test.py::DatalakeTranslationInterruptionsTest.test_scheduler_time_slice_interruptions@{"catalog_type":"rest_hadoop","cloud_storage_type":1,"query_engine":"spark"}
tests/rptest/tests/datalake/custom_partitioning_test.py::DatalakeCustomPartitioningTest.test_many_partitions@{"catalog_type":"nessie","cloud_storage_type":1}

@bharathv
Contributor Author

Could use a little bit more polish (hence the draft), but overall ready for review.

@bharathv
Contributor Author

/dt

Comment on lines -140 to -165
ss::future<>
partition_translator::translate_when_notified(kafka::offset begin_offset) {
    co_await _ready_to_translate.wait(
      [this] { return _inflight_translation_state.has_value(); });

    auto& as = _inflight_translation_state->as;
    auto reader = co_await _data_source->make_log_reader(
      begin_offset, datalake_priority(), as);
    if (!reader) {
        co_return;
    }
    vlog(_logger.trace, "starting translation from offset: {}", begin_offset);
    ss::timer<scheduling::clock> cancellation_timer;
    cancellation_timer.set_callback([&as] { as.request_abort(); });

    auto translation_f
      = _translation_ctx
          ->translate_now(
            std::move(reader.value()), _inflight_translation_state->as)
          .finally([this] { return _translation_ctx->flush(); });
    cancellation_timer.arm(_inflight_translation_state->translate_for);
    co_await std::move(translation_f).finally([&cancellation_timer] {
        cancellation_timer.cancel();
    });
}

Contributor Author

dead code, forgot to remove from a previous refactor, nothing to review here.

@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 19, 2025

CI test results

test results on build#63330
test_id test_kind job_url test_status passed
gtest_record_multiplexer_rpfixture.gtest_record_multiplexer_rpfixture unit https://buildkite.com/redpanda/redpanda/builds/63330#0195acb5-bb37-4604-84e4-fcefce82d24b FAIL 0/2
gtest_record_multiplexer_rpfixture.gtest_record_multiplexer_rpfixture unit https://buildkite.com/redpanda/redpanda/builds/63330#0195acb5-bb38-449b-a0d8-1daafa3c0c94 FAIL 0/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acf6-755f-4c6d-a8ad-d520ee4dde2c FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acfa-b9fb-4e17-a08d-e57b46eb31a5 FLAKY 1/2
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_migrated_topic_data_integrity.transfer_leadership=True.params=.cancellation.dir.in.stage.preparing.use_alias.True ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acf6-7560-4316-a71e-8b469867b68d FLAKY 1/4
rptest.tests.datalake.cluster_restore_test.DatalakeClusterRestoreTest.test_slow_tiered_storage_dupe_records.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_HADOOP ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acf6-755e-48f3-bed1-4c9240d1955a FLAKY 1/2
rptest.tests.datalake.rest_catalog_connection_test.RestCatalogConnectionTest.test_redpanda_connection_to_rest_catalog.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acf6-755f-4c6d-a8ad-d520ee4dde2c FLAKY 1/2
rptest.tests.tiered_storage_pause_test.TestTieredStoragePause.test_safe_pause_resume.allow_gaps_topic_level=True.allow_gaps_cluster_level=True ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acfa-b9fc-484c-a3c4-e3c497916a81 FLAKY 1/2
rptest.tests.timequery_test.TimeQueryKafkaTest.test_timequery_below_start_offset ducktape https://buildkite.com/redpanda/redpanda/builds/63330#0195acfa-b9fc-484c-a3c4-e3c497916a81 FLAKY 1/2
test results on build#63435
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/63435#0195b3f3-644a-433b-9eed-c720b0d36204 FLAKY 1/2
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_migrated_topic_data_integrity.transfer_leadership=True.params=.cancellation.dir.in.stage.preparing.use_alias.False ducktape https://buildkite.com/redpanda/redpanda/builds/63435#0195b3f3-644a-433b-9eed-c720b0d36204 FLAKY 1/3
rptest.tests.datalake.datalake_upgrade_test.DatalakeUpgradeTest.test_upload_through_upgrade.cloud_storage_type=CloudStorageType.S3.query_engine=QueryEngineType.SPARK ducktape https://buildkite.com/redpanda/redpanda/builds/63435#0195b40e-3695-4cae-90c9-8244c38fc796 FLAKY 1/2
rptest.tests.nodes_decommissioning_test.NodesDecommissioningTest.test_recommissioning_node_finishes ducktape https://buildkite.com/redpanda/redpanda/builds/63435#0195b3f3-6449-4b65-bca5-d66fd91bad80 FLAKY 1/2
test results on build#63471
test_id test_kind job_url test_status passed
kafka_server_rpfixture.kafka_server_rpfixture unit https://buildkite.com/redpanda/redpanda/builds/63471#0195b626-0558-4940-90b2-1f3e36e00760 FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/63471#0195b667-e54b-46f7-940f-e285681922ca FLAKY 1/2
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_migrated_topic_data_integrity.transfer_leadership=False.params=.cancellation.dir.in.stage.preparing.use_alias.True ducktape https://buildkite.com/redpanda/redpanda/builds/63471#0195b667-e549-48ee-a74a-d1912646e7a5 FLAKY 1/2
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_migrated_topic_data_integrity.transfer_leadership=True.params=.cancellation.dir.in.stage.preparing.use_alias.False ducktape https://buildkite.com/redpanda/redpanda/builds/63471#0195b667-e54b-46f7-940f-e285681922ca FLAKY 1/2
rptest.tests.datalake.cluster_restore_test.DatalakeClusterRestoreTest.test_slow_tiered_storage_dupe_records.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.NESSIE ducktape https://buildkite.com/redpanda/redpanda/builds/63471#0195b667-e54d-4e77-ac9a-0c93729cecfe FLAKY 1/2
rptest.tests.scaling_up_test.ScalingUpTest.test_scaling_up_with_recovered_topic ducktape https://buildkite.com/redpanda/redpanda/builds/63471#0195b66b-5510-44b4-bb49-9e425844f84e FLAKY 1/4

@bharathv
Contributor Author

/dt

Contributor

@andrwng andrwng left a comment

Nice, high level this makes sense to me. I'm a little hesitant about this new recoverable category of errors because it means we need to think about getting there at any point in translation, but conceptually it feels like the right thing to do

}
return ss::now();
}

ss::future<checked<coordinator::translated_offset_range, translation_errc>>
finish(retry_chain_node& rcn, ss::abort_source& as) final {
_mem_tracker.release();
Contributor

Would it make sense to only release the memory when exiting this function?

Contributor

Bump?

Contributor Author

done. ideally its not needed because flush() is always called after every iteration, added a comment and moved it to end of this block.

case time_quota_exceeded:
return writer_error::time_limit_exceeded;
case unknown:
return writer_error::shutting_down;
Contributor

nit: is this actually the closest thing it will map to? If not, I worry a bit about callers seeing shutting_down and then assuming the entire system is shutting down and e.g. stop all further translations or something, vs some generic flush_error or somesuch

Contributor Author

thats a fair point, remapped.

Contributor

Still seeing this as shutting_down?

Contributor Author

eeks done! brain fade, changed in all other places except this mapping.

@@ -167,7 +167,7 @@ class partition_translator : public scheduling::translator {
ss::future<std::optional<translation_offsets>>
fetch_translation_offsets(retry_chain_node&);

ss::future<>
ss::future<bool>
Contributor

nit: add a comment about what it returns? Or bool_class?

Contributor Author

done.

"Translation attempt failed: {}, discarding state to reset "
"translation",
ex);
"Translation attempt ran into OOM, result will be flushed "
Contributor

nit: just for peace of mind i wonder if it's worth calling this something other than OOM, like it hit the reservation limit or something. OOM sounds a little scary? (not sure 🤔)

Contributor Author

sure, reworded.

Comment on lines 267 to 280
"immediately");
force_flush_after_iteration = true;
} catch (const translator_time_quota_exceeded_error&) {
vlog(
_logger.debug,
"Translation attempt exceeded scheduler time limit quota");
Contributor

nit: maybe add an explicitly comment that we're not keeping the exceptions because the underlying state is still safe to use. Could be mistaken for an oversight

Contributor Author

done.

Contributor Author

@bharathv bharathv left a comment

thanks for the review

case time_quota_exceeded:
return writer_error::time_limit_exceeded;
case unknown:
return writer_error::shutting_down;
Contributor Author

thats a fair point, remapped.

"Translation attempt failed: {}, discarding state to reset "
"translation",
ex);
"Translation attempt ran into OOM, result will be flushed "
Contributor Author

sure, reworded.

Comment on lines 267 to 280
"immediately");
force_flush_after_iteration = true;
} catch (const translator_time_quota_exceeded_error&) {
vlog(
_logger.debug,
"Translation attempt exceeded scheduler time limit quota");
Contributor Author

done.

@@ -167,7 +167,7 @@ class partition_translator : public scheduling::translator {
ss::future<std::optional<translation_offsets>>
fetch_translation_offsets(retry_chain_node&);

ss::future<>
ss::future<bool>
Contributor Author

done.

@bharathv
Contributor Author

Had to rebase to fix conflicts, removed some unneeded custom_partitioning test commits

@bharathv
Contributor Author

/dt

@bharathv
Contributor Author

/dt

Used to represent a translation task failure due to memory exhaustion.
Creation of a writer will involve a reservation and hence needs to be
abortable.
@bharathv bharathv marked this pull request as ready for review March 20, 2025 13:14
case time_quota_exceeded:
return writer_error::time_limit_exceeded;
case unknown:
return writer_error::shutting_down;
Contributor

Still seeing this as shutting_down?

@@ -195,7 +195,11 @@ local_parquet_file_writer_factory::create_writer(
// of too many writers, needs empirical evaluation to determine the correct
// sizing.
static constexpr size_t WRITER_RESERVATION_OVERHEAD = 10_KiB;
co_await _mem_tracker.reserve_bytes(WRITER_RESERVATION_OVERHEAD, as);
auto reservation_err = co_await _mem_tracker.reserve_bytes(
Contributor

nit: if this is no longer a draft it'd be nice if each non-trivial commit had a meaningful commit message

Contributor Author

done, this hunk was misplaced, moved to previous commit.

@@ -158,7 +158,13 @@ class RecordMultiplexerTestBase
total_split_batches += subset.size();
auto reader = model::make_memory_record_batch_reader(
std::move(subset));
mux.multiplex(std::move(reader), model::no_timeout, as).get();
mux
Contributor

Worth adding a simple unit test? I guess the interruptions test caught this, but it seems like a surprising way to catch this bug

auto& writer = writer_iter->second;
auto write_result = co_await writer->add_data(
auto add_data_result = co_await writer->add_data(
Contributor

nit: commit message?

}
return ss::now();
}

ss::future<checked<coordinator::translated_offset_range, translation_errc>>
finish(retry_chain_node& rcn, ss::abort_source& as) final {
_mem_tracker.release();
Contributor

Bump?

bharathv and others added 8 commits March 20, 2025 17:37
Two main changes

- Centralizes mem tracking in translation_ctx. This memory tracker is
  shared by all parquet writers from the same translator to reserve
  memory. Having access to the memory tracker within translation_ctx
  makes the call to buffered_bytes() inexpensive (doesn't need to loop
  through every writer to compute buffered bytes as they are rolled up
  into the mem tracker)

- Changes the API to reserve(bytes, as) and free(bytes, as). The existing
  API turned out to be buggy in the presence of multiple writers, as we
  overwrote current usage with that of the latest writer. This change is
  also preparatory work for the next commit, which wires up a fixed cost
  per writer.
Disambiguates various translation codepaths with their specific
exception types and corresponding writer_error codes. This makes most of
the data writer path noexcept, with appropriate error codes populated
instead.

This behavior is used in the next commits to help the partition
translator make forward progress if the translator runs into these
preemption code paths.
This commit classifies writer errors into two classes: recoverable
and non-recoverable.

Recoverable errors are the ones caused because translation was
purposefully preempted by the scheduler due to a time/memory violation.
Every other class of errors is considered unrecoverable.

Recoverable errors do not leave the writers in a bad state; the
translator can simply flush them and make progress if it wishes to,
which is what happens when the memory limit is exceeded.
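The classification described in this commit message can be sketched as below. The error-code names beyond those visible in the diff hunks (time_limit_exceeded, shutting_down) are assumptions, not the PR's exact enum:

```cpp
// Hypothetical writer_error enum; the split is the point, not the names.
enum class writer_error {
    ok,
    time_limit_exceeded,   // scheduler time-slice preemption
    memory_limit_exceeded, // reservation/budget preemption
    flush_error,
    shutting_down,
};

// Recoverable errors are deliberate scheduler preemptions: the writers
// remain in a consistent state, so the translator can flush them and
// continue. Everything else is unrecoverable and resets translation.
bool is_recoverable(writer_error e) {
    switch (e) {
    case writer_error::time_limit_exceeded:
    case writer_error::memory_limit_exceeded:
        return true;
    default:
        return false;
    }
}
```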
Translation can be aborted in the middle of a batch, which is then
flushed to the coordinator, so the subsequent iteration should start
from the later offset. Since the log reader only operates on batch
boundaries, we need to layer an additional check into the multiplexer to
skip already-translated offsets (by passing an additional parameter).
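The skip check this commit describes amounts to the following predicate (a minimal sketch; the function and parameter names are hypothetical):

```cpp
#include <cstdint>
#include <optional>

// After an aborted iteration was flushed mid-batch, the next pass re-reads
// the whole batch; records at or below the last translated offset were
// already emitted and must be skipped by the multiplexer.
bool should_skip_record(
  int64_t record_offset, std::optional<int64_t> last_translated_offset) {
    return last_translated_offset.has_value()
           && record_offset <= *last_translated_offset;
}
```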
add_data may fail (for example, when a writer exceeds its memory
budget), in which case we should _not_ update result offsets. That
should only be done _after_ a write succeeds, which guarantees its
persistence.
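The invariant in this commit message can be illustrated with a small sketch (struct and function names are hypothetical):

```cpp
#include <cstdint>

// Result offsets advance only after add_data succeeds, so a failed write
// (e.g. over the memory budget) can never claim persistence it does not
// have.
struct translation_result {
    int64_t last_offset = -1; // -1: nothing persisted yet
};

void record_write_outcome(
  translation_result& result, int64_t batch_end_offset, bool add_data_ok) {
    if (!add_data_ok) {
        return; // leave result offsets untouched on failure
    }
    result.last_offset = batch_end_offset;
}
```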
Basically maps each record to its own partition. Nodes OOM in the multiplexer.
@piyushredpanda piyushredpanda added this to the v25.1.1-rc3 milestone Mar 21, 2025
@bharathv bharathv enabled auto-merge March 21, 2025 03:21
@bharathv bharathv merged commit a310d53 into redpanda-data:dev Mar 21, 2025
20 checks passed
@piyushredpanda
Contributor

/backport v25.1.x

@vbotbuildovich
Collaborator

Oops! Something went wrong.

Workflow run logs.

@vbotbuildovich
Collaborator

Failed to create a backport PR to v25.1.x branch. I tried:

git remote add upstream https://github.com/redpanda-data/redpanda.git
git fetch --all
git checkout -b backport-pr-25423-v25.1.x-627 remotes/upstream/v25.1.x
git cherry-pick -x ef1ba6e014 0bf9b1d2a9 bdd24b13f9 69106dc821 7d339a8896 d433adac1f 6dc1950bc7 5fcb56dd22 32d3548cae 92126bb711

Workflow run logs.

bharathv added a commit that referenced this pull request Mar 21, 2025
[v25.1.x] [backport] dl/translation: fixes for OOM handling #25423