Skip to content

Commit e815d71

Browse files
committed
dl/translation: propagate abort errors all the way
1 parent 27a06c7 commit e815d71

File tree

9 files changed

+114
-21
lines changed

9 files changed

+114
-21
lines changed

src/v/datalake/data_writer_interface.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,29 @@ std::ostream& operator<<(std::ostream& os, const writer_error& ev) {
2626
return os << "Flush failed";
2727
case writer_error::oom_error:
2828
return os << "Memory exhausted";
29+
case writer_error::time_limit_exceeded:
30+
return os << "Time limit exceeded";
31+
case writer_error::shutting_down:
32+
return os << "Shutting down";
2933
}
3034
}
3135
std::string data_writer_error_category::message(int ev) const {
3236
return fmt::to_string(static_cast<writer_error>(ev));
3337
}
3438

39+
/**
 * Translates the outcome of a memory reservation into the writer_error
 * that data writers report to their callers.
 *
 * @param reservation_err result of writer_mem_tracker::reserve_bytes.
 * @return the writer_error equivalent of reservation_err.
 */
writer_error map_to_writer_error(reservation_error reservation_err) {
    // Enumerators are qualified so the labels cannot be confused with the
    // identically named enumerators of other error enums (writer_error also
    // has a shutting_down).
    switch (reservation_err) {
    case reservation_error::ok:
        return writer_error::ok;
    case reservation_error::shutting_down:
        return writer_error::shutting_down;
    case reservation_error::out_of_memory:
        return writer_error::oom_error;
    case reservation_error::time_quota_exceeded:
        // An exhausted time quota is not a memory failure; report it with the
        // dedicated code so upper layers can tell the two apart.
        return writer_error::time_limit_exceeded;
    case reservation_error::unknown:
        // NOTE(review): unclassified failures are reported as shutting_down;
        // confirm whether writer_error has (or should get) a more specific
        // code for this case.
        return writer_error::shutting_down;
    }
}
53+
3554
} // namespace datalake

src/v/datalake/data_writer_interface.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ enum class writer_error {
2727
no_data,
2828
flush_error,
2929
oom_error,
30+
time_limit_exceeded,
31+
shutting_down,
3032
};
3133
std::ostream& operator<<(std::ostream&, const writer_error&);
3234

@@ -45,6 +47,15 @@ inline std::error_code make_error_code(writer_error e) noexcept {
4547
return {static_cast<int>(e), data_writer_error_category::error_category()};
4648
}
4749

50+
/**
 * Outcome of a memory reservation request issued through
 * writer_mem_tracker::reserve_bytes. Use map_to_writer_error() to convert a
 * value into the corresponding writer_error.
 */
enum reservation_error {
    // The requested bytes were reserved.
    ok = 0,
    // The reservation was aborted because of a shutdown.
    shutting_down = 1,
    // The reservation was aborted for lack of memory.
    out_of_memory = 2,
    // The reservation was aborted because the time quota ran out.
    time_quota_exceeded = 3,
    // The reservation failed for a reason not covered above.
    unknown = 4,
};
57+
58+
writer_error map_to_writer_error(reservation_error);
4859
/**
4960
* Interface to track memory used by the parquet writers. The reservations are
5061
* held until the tracker object is alive or release is explicitly called.
@@ -62,7 +73,8 @@ class writer_mem_tracker {
6273
/**
6374
* Reserves passed input bytes.
6475
*/
65-
virtual ss::future<> reserve_bytes(size_t bytes, ss::abort_source&) = 0;
76+
virtual ss::future<reservation_error>
77+
reserve_bytes(size_t bytes, ss::abort_source&) noexcept = 0;
6678

