Skip to content

Commit 28465b7

Browse files
committed
dl/translation: refactor translation mem tracking
Two main changes - Centralizes mem tracking in translation_ctx. This memory tracker is shared by all parquet writers from the same translator to reserve memory. Having access to the memory tracker within translation_ctx makes the call to buffered_bytes() inexpensive (doesn't need to loop through every writer to compute buffered bytes as they are rolled up into the mem tracker) - Changes API to do reserve(bytes, as) and free(bytes, as). The existing API turned out to be buggy in the presence of multiple writers as we overwrite current usage from the latest writer. This change is also a preparatory work for the next commit that wires up a fixed cost per writer.
1 parent 87289b2 commit 28465b7

File tree

9 files changed

+84
-68
lines changed

9 files changed

+84
-68
lines changed

src/v/datalake/data_writer_interface.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ inline std::error_code make_error_code(writer_error e) noexcept {
4646
}
4747

4848
/**
49-
* Interface to track memory used by the parquet writer. The reservations are
49+
* Interface to track memory used by the parquet writers. The reservations are
5050
* held until the tracker object is alive or release is explicitly called.
5151
*/
5252
class writer_mem_tracker {
@@ -60,14 +60,14 @@ class writer_mem_tracker {
6060
virtual ~writer_mem_tracker() = default;
6161

6262
/**
63-
* Notify the mem tracker of current memory usage. The writer may
64-
* choose to compress/shrink memory upon which the tracker must be
65-
* notified of the current usage. May not be called concurrently with
66-
* other methods.
63+
* Reserves passed input bytes.
6764
*/
68-
virtual ss::future<>
69-
update_current_memory_usage(size_t current_bytes_usage, ss::abort_source&)
70-
= 0;
65+
virtual ss::future<> reserve_bytes(size_t bytes, ss::abort_source&) = 0;
66+
67+
/**
68+
* Frees up passed input bytes.
69+
*/
70+
virtual ss::future<> free_bytes(size_t bytes, ss::abort_source&) = 0;
7171

7272
/**
7373
* Releases all the reservations. After this caller, the reserved bytes

src/v/datalake/local_parquet_file_writer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,17 +167,17 @@ local_parquet_file_writer_factory::local_parquet_file_writer_factory(
167167
local_path base_directory,
168168
ss::sstring file_name_prefix,
169169
ss::shared_ptr<parquet_ostream_factory> writer_factory,
170-
std::unique_ptr<writer_mem_tracker> mem_tracker)
170+
writer_mem_tracker& mem_tracker)
171171
: _base_directory(std::move(base_directory))
172172
, _file_name_prefix(std::move(file_name_prefix))
173173
, _writer_factory(std::move(writer_factory))
174-
, _mem_tracker(std::move(mem_tracker)) {}
174+
, _mem_tracker(mem_tracker) {}
175175

176176
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
177177
local_parquet_file_writer_factory::create_writer(
178178
const iceberg::struct_type& schema, ss::abort_source&) {
179179
auto writer = std::make_unique<local_parquet_file_writer>(
180-
create_filename(), _writer_factory, *_mem_tracker);
180+
create_filename(), _writer_factory, _mem_tracker);
181181

182182
auto res = co_await writer->initialize(schema);
183183
if (res.has_error()) {

src/v/datalake/local_parquet_file_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
6565
local_path base_directory,
6666
ss::sstring file_name_prefix,
6767
ss::shared_ptr<parquet_ostream_factory>,
68-
std::unique_ptr<writer_mem_tracker>);
68+
writer_mem_tracker&);
6969

7070
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
7171
create_writer(const iceberg::struct_type& schema, ss::abort_source&) final;
@@ -76,7 +76,7 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
7676
local_path _base_directory;
7777
ss::sstring _file_name_prefix;
7878
ss::shared_ptr<parquet_ostream_factory> _writer_factory;
79-
std::unique_ptr<writer_mem_tracker> _mem_tracker;
79+
writer_mem_tracker& _mem_tracker;
8080
};
8181

8282
} // namespace datalake

src/v/datalake/serde_parquet_writer.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,16 @@ ss::future<writer_error> serde_parquet_writer::add_data_struct(
2020
std::move(conversion_result.value()));
2121
try {
2222
auto stats = co_await _writer.write_row(std::move(group));
23-
_buffered_bytes = stats.buffered_size;
23+
auto new_buffered_bytes = stats.buffered_size;
24+
if (new_buffered_bytes > _buffered_bytes) {
25+
co_await _mem_tracker.reserve_bytes(
26+
new_buffered_bytes - _buffered_bytes, as);
27+
} else if (new_buffered_bytes < _buffered_bytes) {
28+
co_await _mem_tracker.free_bytes(
29+
_buffered_bytes - new_buffered_bytes, as);
30+
}
31+
_buffered_bytes = new_buffered_bytes;
2432
_flushed_bytes = stats.flushed_size;
25-
co_await _mem_tracker.update_current_memory_usage(_buffered_bytes, as);
2633
} catch (...) {
2734
vlog(
2835
datalake_log.warn,
@@ -45,12 +52,10 @@ ss::future<> serde_parquet_writer::flush() {
4552
_buffered_bytes == 0,
4653
"Memory buffered in the writer after flush: {}",
4754
_buffered_bytes);
48-
_mem_tracker.release();
4955
}
5056

5157
ss::future<writer_error> serde_parquet_writer::finish() {
5258
co_await _writer.close();
53-
_mem_tracker.release();
5459
_buffered_bytes = _flushed_bytes = 0;
5560
co_return writer_error::ok;
5661
}

src/v/datalake/serde_parquet_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ class serde_parquet_writer : public parquet_ostream {
2525
private:
2626
serde::parquet::writer _writer;
2727
writer_mem_tracker& _mem_tracker;
28-
size_t _buffered_bytes{0};
29-
size_t _flushed_bytes{0};
28+
int64_t _buffered_bytes{0};
29+
int64_t _flushed_bytes{0};
3030
};
3131

3232
class serde_parquet_writer_factory : public parquet_ostream_factory {

src/v/datalake/tests/test_data_writer.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
namespace datalake {
2424
class noop_mem_tracker : public writer_mem_tracker {
2525
public:
26-
ss::future<>
27-
update_current_memory_usage(size_t, ss::abort_source&) override {
26+
ss::future<> reserve_bytes(size_t, ss::abort_source&) override {
27+
return ss::make_ready_future<>();
28+
}
29+
ss::future<> free_bytes(size_t, ss::abort_source&) override {
2830
return ss::make_ready_future<>();
2931
}
3032
void release() override {}

src/v/datalake/translation/deps.cc

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,18 @@ ss::future<cluster::errc> wait_stm_translated(
6161
}
6262
} // namespace
6363

64-
ss::future<>
65-
noop_mem_tracker::update_current_memory_usage(size_t, ss::abort_source&) {
64+
ss::future<> noop_mem_tracker::reserve_bytes(size_t, ss::abort_source&) {
65+
return ss::make_ready_future<>();
66+
}
67+
ss::future<> noop_mem_tracker::free_bytes(size_t, ss::abort_source&) {
6668
return ss::make_ready_future<>();
6769
}
6870
void noop_mem_tracker::release() {}
6971

70-
ss::future<> writer_reservations_impl::update_current_memory_usage(
71-
size_t new_used_bytes, ss::abort_source& as) {
72-
_current_used_bytes = new_used_bytes;
73-
// todo: as a potential optimization we can choose to release bytes if the
74-
// new usage is less than current
75-
while (_current_used_bytes > _reservations.count()) {
72+
ss::future<>
73+
translator_mem_tracker::reserve_bytes(size_t bytes, ss::abort_source& as) {
74+
_current_usage += bytes;
75+
while (_current_usage > _reservations.count()) {
7676
// reserve any deficit
7777
auto reservation = co_await _reservations_tracker.reserve_memory(as);
7878
if (_reservations.count()) {
@@ -83,16 +83,20 @@ ss::future<> writer_reservations_impl::update_current_memory_usage(
8383
}
8484
}
8585

86-
void writer_reservations_impl::release() {
87-
_current_used_bytes = 0;
88-
_reservations.return_all();
86+
ss::future<>
87+
translator_mem_tracker::free_bytes(size_t bytes, ss::abort_source&) {
88+
_current_usage -= std::min(_current_usage, bytes);
89+
return ss::now();
8990
}
9091

91-
size_t writer_reservations_impl::current_usage() const {
92-
return _current_used_bytes;
92+
void translator_mem_tracker::release() {
93+
_current_usage = 0;
94+
_reservations.return_all();
9395
}
9496

95-
size_t writer_reservations_impl::total_reserved() const {
97+
size_t translator_mem_tracker::current_usage() const { return _current_usage; }
98+
99+
size_t translator_mem_tracker::total_reserved() const {
96100
return _reservations.count();
97101
}
98102

@@ -409,7 +413,8 @@ class partition_translation_context : public translation_context {
409413
, _probe(std::move(probe))
410414
, _invalid_record_action(compute_invalid_record_action())
411415
, _cp_enabled(translation_task::custom_partitioning_enabled{
412-
_features.is_active(features::feature::datalake_iceberg_ga)}) {}
416+
_features.is_active(features::feature::datalake_iceberg_ga)})
417+
, _mem_tracker(translator_mem_tracker{_reservations}) {}
413418

414419
ss::future<> translate_now(
415420
model::record_batch_reader reader, ss::abort_source& as) final {
@@ -480,8 +485,9 @@ class partition_translation_context : public translation_context {
480485

481486
ss::future<> flush() final {
482487
if (_in_progress_translation) {
483-
return _in_progress_translation->flush().then_wrapped(
484-
[](auto result_f) {
488+
vlog(datalake_log.trace, "[{}] flushing writers", _ntp);
489+
return _in_progress_translation->flush()
490+
.then_wrapped([](auto result_f) {
485491
if (result_f.failed()) {
486492
return ss::make_exception_future(
487493
result_f.get_exception());
@@ -491,13 +497,15 @@ class partition_translation_context : public translation_context {
491497
return ss::make_exception_future(result.error());
492498
}
493499
return ss::now();
494-
});
500+
})
501+
.finally([this]() { _mem_tracker.release(); });
495502
}
496503
return ss::now();
497504
}
498505

499506
ss::future<checked<coordinator::translated_offset_range, translation_errc>>
500507
finish(retry_chain_node& rcn, ss::abort_source& as) final {
508+
_mem_tracker.release();
501509
if (!_in_progress_translation) {
502510
co_return translation_errc::no_data;
503511
}
@@ -522,11 +530,7 @@ class partition_translation_context : public translation_context {
522530
co_await std::move(task.value()).discard().discard_result();
523531
}
524532

525-
size_t buffered_bytes() const final {
526-
return _in_progress_translation
527-
? _in_progress_translation->buffered_bytes()
528-
: 0;
529-
}
533+
size_t buffered_bytes() const final { return _mem_tracker.current_usage(); }
530534

531535
private:
532536
scheduling::clock::duration compute_target_lag() const {
@@ -546,7 +550,7 @@ class partition_translation_context : public translation_context {
546550
_writer_scratch_space, // storage temp files are written to
547551
"", // file prefix
548552
ss::make_shared<serde_parquet_writer_factory>(),
549-
std::make_unique<writer_reservations_impl>(_reservations));
553+
_mem_tracker);
550554
}
551555

552556
model::iceberg_invalid_record_action compute_invalid_record_action() const {
@@ -577,7 +581,7 @@ class partition_translation_context : public translation_context {
577581
ss::lw_shared_ptr<translation_probe> _probe;
578582
model::iceberg_invalid_record_action _invalid_record_action;
579583
translation_task::custom_partitioning_enabled _cp_enabled;
580-
584+
translator_mem_tracker _mem_tracker;
581585
std::optional<translation_task> _in_progress_translation;
582586
bool _discard_translated_state{false};
583587
};

src/v/datalake/translation/deps.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,29 @@ namespace datalake::translation {
2929

3030
class noop_mem_tracker : public writer_mem_tracker {
3131
public:
32-
ss::future<>
33-
update_current_memory_usage(size_t, ss::abort_source&) override;
32+
ss::future<> reserve_bytes(size_t, ss::abort_source&) override;
33+
ss::future<> free_bytes(size_t, ss::abort_source&) override;
3434
void release() override;
3535
};
3636

37-
class writer_reservations_impl : public writer_mem_tracker {
37+
/**
38+
* Tracks memory usage across all writers in a single translator.
39+
*/
40+
class translator_mem_tracker : public writer_mem_tracker {
3841
public:
39-
explicit writer_reservations_impl(
42+
explicit translator_mem_tracker(
4043
scheduling::reservations_tracker& scheduling_reservations)
4144
: _reservations_tracker(scheduling_reservations) {}
4245

43-
ss::future<>
44-
update_current_memory_usage(size_t, ss::abort_source&) override;
46+
ss::future<> reserve_bytes(size_t, ss::abort_source&) override;
47+
ss::future<> free_bytes(size_t, ss::abort_source&) override;
4548
void release() override;
4649

4750
size_t current_usage() const;
4851
size_t total_reserved() const;
4952

5053
private:
51-
size_t _current_used_bytes{0};
54+
size_t _current_usage{0};
5255
scheduling::reservations_tracker& _reservations_tracker;
5356
ssx::semaphore_units _reservations;
5457
};

src/v/datalake/translation/tests/partition_translator_tests.cc

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -517,51 +517,53 @@ using scheduling::scheduler_fixture;
517517
TEST_F_CORO(scheduler_fixture, test_writer_reservations_accounting) {
518518
auto& reservations = *_scheduler->reservations();
519519
{
520-
writer_reservations_impl writer{reservations};
520+
translator_mem_tracker writer{reservations};
521521

522522
auto cleanup = ss::defer([&writer] { writer.release(); });
523523

524524
ss::abort_source as;
525525

526-
ASSERT_EQ_CORO(writer.current_usage(), 0);
526+
size_t total_usage = 0;
527+
ASSERT_EQ_CORO(writer.current_usage(), total_usage);
527528
ASSERT_EQ_CORO(writer.total_reserved(), 0);
528529

529-
auto current_usage = 1_MiB;
530+
auto bytes = 1_MiB;
530531
// update initial usage, should result in a reservation
531-
co_await writer.update_current_memory_usage(current_usage, as);
532+
co_await writer.reserve_bytes(bytes, as);
533+
total_usage += bytes;
532534

533-
ASSERT_EQ_CORO(writer.current_usage(), current_usage);
535+
ASSERT_EQ_CORO(writer.current_usage(), total_usage);
534536
ASSERT_EQ_CORO(writer.total_reserved(), block_size);
535537

536538
// try 3 more times, we are still within the block limit;
537-
while (current_usage <= block_size) {
538-
co_await writer.update_current_memory_usage(current_usage, as);
539-
current_usage += 1_MiB;
539+
while (total_usage < block_size) {
540+
co_await writer.reserve_bytes(bytes, as);
541+
total_usage += bytes;
540542
}
541543

542544
ASSERT_EQ_CORO(writer.current_usage(), block_size);
543545
ASSERT_EQ_CORO(writer.total_reserved(), block_size);
544546

545-
current_usage += 1_MiB;
546547
// update again, should reserve a new block.
547-
co_await writer.update_current_memory_usage(current_usage, as);
548+
co_await writer.reserve_bytes(bytes, as);
549+
total_usage += bytes;
548550

549-
ASSERT_EQ_CORO(writer.current_usage(), current_usage);
551+
ASSERT_EQ_CORO(writer.current_usage(), total_usage);
550552
ASSERT_EQ_CORO(writer.total_reserved(), 2 * block_size);
551553

552554
// exhaust all memory
553555
while (writer.current_usage() != total_memory) {
554-
current_usage += 1_MiB;
555-
co_await writer.update_current_memory_usage(current_usage, as);
556+
co_await writer.reserve_bytes(bytes, as);
557+
total_usage += 1_MiB;
556558
}
557559

558560
ss::promise<> done;
559561
// update again, should block on reservations due to memory limit
560562
// exhaustion.
561563
auto f = ss::with_timeout(
562564
ss::steady_clock_type::now() + 500ms,
563-
writer.update_current_memory_usage(current_usage + 1_MiB, as)
564-
.finally([&done] { done.set_value(); }));
565+
writer.reserve_bytes(bytes, as).finally(
566+
[&done] { done.set_value(); }));
565567

566568
ASSERT_THROW_CORO(co_await std::move(f), ss::timed_out_error);
567569

0 commit comments

Comments
 (0)