
Commit 282bec7

Merge pull request #25498 from bharathv/v251x-oom-backport
[v25.1.x] [backport] dl/translation: fixes for OOM handling #25423

2 parents acc4dea + e5f8f50 commit 282bec7

26 files changed: +698 −211 lines
src/v/datalake/data_writer_interface.cc

Lines changed: 28 additions & 11 deletions

@@ -15,25 +15,42 @@ namespace datalake {
 std::ostream& operator<<(std::ostream& os, const writer_error& ev) {
     switch (ev) {
     case writer_error::ok:
-        os << "Ok";
-        break;
+        return os << "Ok";
     case writer_error::parquet_conversion_error:
-        os << "Parquet Conversion Error";
-        break;
+        return os << "Parquet Conversion Error";
     case writer_error::file_io_error:
-        os << "File IO Error";
-        break;
+        return os << "File IO Error";
     case writer_error::no_data:
-        os << "No data";
-        break;
+        return os << "No data";
     case writer_error::flush_error:
-        os << "Flush failed";
-        break;
+        return os << "Flush failed";
+    case writer_error::oom_error:
+        return os << "Memory exhausted";
+    case writer_error::time_limit_exceeded:
+        return os << "Time limit exceeded";
+    case writer_error::shutting_down:
+        return os << "Shutting down";
+    case writer_error::unknown_error:
+        return os << "Unknown error";
     }
-    return os;
 }

 std::string data_writer_error_category::message(int ev) const {
     return fmt::to_string(static_cast<writer_error>(ev));
 }

+writer_error map_to_writer_error(reservation_error reservation_err) {
+    switch (reservation_err) {
+    case ok:
+        return writer_error::ok;
+    case shutting_down:
+        return writer_error::shutting_down;
+    case out_of_memory:
+        return writer_error::oom_error;
+    case time_quota_exceeded:
+        return writer_error::time_limit_exceeded;
+    case unknown:
+        return writer_error::unknown_error;
+    }
+}
+
 } // namespace datalake
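
A note on the switch rewrite: returning from every case and dropping both the default and the trailing return os; means compilers building with -Wswitch will flag this operator whenever a writer_error enumerator is added but left unhandled, which is presumably why the four new values (oom_error, time_limit_exceeded, shutting_down, unknown_error) each get an explicit message here.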

src/v/datalake/data_writer_interface.h

Lines changed: 24 additions & 9 deletions

@@ -26,6 +26,10 @@ enum class writer_error {
     file_io_error,
     no_data,
     flush_error,
+    oom_error,
+    time_limit_exceeded,
+    shutting_down,
+    unknown_error,
 };
 std::ostream& operator<<(std::ostream&, const writer_error&);

@@ -44,8 +48,17 @@ inline std::error_code make_error_code(writer_error e) noexcept {
     return {static_cast<int>(e), data_writer_error_category::error_category()};
 }

+enum reservation_error {
+    ok = 0,
+    shutting_down = 1,
+    out_of_memory = 2,
+    time_quota_exceeded = 3,
+    unknown = 4,
+};
+
+writer_error map_to_writer_error(reservation_error);
 /**
- * Interface to track memory used by the parquet writer. The reservations are
+ * Interface to track memory used by the parquet writers. The reservations are
  * held until the tracker object is alive or release is explicitly called.
  */
 class writer_mem_tracker {

@@ -59,14 +72,15 @@ class writer_mem_tracker {
     virtual ~writer_mem_tracker() = default;

     /**
-     * Notify the mem tracker of current memory usage. The writer may
-     * choose to compress/shrink memory upon which the tracker must be
-     * notified of the current usage. May not be called concurrently with
-     * other methods.
+     * Reserves passed input bytes.
      */
-    virtual ss::future<>
-    update_current_memory_usage(size_t current_bytes_usage, ss::abort_source&)
-      = 0;
+    virtual ss::future<reservation_error>
+    reserve_bytes(size_t bytes, ss::abort_source&) noexcept = 0;
+
+    /**
+     * Frees up passed input bytes.
+     */
+    virtual ss::future<> free_bytes(size_t bytes, ss::abort_source&) = 0;

     /**
      * Releases all the reservations. After this caller, the reserved bytes

@@ -173,7 +187,8 @@ class parquet_file_writer_factory {

     virtual ss::future<
       result<std::unique_ptr<parquet_file_writer>, writer_error>>
-    create_writer(const iceberg::struct_type& /* schema */) = 0;
+    create_writer(const iceberg::struct_type& /* schema */, ss::abort_source&)
+      = 0;
 };

 } // namespace datalake
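
For orientation, here is a minimal sketch of a tracker with the same shape as the new reserve_bytes/free_bytes contract. The fixed_budget_tracker name and its fail-fast policy are illustrative only, not part of this commit; a production tracker would derive from writer_mem_tracker and, given the time_quota_exceeded value, presumably wait for memory up to a time quota instead of failing immediately.

// Illustrative sketch, not from this commit: a fixed-budget tracker with the
// same shape as writer_mem_tracker::reserve_bytes/free_bytes.
#include "datalake/data_writer_interface.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <algorithm>

namespace ss = seastar;

class fixed_budget_tracker {
public:
    explicit fixed_budget_tracker(size_t budget_bytes)
      : _budget(budget_bytes) {}

    ss::future<datalake::reservation_error>
    reserve_bytes(size_t bytes, ss::abort_source& as) noexcept {
        if (as.abort_requested()) {
            co_return datalake::reservation_error::shutting_down;
        }
        if (_reserved + bytes > _budget) {
            // Fail fast; a real implementation would likely park here until
            // free_bytes() makes room or the time quota expires.
            co_return datalake::reservation_error::out_of_memory;
        }
        _reserved += bytes;
        co_return datalake::reservation_error::ok;
    }

    ss::future<> free_bytes(size_t bytes, ss::abort_source&) {
        _reserved -= std::min(bytes, _reserved);
        co_return;
    }

private:
    size_t _budget;
    size_t _reserved{0};
};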

src/v/datalake/local_parquet_file_writer.cc

Lines changed: 28 additions & 4 deletions

@@ -10,6 +10,7 @@

 #include "datalake/local_parquet_file_writer.h"

+#include "base/units.h"
 #include "base/vlog.h"
 #include "datalake/logger.h"

@@ -167,17 +168,40 @@ local_parquet_file_writer_factory::local_parquet_file_writer_factory(
   local_path base_directory,
   ss::sstring file_name_prefix,
   ss::shared_ptr<parquet_ostream_factory> writer_factory,
-  std::unique_ptr<writer_mem_tracker> mem_tracker)
+  writer_mem_tracker& mem_tracker)
   : _base_directory(std::move(base_directory))
   , _file_name_prefix(std::move(file_name_prefix))
   , _writer_factory(std::move(writer_factory))
-  , _mem_tracker(std::move(mem_tracker)) {}
+  , _mem_tracker(mem_tracker) {}

 ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
 local_parquet_file_writer_factory::create_writer(
-  const iceberg::struct_type& schema) {
+  const iceberg::struct_type& schema, ss::abort_source& as) {
+    // There is a per-writer cost which includes things like:
+    // - local path string
+    // - associated partition key
+    // - schema
+    // - stats tracked about the writer
+    // - data structure overhead
+    //
+    // This limit is in place to avoid an explosion of writer instances, e.g.
+    // partition_by(offset), which creates a writer per offset.
+    //
+    // Additionally, one other contributor per writer is the buffer used in
+    // the output stream, which defaults to 8_KiB and is only released on
+    // output stream close().
+    //
+    // TODO: This is just a conservative estimate to prevent pathological
+    // cases of too many writers; it needs empirical evaluation to determine
+    // the correct sizing.
+    static constexpr size_t WRITER_RESERVATION_OVERHEAD = 10_KiB;
+    auto reservation_err = co_await _mem_tracker.reserve_bytes(
+      WRITER_RESERVATION_OVERHEAD, as);
+    if (reservation_err != reservation_error::ok) {
+        co_return map_to_writer_error(reservation_err);
+    }
     auto writer = std::make_unique<local_parquet_file_writer>(
-      create_filename(), _writer_factory, *_mem_tracker);
+      create_filename(), _writer_factory, _mem_tracker);

     auto res = co_await writer->initialize(schema);
     if (res.has_error()) {
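
For a sense of scale, purely as an illustration (the budget figure here is hypothetical, not from this commit): against a 128 MiB translation budget per shard, a 10_KiB up-front reservation caps concurrent writers at roughly 128 MiB / 10 KiB ≈ 13,100 instances, comfortably above any sane partition spec while still bounding the pathological partition_by(offset) case, where writer count grows with record count.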

src/v/datalake/local_parquet_file_writer.h

Lines changed: 3 additions & 3 deletions

@@ -65,18 +65,18 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
       local_path base_directory,
       ss::sstring file_name_prefix,
       ss::shared_ptr<parquet_ostream_factory>,
-      std::unique_ptr<writer_mem_tracker>);
+      writer_mem_tracker&);

     ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
-    create_writer(const iceberg::struct_type& schema) final;
+    create_writer(const iceberg::struct_type& schema, ss::abort_source&) final;

 private:
     local_path create_filename() const;

     local_path _base_directory;
     ss::sstring _file_name_prefix;
     ss::shared_ptr<parquet_ostream_factory> _writer_factory;
-    std::unique_ptr<writer_mem_tracker> _mem_tracker;
+    writer_mem_tracker& _mem_tracker;
 };

 } // namespace datalake
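
The switch from std::unique_ptr<writer_mem_tracker> to writer_mem_tracker& changes ownership: the factory no longer owns its tracker, so a single tracker instance can be shared by every factory (and thus every writer) on a shard, making the reservation budget global rather than per-factory. The flip side is a lifetime obligation: whoever owns the tracker, presumably the translation layer, must keep it alive for as long as any factory or writer references it.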

src/v/datalake/partitioning_writer.cc

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ ss::future<writer_error> partitioning_writer::add_data(
     }
     auto writer_iter = writers_.find(pk);
     if (writer_iter == writers_.end()) {
-        auto writer_res = co_await writer_factory_.create_writer(type_);
+        auto writer_res = co_await writer_factory_.create_writer(type_, as);
         if (writer_res.has_error()) {
             vlog(
               datalake_log.error,

src/v/datalake/record_multiplexer.cc

Lines changed: 59 additions & 28 deletions

@@ -25,6 +25,29 @@

 #include <seastar/core/loop.hh>

+namespace {
+
+// Recoverable errors are the class of errors that do not leave the underlying
+// writers in a bad shape. Upon recoverable errors the translator may choose
+// to flush and continue as if nothing happened, so we preserve the state to
+// facilitate that.
+bool is_recoverable_error(datalake::writer_error err) {
+    switch (err) {
+    case datalake::writer_error::ok:
+    case datalake::writer_error::oom_error:
+    case datalake::writer_error::time_limit_exceeded:
+        return true;
+    case datalake::writer_error::parquet_conversion_error:
+    case datalake::writer_error::file_io_error:
+    case datalake::writer_error::no_data:
+    case datalake::writer_error::flush_error:
+    case datalake::writer_error::shutting_down:
+    case datalake::writer_error::unknown_error:
+        return false;
+    }
+}
+}; // namespace
+
 namespace datalake {

 namespace {

@@ -72,17 +95,19 @@ record_multiplexer::record_multiplexer(

 ss::future<> record_multiplexer::multiplex(
   model::record_batch_reader reader,
+  kafka::offset start_offset,
   model::timeout_clock::time_point deadline,
   ss::abort_source& as) {
     co_await std::move(reader).consume(
-      relaying_consumer{[this, &as](model::record_batch b) mutable {
-          return do_multiplex(std::move(b), as);
-      }},
+      relaying_consumer{
+        [this, start_offset, &as](model::record_batch b) mutable {
+            return do_multiplex(std::move(b), start_offset, as);
+        }},
       deadline);
 }

 ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
-  model::record_batch batch, ss::abort_source& as) {
+  model::record_batch batch, kafka::offset start_offset, ss::abort_source& as) {
     if (batch.compressed()) {
         batch = co_await storage::internal::decompress_batch(std::move(batch));
     }

@@ -99,6 +124,9 @@ ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
     auto timestamp = model::timestamp{
       first_timestamp + record.timestamp_delta()};
     kafka::offset offset{batch.base_offset()() + record.offset_delta()};
+    if (offset < start_offset) {
+        continue;
+    }
     int64_t estimated_size = (key ? key->size_bytes() : 0)
                              + (val ? val->size_bytes() : 0);
     chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>

@@ -259,37 +287,36 @@ ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
         writer_iter = iter;
     }

-    // TODO: we want to ensure we're using an offset translating reader so
-    // that these will be Kafka offsets, not Raft offsets.
-    if (!_result.has_value()) {
-        _result = write_result{
-          .start_offset = offset,
-        };
-    }
-
-    _result.value().last_offset = offset;
-
     auto& writer = writer_iter->second;
-    auto write_result = co_await writer->add_data(
+    auto add_data_result = co_await writer->add_data(
      std::move(record_data_res.value()), estimated_size, as);

-    if (write_result != writer_error::ok) {
+    if (add_data_result != writer_error::ok) {
         vlog(
           _log.warn,
           "Error adding data to writer for record {}: {}",
           offset,
-          write_result);
-        _error = write_result;
+          add_data_result);
+        _error = add_data_result;
         // If a write fails, the writer is left in an indeterminate state;
         // we cannot continue in this case.
         co_return ss::stop_iteration::yes;
     }
+
+    // TODO: we want to ensure we're using an offset translating reader so
+    // that these will be Kafka offsets, not Raft offsets.
+    if (!_result.has_value()) {
+        _result = write_result{
+          .start_offset = offset,
+        };
+    }
+    _result.value().last_offset = offset;
     }
     co_return ss::stop_iteration::no;
 }

 ss::future<writer_error> record_multiplexer::flush_writers() {
-    if (_error) {
+    if (_error && !is_recoverable_error(_error.value())) {
         co_return *_error;
     }
     auto result = co_await ss::coroutine::as_future(ss::max_concurrent_for_each(

@@ -304,37 +331,41 @@ ss::future<writer_error> record_multiplexer::flush_writers() {

 ss::future<result<record_multiplexer::write_result, writer_error>>
 record_multiplexer::finish() && {
-    if (!_result) {
-        // no batches were processed.
-        co_return writer_error::no_data;
-    }
     auto writers = std::move(_writers);
     for (auto& [id, writer] : writers) {
         auto res = co_await std::move(*writer).finish();
         if (res.has_error()) {
             _error = res.error();
             continue;
         }
-        auto& files = res.value();
-        std::move(
-          files.begin(), files.end(), std::back_inserter(_result->data_files));
+        if (_result) {
+            auto& files = res.value();
+            std::move(
+              files.begin(),
+              files.end(),
+              std::back_inserter(_result->data_files));
+        }
     }
     if (_invalid_record_writer) {
         auto writer = std::move(_invalid_record_writer);
         auto res = co_await std::move(*writer).finish();
         if (res.has_error()) {
             _error = res.error();
-        } else {
+        } else if (_result) {
             auto& files = res.value();
             std::move(
               files.begin(),
               files.end(),
               std::back_inserter(_result->dlq_files));
         }
     }
-    if (_error) {
+    if (_error && !is_recoverable_error(_error.value())) {
         co_return *_error;
     }
+    if (!_result) {
+        // no batches were processed.
+        co_return writer_error::no_data;
+    }
     co_return std::move(*_result);
 }
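
Putting the pieces together, a caller-side sketch of the resume flow these changes enable might look like the following. The translation_state struct and run_translation_pass function are hypothetical names for illustration; only multiplex(), finish(), and the start_offset semantics come from this commit.

// Hypothetical sketch, not from this commit: one translation pass built on
// the new semantics. Recoverable errors (OOM / time quota) stop consumption
// early but leave the writers intact, so finish() can still return
// committable data files for the prefix that was translated.
#include "datalake/record_multiplexer.h"

struct translation_state {
    // First offset that has not yet been durably translated.
    kafka::offset next_offset;
};

ss::future<> run_translation_pass(
  datalake::record_multiplexer& mux,
  model::record_batch_reader reader,
  translation_state& state,
  model::timeout_clock::time_point deadline,
  ss::abort_source& as) {
    // Re-feeding a batch that was only partially translated last time is
    // safe: do_multiplex() skips records below start_offset.
    co_await mux.multiplex(std::move(reader), state.next_offset, deadline, as);
    // finish() now tolerates recoverable errors instead of discarding the
    // partial result.
    auto result = co_await std::move(mux).finish();
    if (result.has_value()) {
        // Resume after the last offset that made it into the data files.
        state.next_offset = kafka::offset{result.value().last_offset() + 1};
    }
    // On a fatal error the caller would abandon this round and retry the
    // whole pass from state.next_offset.
}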

src/v/datalake/record_multiplexer.h

Lines changed: 8 additions & 2 deletions

@@ -77,17 +77,23 @@ class record_multiplexer {
     /**
      * Multiplex the data from a reader into writers per schema and partition.
      * Can be called multiple times in succession before calling finish().
+     *
+     * start_offset controls the minimum offset from which the multiplexer
+     * can work. If the previous translation stopped in the middle of a
+     * batch, we do not want to multiplex already translated offsets in the
+     * batch; start_offset solves that problem.
      */
     ss::future<> multiplex(
       model::record_batch_reader reader,
+      kafka::offset start_offset,
       model::timeout_clock::time_point deadline,
       ss::abort_source& as);

     /**
      * Abortable multiplexing on a single batch. Visible for testing.
      */
-    ss::future<ss::stop_iteration>
-    do_multiplex(model::record_batch batch, ss::abort_source&);
+    ss::future<ss::stop_iteration> do_multiplex(
+      model::record_batch batch, kafka::offset start_offset, ss::abort_source&);

     /**
      * Forces a flush on all the underlying file writers resulting in freeing