Skip to content

Commit 87289b2

Browse files
committed
dl/translation/writer: propagate an abort source to create writer
1 parent 06aa36e commit 87289b2

File tree

5 files changed

+9
-6
lines changed

5 files changed

+9
-6
lines changed

src/v/datalake/data_writer_interface.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ class parquet_file_writer_factory {
174174

175175
virtual ss::future<
176176
result<std::unique_ptr<parquet_file_writer>, writer_error>>
177-
create_writer(const iceberg::struct_type& /* schema */) = 0;
177+
create_writer(const iceberg::struct_type& /* schema */, ss::abort_source&)
178+
= 0;
178179
};
179180

180181
} // namespace datalake

src/v/datalake/local_parquet_file_writer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ local_parquet_file_writer_factory::local_parquet_file_writer_factory(
175175

176176
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
177177
local_parquet_file_writer_factory::create_writer(
178-
const iceberg::struct_type& schema) {
178+
const iceberg::struct_type& schema, ss::abort_source&) {
179179
auto writer = std::make_unique<local_parquet_file_writer>(
180180
create_filename(), _writer_factory, *_mem_tracker);
181181

src/v/datalake/local_parquet_file_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
6868
std::unique_ptr<writer_mem_tracker>);
6969

7070
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
71-
create_writer(const iceberg::struct_type& schema) final;
71+
create_writer(const iceberg::struct_type& schema, ss::abort_source&) final;
7272

7373
private:
7474
local_path create_filename() const;

src/v/datalake/partitioning_writer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ ss::future<writer_error> partitioning_writer::add_data(
5757
}
5858
auto writer_iter = writers_.find(pk);
5959
if (writer_iter == writers_.end()) {
60-
auto writer_res = co_await writer_factory_.create_writer(type_);
60+
auto writer_res = co_await writer_factory_.create_writer(type_, as);
6161
if (writer_res.has_error()) {
6262
vlog(
6363
datalake_log.error,

src/v/datalake/tests/test_data_writer.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class test_data_writer_factory : public parquet_file_writer_factory {
7373
: _return_error{return_error} {}
7474

7575
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
76-
create_writer(const iceberg::struct_type& schema) override {
76+
create_writer(
77+
const iceberg::struct_type& schema, ss::abort_source&) override {
7778
co_return std::make_unique<test_data_writer>(
7879
std::move(schema), _return_error);
7980
}
@@ -122,7 +123,8 @@ class test_serde_parquet_data_writer : public parquet_file_writer {
122123
class test_serde_parquet_writer_factory : public parquet_file_writer_factory {
123124
public:
124125
ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
125-
create_writer(const iceberg::struct_type& schema) override {
126+
create_writer(
127+
const iceberg::struct_type& schema, ss::abort_source&) override {
126128
auto ostream_writer = co_await _serde_parquet_factory.create_writer(
127129
schema, utils::make_null_output_stream(), _mem_tracker);
128130

0 commit comments

Comments
 (0)