Projection pushdown optimization #4180

Open
wants to merge 64 commits into
base: main
Changes from 35 commits
Commits
e6bf8a8
Initial commit
balavinaithirthan May 2, 2024
f660086
Some initial ideas
balavinaithirthan May 2, 2024
9c32232
Revert "Some initial ideas"
balavinaithirthan May 2, 2024
6642ff5
Add select_projection argument to optimize and member optimize_result
balavinaithirthan May 3, 2024
e7ab6dc
Add CAF inspect for feather and select_projection
balavinaithirthan May 3, 2024
6f1b3bd
Modify name and data structure of columnar_selection
balavinaithirthan May 9, 2024
a823a03
Implement non-nesting projection push into feather read
balavinaithirthan May 9, 2024
69bf807
Allow for basic nesting in feather parsing
balavinaithirthan May 16, 2024
d73a0c8
Fix optimization for parquet and feather format
balavinaithirthan May 22, 2024
7630030
Add concept of no_columnar_selection
balavinaithirthan May 22, 2024
b92c5aa
Change various operators to allow for selection to pass/block
balavinaithirthan May 22, 2024
3edb0c2
Incorporate selection movement into pipeline optimization function
balavinaithirthan May 22, 2024
410f018
Remove unnec spacing and logic in feather parsing
balavinaithirthan May 22, 2024
16658a8
Improve specificity of selection parsing
balavinaithirthan May 22, 2024
1814902
Add new selection_finished param to columnar_selection
balavinaithirthan May 22, 2024
18d3901
Modify pipeline logic to reset after selection
balavinaithirthan May 22, 2024
45e2010
Remove truncation reset in feather parsing
balavinaithirthan May 22, 2024
005e2fc
Update nix and tenzir-plugin link
balavinaithirthan May 22, 2024
d9e8e77
Add nix dependency
balavinaithirthan May 22, 2024
baac41d
fix merge conflict
balavinaithirthan May 22, 2024
042c229
Complete last files of git merge
balavinaithirthan May 22, 2024
683dbc4
Add selection to cron
balavinaithirthan May 22, 2024
b3f38dd
simplify pipeline logic
balavinaithirthan May 22, 2024
d814884
Remove debugging info for pipeline
balavinaithirthan May 22, 2024
d706cde
Remove std::optional where not necc and selection_finished flag
balavinaithirthan May 23, 2024
cbdedfd
Change columnar_selection name to select_optimization
balavinaithirthan May 23, 2024
89d63ec
Add tenzir-plugin links
balavinaithirthan May 23, 2024
9388d39
Merge remote-tracking branch 'origin/main' into topic/projectionPushd…
balavinaithirthan May 23, 2024
e186cb8
Block selection for order_invariants
balavinaithirthan May 23, 2024
7ed976b
Add optimization fields to plugin_parser base class
balavinaithirthan May 23, 2024
072b839
Further changes for order_invariant
balavinaithirthan May 23, 2024
6a41e84
Add optimization barriers
balavinaithirthan May 23, 2024
87d9765
simplify logic to use std::nullopt as optimization blocker
balavinaithirthan May 23, 2024
39963e0
Add selection about section to the optimize() virtual function
balavinaithirthan May 23, 2024
e325b91
Force empty selections to go unoptimized
balavinaithirthan May 23, 2024
6205c49
Minor formatting changes
balavinaithirthan May 23, 2024
ba50a7c
Fix for empty selection returning empty events
balavinaithirthan May 24, 2024
c1f1bdc
Parquet selection tests and references
balavinaithirthan May 24, 2024
bb119a6
Feather selection tests and references
balavinaithirthan May 24, 2024
5c1798a
Modified tests for empty selection
balavinaithirthan May 24, 2024
a9ea3af
Change log entry for empty selection bug fix
balavinaithirthan May 24, 2024
5dfbea6
Additional sanity test for selecting non feather/parquet
balavinaithirthan May 24, 2024
009f3a7
Update parquet reference file for empty event
balavinaithirthan May 24, 2024
03c8b9d
Fix redefined variables in parquet
balavinaithirthan May 29, 2024
b7284cd
Move selection optimization before expression
balavinaithirthan May 29, 2024
051a957
Add optimize_parser_result as return type to account for type of opti…
balavinaithirthan May 29, 2024
8f700bf
Update links to tenzir/plugins
balavinaithirthan May 29, 2024
ebfe2fd
Merge remote-tracking branch 'origin/main' into topic/projectionPushd…
balavinaithirthan May 29, 2024
19e68a9
Fix headers
balavinaithirthan May 31, 2024
365bf7e
Remove space in exec pipeline
balavinaithirthan May 31, 2024
bd50ecc
Add flags for plugin_parser optimization types
balavinaithirthan May 31, 2024
b9bd14e
Change optimize_parser_result for usage clarity
balavinaithirthan Jun 4, 2024
c03a687
Change name fields_of_interest to fields
balavinaithirthan Jun 5, 2024
025947e
Prevent optimization of overlapping select calls
balavinaithirthan Jun 5, 2024
8b8d7e6
Allow filter and selection to be placed back into pipeline
balavinaithirthan Jun 5, 2024
65834ed
Move opt.selection to properly account for filter+selection case
balavinaithirthan Jun 5, 2024
00bb8cb
Add comments and clean up formatting
balavinaithirthan Jun 5, 2024
cf82c71
Update links
balavinaithirthan Jun 5, 2024
c20ad1d
Merge remote-tracking branch 'origin/main' into topic/projectionPushd…
balavinaithirthan Jun 5, 2024
7041d45
Change select_optimization to struct
balavinaithirthan Jun 7, 2024
07e283f
Add new tenzir-plugin link
balavinaithirthan Jun 7, 2024
7f0700a
Block select from moving past where, so only where | select is allowed
balavinaithirthan Jun 7, 2024
9813e15
Update comment for optimize()
balavinaithirthan Jun 7, 2024
390d79b
Prevent where & select opt in feather and parquet
balavinaithirthan Jun 11, 2024
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
135 changes: 124 additions & 11 deletions libtenzir/builtins/formats/feather.cpp
@@ -17,7 +17,9 @@
#include <tenzir/fwd.hpp>
#include <tenzir/generator.hpp>
#include <tenzir/make_byte_reader.hpp>
#include <tenzir/pipeline.hpp>
#include <tenzir/plugin.hpp>
#include <tenzir/select_optimization.hpp>
#include <tenzir/store.hpp>
#include <tenzir/table_slice.hpp>

@@ -31,7 +33,10 @@
#include <arrow/util/key_value_metadata.h>
#include <caf/expected.hpp>

#include <cstddef>
#include <memory>
#include <queue>
#include <vector>

namespace tenzir::plugins::feather {

@@ -285,22 +290,108 @@ class callback_listener : public arrow::ipc::Listener {
public:
callback_listener() = default;

arrow::Status OnRecordBatchDecoded(
std::shared_ptr<arrow::RecordBatch> record_batch) override {
auto OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> record_batch)
-> arrow::Status override {
record_batch_buffer.push(std::move(record_batch));
return arrow::Status::OK();
}

auto OnSchemaDecoded(std::shared_ptr<arrow::Schema> schema)
-> arrow::Status override {
decoded_schema = std::move(schema);
return arrow::Status::OK();
}

std::shared_ptr<arrow::Schema> decoded_schema;
std::queue<std::shared_ptr<arrow::RecordBatch>> record_batch_buffer;
};

auto parse_feather(generator<chunk_ptr> input, operator_control_plane& ctrl)
auto parse_feather(generator<chunk_ptr> input, operator_control_plane& ctrl,
const located<select_optimization> selection)
-> generator<table_slice> {
auto byte_reader = make_byte_reader(std::move(input));
auto listener = std::make_shared<callback_listener>();
auto stream_decoder = arrow::ipc::StreamDecoder(listener);
auto schema_listener = std::make_shared<callback_listener>();
auto read_options = arrow::ipc::IpcReadOptions::Defaults();
auto schema_stream_decoder
= arrow::ipc::StreamDecoder(schema_listener, read_options);
type schema;
auto buffered_schema_data = std::vector<std::shared_ptr<arrow::Buffer>>{};
auto truncated_bytes = size_t{0};
auto decoded_once = false;
auto further_selection_needed = false;
while (true) {
auto required_size
= detail::narrow_cast<size_t>(schema_stream_decoder.next_required_size());
auto payload = byte_reader(required_size);
if (!payload) {
co_yield {};
continue;
}
truncated_bytes += payload->size();
if (payload->size() < required_size) {
if (truncated_bytes != 0 and payload->size() != 0) {
// Ideally this always would be just a warning, but the stream decoder
// happily continues to consume invalid bytes. E.g., trying to read a
// JSON file with this parser will just swallow all bytes, emitting this
// one error at the very end. Not a single time does consuming a buffer
// actually fail. We should probably look into limiting the memory usage
// here, as the stream decoder will keep consumed-but-not-yet-converted
// buffers in memory.
diagnostic::warning("truncated {} trailing bytes", truncated_bytes)
.severity(decoded_once ? severity::warning : severity::error)
.emit(ctrl.diagnostics());
}
co_return;
}
auto arrow_buffer = as_arrow_buffer(std::move(payload));
buffered_schema_data.push_back(arrow_buffer);
auto decode_result = schema_stream_decoder.Consume(std::move(arrow_buffer));
if (!decode_result.ok()) {
diagnostic::error("{}", decode_result.ToStringWithoutContextLines())
.note("failed to decode the byte stream into a record batch")
.emit(ctrl.diagnostics());
co_return;
}
if (schema_listener->decoded_schema) {
decoded_once = false;
schema = type::from_arrow(*schema_listener->decoded_schema);
break;
}
}
auto indices = std::vector<tenzir::offset>{};
if (!selection.inner.fields_of_interest.empty()) {
for (const auto& field : selection.inner.fields_of_interest) {
for (auto index : schema.resolve(field)) {
if (index.size() > 1) {
further_selection_needed = true;
}
read_options.included_fields.push_back(static_cast<int>(index[0]));
indices.push_back(std::move(index));
}
}
std::sort(indices.begin(), indices.end());
indices.erase(std::unique(indices.begin(), indices.end()), indices.end());
std::sort(read_options.included_fields.begin(),
read_options.included_fields.end());
read_options.included_fields.erase(
std::unique(read_options.included_fields.begin(),
read_options.included_fields.end()),
read_options.included_fields.end());
for (size_t i = 0; i < indices.size(); i++) {
indices[i][0] = i;
}
}
auto listener = std::make_shared<callback_listener>();
auto stream_decoder = arrow::ipc::StreamDecoder(listener, read_options);
for (auto&& buffer : buffered_schema_data) {
auto decode_result = stream_decoder.Consume(buffer);
if (!decode_result.ok()) {
diagnostic::error("{}", decode_result.ToStringWithoutContextLines())
.note("failed to decode the byte stream into a record batch")
.emit(ctrl.diagnostics());
co_return;
}
}
while (true) {
auto required_size
= detail::narrow_cast<size_t>(stream_decoder.next_required_size());
@@ -355,7 +446,11 @@ auto parse_feather(generator<chunk_ptr> input, operator_control_plane& ctrl)
.emit(ctrl.diagnostics());
co_return;
}
co_yield table_slice(batch);
if (further_selection_needed) {
co_yield select_columns(table_slice(batch), indices);
} else {
co_yield table_slice(batch);
}
}
}
}
@@ -375,8 +470,8 @@ auto print_feather(
.emit(ctrl.diagnostics());
co_return;
}
// We must finish the clear the buffer because the provided APIs do not offer
// a scrape and rewrite on the allocated same memory.
// We must finish and clear the buffer because the provided APIs do not
// offer a scrape and rewrite on the same allocated memory.
auto finished_buffer_result = sink->Finish();
if (!finished_buffer_result.ok()) {
diagnostic::error(
@@ -413,20 +508,38 @@ class feather_options {
class feather_parser final : public plugin_parser {
public:
feather_parser() = default;

feather_parser(located<select_optimization> selection)
: selection_{std::move(selection)} {
selection_optimized = true;
}
auto name() const -> std::string override {
return "feather";
}

auto
instantiate(generator<chunk_ptr> input, operator_control_plane& ctrl) const
-> std::optional<generator<table_slice>> override {
return parse_feather(std::move(input), ctrl);
return parse_feather(std::move(input), ctrl, selection_);
}

friend auto inspect(auto& f, feather_parser& x) -> bool {
return f.object(x).fields();
return f.object(x).fields(f.field("selection", x.selection_));
}

auto optimize(expression const& filter, event_order order,
select_optimization const& selection)
-> std::unique_ptr<plugin_parser> override {
(void)filter;
(void)order;
if (selection.fields_of_interest.empty()) {
// An empty selection must not be pushed down: return the plain parser.
return std::make_unique<feather_parser>();
}
return std::make_unique<feather_parser>(
located(selection, location::unknown));
}

private:
located<select_optimization> selection_{};
};

class feather_printer final : public plugin_printer {
Expand Down
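The index bookkeeping in parse_feather (sort and deduplicate the resolved offsets, collect the top-level column numbers for included_fields, then renumber each offset's first component) can be looked at in isolation. Below is a minimal sketch of that step, not the PR's code: `offset` is a hypothetical stand-in for tenzir::offset, and `projection` and `make_projection` are names invented for illustration. One deliberate difference, as far as can be told from the diff: the sketch renumbers the first component to the position of its column within the deduplicated included_fields, whereas the loop in the diff assigns the running index `i`. The two appear to agree only when no two selected offsets share a top-level column.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for tenzir::offset: a path of child indices.
using offset = std::vector<std::size_t>;

// Hypothetical result type, invented for this sketch.
struct projection {
  std::vector<int> included_fields; // sorted, unique top-level columns
  std::vector<offset> remapped;     // offsets relative to the projected batch
  bool further_selection_needed = false;
};

inline auto make_projection(std::vector<offset> resolved) -> projection {
  auto result = projection{};
  // Lexicographic sort groups offsets by their top-level column.
  std::sort(resolved.begin(), resolved.end());
  resolved.erase(std::unique(resolved.begin(), resolved.end()),
                 resolved.end());
  for (const auto& index : resolved) {
    if (index.empty()) {
      continue; // nothing to project for an empty path
    }
    if (index.size() > 1) {
      // A nested field needs a second selection pass after decoding.
      result.further_selection_needed = true;
    }
    const auto column = static_cast<int>(index[0]);
    if (result.included_fields.empty()
        || result.included_fields.back() != column) {
      result.included_fields.push_back(column);
    }
    // After projection, the column sits at its rank among included_fields.
    auto local = index;
    local[0] = result.included_fields.size() - 1;
    result.remapped.push_back(std::move(local));
  }
  return result;
}
```

For example, selecting the offsets {3}, {1,0}, {3}, {1,2} would keep top-level columns 1 and 3, and the nested paths under column 1 would both point at projected column 0.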
6 changes: 5 additions & 1 deletion libtenzir/builtins/formats/json.cpp
@@ -1193,7 +1193,11 @@ class json_parser final : public plugin_parser {
return "json";
}

auto optimize(event_order order) -> std::unique_ptr<plugin_parser> override {
auto optimize(expression const& filter, event_order order,
select_optimization const& selection)
-> std::unique_ptr<plugin_parser> override {
(void)filter;
(void)selection;
auto args = args_;
args.preserve_order = order == event_order::ordered;
return std::make_unique<json_parser>(std::move(args));
Expand Down
4 changes: 3 additions & 1 deletion libtenzir/builtins/operators/api.cpp
@@ -76,8 +76,10 @@ class api_operator final : public crtp_operator<api_operator> {
return operator_location::remote;
}

auto optimize(expression const& filter, event_order order) const
auto optimize(expression const& filter, event_order order,
select_optimization const& selection) const
-> optimize_result override {
(void)selection;
(void)order;
(void)filter;
return do_not_optimize(*this);
Expand Down
6 changes: 4 additions & 2 deletions libtenzir/builtins/operators/batch.cpp
@@ -98,10 +98,12 @@ class batch_operator final : public crtp_operator<batch_operator> {
}
}

auto optimize(expression const& filter, event_order order) const
auto optimize(expression const& filter, event_order order,
select_optimization const& selection) const
-> optimize_result override {
return optimize_result{
filter, order, std::make_unique<batch_operator>(limit_, timeout_, order)};
filter, order, std::make_unique<batch_operator>(limit_, timeout_, order),
selection};
}

auto name() const -> std::string override {
Expand Down
4 changes: 3 additions & 1 deletion libtenzir/builtins/operators/chart.cpp
@@ -195,9 +195,11 @@ class chart_operator final : public crtp_operator<chart_operator> {
return "chart";
}

auto optimize(const expression& filter, event_order order) const
auto optimize(const expression& filter, event_order order,
select_optimization const& selection) const
-> optimize_result override {
(void)filter;
(void)selection;
return optimize_result::order_invariant(*this, order);
}

Expand Down
8 changes: 6 additions & 2 deletions libtenzir/builtins/operators/compress_decompress.cpp
@@ -237,8 +237,10 @@ class compress_operator final : public crtp_operator<compress_operator> {
return "compress";
}

auto optimize(expression const& filter, event_order order) const
auto optimize(expression const& filter, event_order order,
select_optimization const& selection) const
-> optimize_result override {
(void)selection;
(void)filter;
(void)order;
return do_not_optimize(*this);
@@ -340,8 +342,10 @@ class decompress_operator final : public crtp_operator<decompress_operator> {
return "decompress";
}

auto optimize(expression const& filter, event_order order) const
auto optimize(expression const& filter, event_order order,
select_optimization const& selection) const
-> optimize_result override {
(void)selection;
(void)filter;
(void)order;
return do_not_optimize(*this);
Expand Down
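The operator changes above follow a common pattern: each optimize() override either forwards the selection (batch puts it into its optimize_result), refuses to optimize (api, compress, decompress return do_not_optimize), or, in the case of the feather parser, accepts the selection and reads only the requested columns. The following is a rough sketch of how a selection could travel upstream under those rules. It is not the Tenzir pipeline code: `selection`, `selection_policy`, `stage`, and `push_selection` are hypothetical names, and the sketch ignores the filter and event order that the real optimize() also carries.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for select_optimization.
struct selection {
  std::vector<std::string> fields;
};

// How a stage treats a selection arriving from downstream (assumed model).
enum class selection_policy {
  pass,    // forwards the selection unchanged, like batch in the diff
  block,   // stops the optimization, like do_not_optimize
  consume, // applies the selection itself, like the feather parser
};

struct stage {
  std::string name;
  selection_policy policy;
};

// `upstream` lists the stages before the select, source first. Returns the
// selection that reaches a consuming stage, or std::nullopt when the
// pushdown is blocked and the select must stay where it is.
inline auto push_selection(const std::vector<stage>& upstream,
                           selection wanted) -> std::optional<selection> {
  if (wanted.fields.empty()) {
    return std::nullopt; // empty selections go unoptimized
  }
  for (auto it = upstream.rbegin(); it != upstream.rend(); ++it) {
    switch (it->policy) {
      case selection_policy::pass:
        continue; // keep walking toward the source
      case selection_policy::block:
        return std::nullopt;
      case selection_policy::consume:
        return wanted;
    }
  }
  return std::nullopt; // no stage was able to consume the selection
}
```

Using an empty std::optional as the "blocked" signal mirrors the commit titled "simplify logic to use std::nullopt as optimization blocker", though the exact types in the PR may differ.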