Skip to content

Commit 736a282

Browse files
committed
dl/translator: graceful handling of OOM/scheduling timeouts
1 parent e815d71 commit 736a282

File tree

7 files changed

+67
-42
lines changed

7 files changed

+67
-42
lines changed

src/v/datalake/data_writer_interface.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ writer_error map_to_writer_error(reservation_error reservation_err) {
4545
case out_of_memory:
4646
return writer_error::oom_error;
4747
case time_quota_exceeded:
48-
return writer_error::oom_error;
48+
return writer_error::time_limit_exceeded;
4949
case unknown:
5050
return writer_error::shutting_down;
5151
}

src/v/datalake/local_parquet_file_writer.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,11 @@ local_parquet_file_writer_factory::create_writer(
186186
// in the output stream which defaults to 8_KiB, which is only released
187187
// on output stream close().
188188
static constexpr size_t WRITER_RESERVATION_OVERHEAD = 10_KiB;
189-
co_await _mem_tracker.reserve_bytes(WRITER_RESERVATION_OVERHEAD, as);
189+
auto reservation_err = co_await _mem_tracker.reserve_bytes(
190+
WRITER_RESERVATION_OVERHEAD, as);
191+
if (reservation_err != reservation_error::ok) {
192+
co_return map_to_writer_error(reservation_err);
193+
}
190194
auto writer = std::make_unique<local_parquet_file_writer>(
191195
create_filename(), _writer_factory, _mem_tracker);
192196

src/v/datalake/record_multiplexer.cc

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,28 @@
2525

2626
#include <seastar/core/loop.hh>
2727

28+
namespace {
29+
30+
// Recoverable errors are the class of errors that donot leave the underlying
31+
// writers in a bad shape. Upon recoverable errors the translator may choose to
32+
// flush and continue as if nothing happened, so we preserve the state to
33+
// facilitate that.
34+
bool is_recoverable_error(datalake::writer_error err) {
35+
switch (err) {
36+
case datalake::writer_error::ok:
37+
case datalake::writer_error::oom_error:
38+
case datalake::writer_error::time_limit_exceeded:
39+
return true;
40+
case datalake::writer_error::parquet_conversion_error:
41+
case datalake::writer_error::file_io_error:
42+
case datalake::writer_error::no_data:
43+
case datalake::writer_error::flush_error:
44+
case datalake::writer_error::shutting_down:
45+
return false;
46+
}
47+
}
48+
}; // namespace
49+
2850
namespace datalake {
2951

3052
namespace {
@@ -289,7 +311,7 @@ ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
289311
}
290312

291313
ss::future<writer_error> record_multiplexer::flush_writers() {
292-
if (_error) {
314+
if (_error && !is_recoverable_error(_error.value())) {
293315
co_return *_error;
294316
}
295317
auto result = co_await ss::coroutine::as_future(ss::max_concurrent_for_each(
@@ -332,7 +354,7 @@ record_multiplexer::finish() && {
332354
std::back_inserter(_result->dlq_files));
333355
}
334356
}
335-
if (_error) {
357+
if (_error && !is_recoverable_error(_error.value())) {
336358
co_return *_error;
337359
}
338360
co_return std::move(*_result);

src/v/datalake/translation/deps.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,13 +540,15 @@ class partition_translation_context : public translation_context {
540540
if (!_in_progress_translation) {
541541
co_return translation_errc::no_data;
542542
}
543+
vlog(datalake_log.debug, "[{}] finishing translation", _ntp);
543544
if (_discard_translated_state) {
544545
co_await discard().discard_result();
545546
co_return translation_errc::discard_error;
546547
}
547548
auto task = std::exchange(_in_progress_translation, std::nullopt);
548549
auto result = co_await std::move(task.value())
549550
.finish(_cp_enabled, _upload_path_prefix, rcn, as);
551+
550552
if (result.has_error()) {
551553
co_return map_error_code(result.error());
552554
}

src/v/datalake/translation/partition_translator.cc

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -137,33 +137,6 @@ partition_translator::checkpoint_translation_result(
137137
});
138138
}
139139

140-
ss::future<>
141-
partition_translator::translate_when_notified(kafka::offset begin_offset) {
142-
co_await _ready_to_translate.wait(
143-
[this] { return _inflight_translation_state.has_value(); });
144-
145-
auto& as = _inflight_translation_state->as;
146-
auto reader = co_await _data_source->make_log_reader(
147-
begin_offset, datalake_priority(), as);
148-
if (!reader) {
149-
co_return;
150-
}
151-
vlog(_logger.trace, "starting translation from offset: {}", begin_offset);
152-
ss::timer<scheduling::clock> cancellation_timer;
153-
cancellation_timer.set_callback(
154-
[&as] { as.request_abort_ex(translator_time_quota_exceeded_error{}); });
155-
156-
auto translation_f
157-
= _translation_ctx
158-
->translate_now(
159-
std::move(reader.value()), _inflight_translation_state->as)
160-
.finally([this] { return _translation_ctx->flush(); });
161-
cancellation_timer.arm(_inflight_translation_state->translate_for);
162-
co_await std::move(translation_f).finally([&cancellation_timer] {
163-
cancellation_timer.cancel();
164-
});
165-
}
166-
167140
bool partition_translator::should_finish_inflight_translation() const {
168141
auto bytes_flushed_pending_upload = _translation_ctx->flushed_bytes();
169142
auto lag_window_ended = _lag_tracking->should_finish_inflight_translation();
@@ -250,7 +223,7 @@ partition_translator::fetch_translation_offsets(retry_chain_node& rcn) {
250223
co_return offsets;
251224
}
252225

253-
ss::future<> partition_translator::run_one_translation_iteration(
226+
ss::future<bool> partition_translator::run_one_translation_iteration(
254227
kafka::offset begin_offset) {
255228
_lag_tracking->notify_new_data_for_translation(begin_offset);
256229
// Notify the scheduler that there is some data to translate
@@ -259,14 +232,16 @@ ss::future<> partition_translator::run_one_translation_iteration(
259232
// time slice (i.e. scheduled in), then translate until the time
260233
// slice expires or we run out of data
261234
std::exception_ptr ex = nullptr;
235+
bool should_discard_result = false;
236+
bool force_flush_after_iteration = false;
262237
try {
263238
co_await _ready_to_translate.wait(
264239
[this] { return _inflight_translation_state.has_value(); });
265240
auto& as = _inflight_translation_state->as;
266241
auto reader = co_await _data_source->make_log_reader(
267242
begin_offset, datalake_priority(), as);
268243
if (!reader) {
269-
co_return;
244+
co_return force_flush_after_iteration;
270245
}
271246
vlog(
272247
_logger.trace, "starting translation from offset: {}", begin_offset);
@@ -282,13 +257,22 @@ ss::future<> partition_translator::run_one_translation_iteration(
282257
co_await std::move(translation_f).finally([&cancellation_timer] {
283258
cancellation_timer.cancel();
284259
});
285-
} catch (...) {
286-
ex = std::current_exception();
260+
_inflight_translation_state->as.check();
261+
} catch (const translator_out_of_memory_error&) {
287262
vlog(
288263
_logger.warn,
289-
"Translation attempt failed: {}, discarding state to reset "
290-
"translation",
291-
ex);
264+
"Translation attempt ran into OOM, result will be flushed "
265+
"immediately");
266+
force_flush_after_iteration = true;
267+
} catch (const translator_time_quota_exceeded_error&) {
268+
vlog(
269+
_logger.debug,
270+
"Translation attempt exceeded scheduler time limit quota");
271+
} catch (...) {
272+
// unknown exception or shutdown exception.
273+
should_discard_result = true;
274+
ex = std::current_exception();
275+
vlog(_logger.warn, "Translation attempt ran into an exception: {}", ex);
292276
}
293277
// inflight_translation_state tracks a single scheduled chunk of
294278
// work, so we reset it to nullopt for the next time we're scheduled
@@ -297,10 +281,13 @@ ss::future<> partition_translator::run_one_translation_iteration(
297281
// Let the scheduler know we are done
298282
_scheduler->notify_done(id());
299283

300-
if (ex) {
284+
if (should_discard_result) {
301285
co_await _translation_ctx->discard();
286+
}
287+
if (ex) {
302288
std::rethrow_exception(ex);
303289
}
290+
co_return force_flush_after_iteration;
304291
}
305292

306293
ss::future<bool> partition_translator::finish_inflight_translation(
@@ -404,6 +391,7 @@ ss::future<> partition_translator::translate_until_stopped() {
404391
if (!offsets) {
405392
continue;
406393
}
394+
auto finish_immediately = false;
407395
if (offsets->next_translation_begin_offset) {
408396
// new data is available to translate
409397
auto translate_f = co_await ss::coroutine::as_future(
@@ -413,8 +401,9 @@ ss::future<> partition_translator::translate_until_stopped() {
413401
translate_f.ignore_ready_future();
414402
continue;
415403
}
404+
finish_immediately = translate_f.get();
416405
}
417-
if (should_finish_inflight_translation()) {
406+
if (finish_immediately || should_finish_inflight_translation()) {
418407
auto success = co_await finish_inflight_translation(
419408
offsets->coordinator_lto, rcn);
420409
if (!success) {
@@ -513,6 +502,7 @@ void partition_translator::stop_translation() {
513502
if (_gate.is_closed() || !_inflight_translation_state) {
514503
return;
515504
}
505+
516506
// Currently only preempted on OOM error, if the policy changes
517507
// to preempt on other errors, should be updated accordingly.
518508
_inflight_translation_state->as.request_abort_ex(

src/v/datalake/translation/partition_translator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class partition_translator : public scheduling::translator {
167167
ss::future<std::optional<translation_offsets>>
168168
fetch_translation_offsets(retry_chain_node&);
169169

170-
ss::future<>
170+
ss::future<bool>
171171
run_one_translation_iteration(kafka::offset translation_begin_offset);
172172

173173
/**

src/v/datalake/translation/scheduling.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ class default_reservations_tracker : public reservations_tracker {
6060
co_return std::move(opt_units.value());
6161
}
6262
_notifier.notify_memory_exhausted();
63-
co_return co_await ss::get_units(_available_memory, nr, as);
63+
try {
64+
co_return co_await ss::get_units(_available_memory, nr, as);
65+
} catch (...) {
66+
// propagate the exception in abort source, if any
67+
as.check();
68+
// else, rethrow generic exception
69+
std::rethrow_exception(std::current_exception());
70+
}
6471
}
6572

6673
size_t allocated_memory() const override {

0 commit comments

Comments
 (0)