6779
/**
6880
* Frees up passed input bytes.

src/v/datalake/serde_parquet_writer.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ ss::future<writer_error> serde_parquet_writer::add_data_struct(
2222
auto stats = co_await _writer.write_row(std::move(group));
2323
auto new_buffered_bytes = stats.buffered_size;
2424
if (new_buffered_bytes > _buffered_bytes) {
25-
co_await _mem_tracker.reserve_bytes(
25+
auto reservation_result = co_await _mem_tracker.reserve_bytes(
2626
new_buffered_bytes - _buffered_bytes, as);
27+
if (reservation_result != reservation_error::ok) {
28+
co_return map_to_writer_error(reservation_result);
29+
}
2730
} else if (new_buffered_bytes < _buffered_bytes) {
2831
co_await _mem_tracker.free_bytes(
2932
_buffered_bytes - new_buffered_bytes, as);

src/v/datalake/tests/test_data_writer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
namespace datalake {
2424
class noop_mem_tracker : public writer_mem_tracker {
2525
public:
26-
ss::future<> reserve_bytes(size_t, ss::abort_source&) override {
27-
return ss::make_ready_future<>();
26+
// Test stub: every reservation is granted immediately; nothing is tracked.
ss::future<reservation_error>
reserve_bytes(size_t, ss::abort_source&) noexcept override {
    constexpr auto granted = reservation_error::ok;
    return ss::make_ready_future<reservation_error>(granted);
}
2930
ss::future<> free_bytes(size_t, ss::abort_source&) override {
3031
return ss::make_ready_future<>();

src/v/datalake/translation/deps.cc

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ map_error_code(datalake::translation_task::errc errc) {
3838
return translation_errc::no_data;
3939
case datalake::translation_task::errc::oom_error:
4040
return translation_errc::oom_error;
41+
case datalake::translation_task::errc::time_limit_exceeded:
42+
return translation_errc::time_limit_exceeded;
43+
case datalake::translation_task::errc::shutting_down:
44+
return translation_errc::shutting_down;
4145
}
4246
}
4347
} // namespace
@@ -61,26 +65,39 @@ ss::future<cluster::errc> wait_stm_translated(
6165
}
6266
} // namespace
6367

64-
ss::future<> noop_mem_tracker::reserve_bytes(size_t, ss::abort_source&) {
65-
return ss::make_ready_future<>();
68+
// The no-op tracker keeps no accounting: any amount is granted right away and
// the abort source is never consulted.
ss::future<reservation_error>
noop_mem_tracker::reserve_bytes(size_t, ss::abort_source&) noexcept {
    constexpr auto granted = reservation_error::ok;
    return ss::make_ready_future<reservation_error>(granted);
}
6772
// Nothing was ever reserved by this tracker, so freeing completes immediately
// without touching any state.
ss::future<> noop_mem_tracker::free_bytes(size_t, ss::abort_source&) {
    return ss::make_ready_future<>();
}
7075
// Intentionally empty: this tracker's reserve_bytes/free_bytes hold no state,
// so there is nothing to release.
void noop_mem_tracker::release() {}
7176

72-
ss::future<>
73-
translator_mem_tracker::reserve_bytes(size_t bytes, ss::abort_source& as) {
77+
ss::future<reservation_error> translator_mem_tracker::reserve_bytes(
78+
size_t bytes, ss::abort_source& as) noexcept {
7479
_current_usage += bytes;
75-
while (_current_usage > _reservations.count()) {
76-
// reserve any deficit
77-
auto reservation = co_await _reservations_tracker.reserve_memory(as);
78-
if (_reservations.count()) {
79-
_reservations.adopt(std::move(reservation));
80-
} else {
81-
_reservations = std::move(reservation);
80+
try {
81+
while (_current_usage > _reservations.count()) {
82+
// reserve any deficit
83+
auto reservation = co_await _reservations_tracker.reserve_memory(
84+
as);
85+
if (_reservations.count()) {
86+
_reservations.adopt(std::move(reservation));
87+
} else {
88+
_reservations = std::move(reservation);
89+
}
8290
}
83-
}
91+
} catch (const translator_out_of_memory_error&) {
92+
co_return reservation_error::out_of_memory;
93+
} catch (const translator_shutdown_error&) {
94+
co_return reservation_error::shutting_down;
95+
} catch (const translator_time_quota_exceeded_error&) {
96+
co_return reservation_error::time_quota_exceeded;
97+
} catch (...) {
98+
co_return reservation_error::unknown;
99+
}
100+
co_return reservation_error::ok;
84101
}
85102

86103
ss::future<>
@@ -377,6 +394,10 @@ std::ostream& operator<<(std::ostream& o, translation_errc ec) {
377394
return o << "translation_errc::discard_error";
378395
case oom_error:
379396
return o << "translation_errc::oom_error";
397+
case time_limit_exceeded:
398+
return o << "translation_errc::time_limit_exceeded";
399+
case shutting_down:
400+
return o << "translation_errc::shutting_down";
380401
}
381402
}
382403

src/v/datalake/translation/deps.h

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,28 @@
2727

2828
namespace datalake::translation {
2929

30+
/// Abort reason signalling that a translation was stopped for lack of memory.
/// partition_translator::stop_translation() passes it to request_abort_ex(),
/// and translator_mem_tracker::reserve_bytes() turns it into
/// reservation_error::out_of_memory.
class translator_out_of_memory_error final : public std::runtime_error {
public:
    explicit translator_out_of_memory_error()
      : std::runtime_error{"translator_out_of_memory"} {}
};
35+
36+
/// Abort reason signalling that a translation was stopped because the
/// translator is closing. partition_translator::close() passes it to
/// request_abort_ex(), and translator_mem_tracker::reserve_bytes() turns it
/// into reservation_error::shutting_down.
class translator_shutdown_error final : public std::runtime_error {
public:
    explicit translator_shutdown_error()
      : std::runtime_error{"translator_shutdown"} {}
};
41+
42+
/// Abort reason signalling that a translation ran past its time quota. The
/// cancellation timer in partition_translator::translate_when_notified()
/// passes it to request_abort_ex(), and
/// translator_mem_tracker::reserve_bytes() turns it into
/// reservation_error::time_quota_exceeded.
class translator_time_quota_exceeded_error final : public std::runtime_error {
public:
    explicit translator_time_quota_exceeded_error()
      : std::runtime_error{"translator_time_quota_exceeded"} {}
};
47+
3048
class noop_mem_tracker : public writer_mem_tracker {
3149
public:
32-
ss::future<> reserve_bytes(size_t, ss::abort_source&) override;
50+
ss::future<reservation_error>
51+
reserve_bytes(size_t, ss::abort_source&) noexcept override;
3352
ss::future<> free_bytes(size_t, ss::abort_source&) override;
3453
void release() override;
3554
};
@@ -43,7 +62,8 @@ class translator_mem_tracker : public writer_mem_tracker {
4362
scheduling::reservations_tracker& scheduling_reservations)
4463
: _reservations_tracker(scheduling_reservations) {}
4564

46-
ss::future<> reserve_bytes(size_t, ss::abort_source&) override;
65+
ss::future<reservation_error>
66+
reserve_bytes(size_t, ss::abort_source&) noexcept override;
4767
ss::future<> free_bytes(size_t, ss::abort_source&) override;
4868
void release() override;
4969

@@ -161,6 +181,8 @@ enum translation_errc {
161181
flush_error,
162182
discard_error,
163183
oom_error,
184+
time_limit_exceeded,
185+
shutting_down,
164186
};
165187

166188
std::ostream& operator<<(std::ostream&, translation_errc);

src/v/datalake/translation/partition_translator.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ partition_translator::translate_when_notified(kafka::offset begin_offset) {
150150
}
151151
vlog(_logger.trace, "starting translation from offset: {}", begin_offset);
152152
ss::timer<scheduling::clock> cancellation_timer;
153-
cancellation_timer.set_callback([&as] { as.request_abort(); });
153+
cancellation_timer.set_callback(
154+
[&as] { as.request_abort_ex(translator_time_quota_exceeded_error{}); });
154155

155156
auto translation_f
156157
= _translation_ctx
@@ -472,7 +473,8 @@ ss::future<> partition_translator::close() noexcept {
472473
_as.request_abort();
473474
_ready_to_translate.broken();
474475
if (_inflight_translation_state) {
475-
_inflight_translation_state->as.request_abort();
476+
_inflight_translation_state->as.request_abort_ex(
477+
translator_shutdown_error{});
476478
}
477479
_data_source->close();
478480
co_await _gate.close();
@@ -511,6 +513,9 @@ void partition_translator::stop_translation() {
511513
if (_gate.is_closed() || !_inflight_translation_state) {
512514
return;
513515
}
514-
_inflight_translation_state->as.request_abort();
516+
// Currently only preempted on OOM error, if the policy changes
517+
// to preempt on other errors, should be updated accordingly.
518+
_inflight_translation_state->as.request_abort_ex(
519+
translator_out_of_memory_error{});
515520
}
516521
} // namespace datalake::translation

src/v/datalake/translation_task.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ translation_task::errc map_error_code(writer_error errc) {
4444
return translation_task::errc::flush_error;
4545
case writer_error::oom_error:
4646
return translation_task::errc::oom_error;
47+
case writer_error::time_limit_exceeded:
48+
return translation_task::errc::time_limit_exceeded;
49+
case writer_error::shutting_down:
50+
return translation_task::errc::shutting_down;
4751
}
4852
}
4953

@@ -359,6 +363,10 @@ std::ostream& operator<<(std::ostream& o, translation_task::errc ec) {
359363
return o << "no data to translate";
360364
case translation_task::errc::oom_error:
361365
return o << "memory exhausted";
366+
case translation_task::errc::time_limit_exceeded:
367+
return o << "time limit exceeded";
368+
case translation_task::errc::shutting_down:
369+
return o << "shutting down";
362370
}
363371
}
364372
} // namespace datalake

src/v/datalake/translation_task.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class translation_task {
4646
flush_error,
4747
no_data,
4848
oom_error,
49+
time_limit_exceeded,
50+
shutting_down,
4951
};
5052

5153
using custom_partitioning_enabled

0 commit comments

Comments
 (0)