File tree Expand file tree Collapse file tree 2 files changed +22
-1
lines changed Expand file tree Collapse file tree 2 files changed +22
-1
lines changed Original file line number Diff line number Diff line change 10
10
11
11
#include " datalake/local_parquet_file_writer.h"
12
12
13
+ #include " base/units.h"
13
14
#include " base/vlog.h"
14
15
#include " datalake/logger.h"
15
16
@@ -175,7 +176,17 @@ local_parquet_file_writer_factory::local_parquet_file_writer_factory(
175
176
176
177
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
177
178
local_parquet_file_writer_factory::create_writer (
178
- const iceberg::struct_type& schema, ss::abort_source&) {
179
+ const iceberg::struct_type& schema, ss::abort_source& as) {
180
+ // There is a per-writer cost associated, which includes things like
181
+ // local path strings and some in-memory data structures holding
182
+ // the writer instances. This is in place to avoid an explosion of
183
+ // writer instances, e.g. partition_by(offset), which creates a
184
+ // writer per offset.
185
+ // Additionally, another per-writer contributor is the buffer used
186
+ // in the output stream, which defaults to 8_KiB and is only released
187
+ // on output stream close().
188
+ static constexpr size_t WRITER_RESERVATION_OVERHEAD = 10_KiB;
189
+ co_await _mem_tracker.reserve_bytes (WRITER_RESERVATION_OVERHEAD, as);
179
190
auto writer = std::make_unique<local_parquet_file_writer>(
180
191
create_filename (), _writer_factory, _mem_tracker);
181
192
Original file line number Diff line number Diff line change @@ -485,6 +485,16 @@ class partition_translation_context : public translation_context {
485
485
486
486
ss::future<> flush () final {
487
487
if (_in_progress_translation) {
488
+ // Note: The flush here *does not* fully release memory associated
489
+ // with the underlying file output stream because Seastar only
490
+ // releases the stream buffer on stream close(). The default buffer size is
491
+ // 8KiB, which means up to 8KiB per writer can still be buffered even
492
+ // after flush, which is not accounted for in reservations. This could be
493
+ // an issue if there is an explosion of file writer instances. We
494
+ // try to factor in a 10KiB overhead per writer when it is created, but
495
+ // it will be released as soon as the flush is called. An
496
+ // improvement could be to account for the fixed reservation cost
497
+ // across flush calls and only release on finish.
488
498
vlog (datalake_log.trace , " [{}] flushing writers" , _ntp);
489
499
return _in_progress_translation->flush ()
490
500
.then_wrapped ([](auto result_f) {
You can’t perform that action at this time.
0 commit comments