dl/translation: fixes for OOM handling #25423
Conversation
/dt
/dt
Retry command for Build#63324: please wait until all jobs are finished before running the slash command.
Could use a little bit more polish (hence the draft), but overall ready for review.
/dt
ss::future<>
partition_translator::translate_when_notified(kafka::offset begin_offset) {
    co_await _ready_to_translate.wait(
      [this] { return _inflight_translation_state.has_value(); });

    auto& as = _inflight_translation_state->as;
    auto reader = co_await _data_source->make_log_reader(
      begin_offset, datalake_priority(), as);
    if (!reader) {
        co_return;
    }
    vlog(_logger.trace, "starting translation from offset: {}", begin_offset);
    ss::timer<scheduling::clock> cancellation_timer;
    cancellation_timer.set_callback([&as] { as.request_abort(); });

    auto translation_f
      = _translation_ctx
          ->translate_now(
            std::move(reader.value()), _inflight_translation_state->as)
          .finally([this] { return _translation_ctx->flush(); });
    cancellation_timer.arm(_inflight_translation_state->translate_for);
    co_await std::move(translation_f).finally([&cancellation_timer] {
        cancellation_timer.cancel();
    });
}
Dead code; I forgot to remove it in a previous refactor. Nothing to review here.
CI test results:
test results on build#63330
test results on build#63435
test results on build#63471
/dt
Nice, at a high level this makes sense to me. I'm a little hesitant about this new recoverable category of errors because it means we need to think about getting there at any point in translation, but conceptually it feels like the right thing to do.
src/v/datalake/translation/deps.cc
Outdated
    }
    return ss::now();
}

ss::future<checked<coordinator::translated_offset_range, translation_errc>>
finish(retry_chain_node& rcn, ss::abort_source& as) final {
    _mem_tracker.release();
Would it make sense to only release the memory when exiting this function?
Bump?
Done. Ideally it's not needed because flush() is always called after every iteration; added a comment and moved it to the end of this block.
case time_quota_exceeded:
    return writer_error::time_limit_exceeded;
case unknown:
    return writer_error::shutting_down;
nit: is this actually the closest thing it will map to? If not, I worry a bit about callers seeing shutting_down, assuming the entire system is shutting down, and e.g. stopping all further translations, vs. some generic flush_error or some such.
That's a fair point, remapped.
Still seeing this as shutting_down?
Eeks, done! Brain fade; I changed it in all other places except this mapping.
@@ -167,7 +167,7 @@ class partition_translator : public scheduling::translator {
     ss::future<std::optional<translation_offsets>>
     fetch_translation_offsets(retry_chain_node&);

-    ss::future<>
+    ss::future<bool>
nit: add a comment about what it returns? Or bool_class?
done.
-    "Translation attempt failed: {}, discarding state to reset "
-    "translation",
-    ex);
+    "Translation attempt ran into OOM, result will be flushed "
nit: just for peace of mind i wonder if it's worth calling this something other than OOM, like it hit the reservation limit or something. OOM sounds a little scary? (not sure 🤔)
sure, reworded.
+    "immediately");
+    force_flush_after_iteration = true;
+} catch (const translator_time_quota_exceeded_error&) {
+    vlog(
+      _logger.debug,
+      "Translation attempt exceeded scheduler time limit quota");
nit: maybe add an explicit comment that we're swallowing these exceptions because the underlying state is still safe to use. Could be mistaken for an oversight.
done.
Thanks for the review.
Had to rebase to fix conflicts; removed some unneeded custom_partitioning test commits.
/dt
/dt
Used to represent a translation task failure due to memory exhaustion.
Creation of a writer will involve a reservation and hence needs to be abortable.
@@ -195,7 +195,11 @@ local_parquet_file_writer_factory::create_writer(
     // of too many writers, needs empirical evaluation to determine the correct
     // sizing.
     static constexpr size_t WRITER_RESERVATION_OVERHEAD = 10_KiB;
-    co_await _mem_tracker.reserve_bytes(WRITER_RESERVATION_OVERHEAD, as);
+    auto reservation_err = co_await _mem_tracker.reserve_bytes(
nit: if this is no longer a draft it'd be nice if each non-trivial commit had a meaningful commit message
Done; this hunk was misplaced, moved it to the previous commit.
@@ -158,7 +158,13 @@ class RecordMultiplexerTestBase
         total_split_batches += subset.size();
         auto reader = model::make_memory_record_batch_reader(
           std::move(subset));
-        mux.multiplex(std::move(reader), model::no_timeout, as).get();
+        mux
Worth adding a simple unit test? I guess the interruptions test caught this, but it seems like a surprising way to catch this bug
     auto& writer = writer_iter->second;
-    auto write_result = co_await writer->add_data(
+    auto add_data_result = co_await writer->add_data(
nit: commit message?
Two main changes:
- Centralizes mem tracking in translation_ctx. This memory tracker is shared by all parquet writers from the same translator to reserve memory. Having access to the memory tracker within translation_ctx makes the call to buffered_bytes() inexpensive (it doesn't need to loop through every writer to compute buffered bytes, as they are rolled up into the mem tracker).
- Changes the API to reserve(bytes, as) and free(bytes, as). The existing API turned out to be buggy in the presence of multiple writers, as we overwrote the current usage with that of the latest writer. This change is also preparatory work for the next commit, which wires up a fixed cost per writer.
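The reserve/free semantics described in this commit can be sketched as follows. This is a minimal, synchronous stand-in (no Seastar futures or abort_source); the class name, error enum, and method names here are illustrative assumptions, not the real API.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical sketch of a shared memory tracker: all parquet writers of
// one translator reserve from the same budget, so buffered_bytes() is an
// O(1) read instead of a loop over every writer.
enum class reserve_errc { ok, budget_exceeded };

class memory_tracker {
public:
    explicit memory_tracker(size_t budget)
      : _budget(budget) {}

    // reserve(bytes): fails instead of silently overwriting usage, which
    // is the multi-writer bug the commit message describes.
    reserve_errc reserve_bytes(size_t bytes) {
        if (_used + bytes > _budget) {
            return reserve_errc::budget_exceeded;
        }
        _used += bytes;
        return reserve_errc::ok;
    }

    // free(bytes): each writer returns exactly what it reserved, so
    // concurrent writers cannot clobber each other's accounting.
    void free_bytes(size_t bytes) {
        assert(bytes <= _used);
        _used -= bytes;
    }

    size_t buffered_bytes() const { return _used; }

private:
    size_t _budget;
    size_t _used = 0;
};
```

With per-delta reserve/free, two writers reserving concurrently accumulate usage additively rather than each overwriting a single "current usage" field.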
Disambiguates the various translation codepaths with their specific exception types and corresponding writer_error codes. This makes most of the data writer path noexcept, with appropriate error codes populated instead. This behavior is used in the next commits to help the partition translator make forward progress when it runs into these preemption code paths.
This commit classifies writer errors into two classes: recoverable and non-recoverable. Recoverable errors are those caused because translation was purposefully preempted by the scheduler due to a time/memory violation. Every other class of error is considered unrecoverable. Recoverable errors do not leave the writers in a bad state; the translator can just flush them and make progress if it wishes to, which is what happens when the memory limit is exceeded.
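The classification above can be sketched as a simple predicate over the error codes. The enum values here mirror names that appear in the diffs, but the exact set and the helper itself are illustrative assumptions, not the PR's real code.

```cpp
#include <cassert>

// Hypothetical writer_error values; only the scheduler-preemption ones
// (time/memory limits) leave the writers in a flushable state.
enum class writer_error {
    ok,
    time_limit_exceeded,
    memory_limit_exceeded,
    flush_error,
    shutting_down,
};

// Recoverable => the translator may flush buffered state and continue;
// anything else means the writers cannot be trusted and state is reset.
constexpr bool is_recoverable(writer_error e) {
    switch (e) {
    case writer_error::ok:
    case writer_error::time_limit_exceeded:
    case writer_error::memory_limit_exceeded:
        return true;
    default:
        return false;
    }
}
```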
Translation can be aborted in the middle of a batch, which is then flushed to the coordinator, and the subsequent iteration should start from a later offset. Since the log reader only operates on batch boundaries, we need to layer this additional check into the multiplexer to skip already-translated offsets (by passing an additional param).
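The skip logic can be sketched like this: because the reader must restart at a batch boundary, the first batch may contain records at or below the already-flushed offset, which the multiplexer drops. The record type and function here are illustrative stand-ins, not the real multiplexer code.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Minimal stand-in for a record inside a batch.
struct record {
    int64_t offset;
};

// Drop records already covered by the previous (mid-batch) flush; only
// records strictly past last_translated_offset are translated again.
std::vector<record> skip_translated(
  std::vector<record> batch, int64_t last_translated_offset) {
    std::vector<record> out;
    for (const auto& r : batch) {
        if (r.offset > last_translated_offset) {
            out.push_back(r);
        }
    }
    return out;
}
```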
add_data may fail (for example, the writer exceeded its memory budget), in which case we should _not_ update result offsets. That should only be done _after_ a write succeeds, which guarantees its persistence.
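The ordering fix amounts to advancing the tracked offset only on a successful write. This sketch uses hypothetical names (the state struct and handler are stand-ins for the real translator code):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Stand-in result type for an add_data call.
enum class add_data_errc { ok, memory_limit_exceeded };

struct translation_result {
    // Highest offset known to be durably written; starts empty.
    std::optional<int64_t> last_offset;

    // Advance last_offset only after add_data reports success; a failed
    // (e.g. over-budget) write must not extend the translated range.
    void on_add_data(add_data_errc res, int64_t batch_last_offset) {
        if (res != add_data_errc::ok) {
            return; // nothing persisted; keep offsets unchanged
        }
        last_offset = batch_last_offset;
    }
};
```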
Basically maps each record to its own partition, so nodes OOM in the multiplexer.
/backport v25.1.x
Oops! Something went wrong.
Failed to create a backport PR to v25.1.x branch. I tried:
[v25.1.x] [backport] dl/translation: fixes for OOM handling #25423
Main changes in the PR
Backports Required
Release Notes