Skip to content

Commit

Permalink
Revert "Some initial ideas"
Browse files Browse the repository at this point in the history
This reverts commit f660086.
  • Loading branch information
balavinaithirthan committed May 2, 2024
1 parent f660086 commit 9c32232
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 115 deletions.
15 changes: 2 additions & 13 deletions libtenzir/builtins/formats/feather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,10 @@ class callback_listener : public arrow::ipc::Listener {
std::queue<std::shared_ptr<arrow::RecordBatch>> record_batch_buffer;
};

auto parse_feather(generator<chunk_ptr> input, operator_control_plane& ctrl,
std::optional<std::vector<int>> fields)
auto parse_feather(generator<chunk_ptr> input, operator_control_plane& ctrl)
-> generator<table_slice> {
auto byte_reader = make_byte_reader(std::move(input));
auto listener = std::make_shared<callback_listener>();
auto read_options = arrow::ipc::IpcReadOptions::Defaults();
read_options.included_fields = std::move(*fields);
auto stream_decoder = arrow::ipc::StreamDecoder(listener);
auto truncated_bytes = size_t{0};
auto decoded_once = false;
Expand Down Expand Up @@ -424,20 +421,12 @@ class feather_parser final : public plugin_parser {
auto
instantiate(generator<chunk_ptr> input, operator_control_plane& ctrl) const
-> std::optional<generator<table_slice>> override {
return parse_feather(std::move(input), ctrl, fields);
return parse_feather(std::move(input), ctrl);
}

friend auto inspect(auto& f, feather_parser& x) -> bool {
return f.object(x).fields();
}

auto optimize(event_order order) -> std::unique_ptr<plugin_parser> override {
// parse feather
//
}

private:
std::optional<std::vector<int>> fields;
};

class feather_printer final : public plugin_printer {
Expand Down
37 changes: 4 additions & 33 deletions libtenzir/builtins/operators/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
// SPDX-FileCopyrightText: (c) 2021 The Tenzir Contributors
// SPDX-License-Identifier: BSD-3-Clause

#include "tenzir/fwd.hpp"

#include "tenzir/location.hpp"

#include <tenzir/arrow_table_slice.hpp>
#include <tenzir/concept/convertible/data.hpp>
#include <tenzir/concept/convertible/to.hpp>
Expand All @@ -32,7 +28,7 @@ namespace {
/// The configuration of a select pipeline operator.
struct configuration {
/// The key suffixes of the fields to keep.
std::optional<std::vector<std::string>> fields = {};
std::vector<std::string> fields = {};

/// Support type inspection for easy parsing with convertible.
template <class Inspector>
Expand Down Expand Up @@ -62,8 +58,7 @@ class select_operator final
auto initialize(const type& schema, operator_control_plane&) const
-> caf::expected<state_type> override {
auto indices = state_type{};
// TODO: IF CONFIG IS EMPTY
for (const auto& field : config_.fields.value()) {
for (const auto& field : config_.fields) {
for (auto index : schema.resolve(field)) {
indices.push_back(std::move(index));
}
Expand All @@ -84,24 +79,10 @@ class select_operator final

auto optimize(expression const& filter, event_order order) const
-> optimize_result override {
return optimize_result{filter, order, nullptr, config_.fields};
(void)filter;
return optimize_result::order_invariant(*this, order);
}

// todo: selection
// auto optimize(selection const& filter, event_order order) const
// -> optimize_result override {
// return optimize_result{trivially_true_expression(), order, nullptr,
// config_.fields};
// }

// friend auto inspect(auto& f, select_operator& x) -> bool {
// if (auto dbg = as_debug_writer(f)) {
// return dbg->fmt_value("({} @ {:?})", x.expr_.inner, x.expr_.source);
// }
// return f.apply(x.expr_, x.config_);

// }

friend auto inspect(auto& f, select_operator& x) -> bool {
return f.apply(x.config_);
}
Expand All @@ -123,7 +104,6 @@ class plugin final : public virtual operator_plugin<select_operator> {
parsers::optional_ws_or_comment, parsers::extractor,
parsers::extractor_char, parsers::extractor_list;
const auto* f = pipeline.begin();
std::string x(f);
const auto* const l = pipeline.end();
const auto p = required_ws_or_comment >> extractor_list
>> optional_ws_or_comment >> end_of_pipeline_operator;
Expand All @@ -136,15 +116,6 @@ class plugin final : public virtual operator_plugin<select_operator> {
pipeline)),
};
}
// auto expr = located<tenzir::expression>();
// auto field = data{x}; //how to use field extractor
// auto keep_field = data{true}; //better way here
// auto selection = predicate(field, relational_operator::equal,
// keep_field); auto expr = located<tenzir::expression>{selection,
// location::unknown}; // what is source

// TODO: parse by space
config.fields.value().push_back(x);
return {
std::string_view{f, l},
std::make_unique<select_operator>(std::move(config)),
Expand Down
19 changes: 4 additions & 15 deletions libtenzir/include/tenzir/expression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <caf/variant.hpp>

#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <vector>
Expand Down Expand Up @@ -138,13 +137,6 @@ auto inspect(Inspector& f, data_extractor& x) {
using operand = caf::variant<meta_extractor, field_extractor, type_extractor,
data_extractor, data>;

struct selection {
std::optional<std::vector<std::string>> fields = {};
selection() = default;
selection(std::optional<std::vector<std::string>> fields)
: fields{std::move(fields)} {
}
};
/// A predicate with two operands evaluated under a relational operator.
struct predicate : detail::totally_ordered<predicate> {
predicate() = default;
Expand Down Expand Up @@ -251,9 +243,8 @@ auto inspect(Inspector& f, negation& x) {
return f.object(x).pretty_name("negation").fields(f.field("expr", x.expr()));
}

class optimization_filter {};
/// A query expression.
class expression : detail::totally_ordered<expression>, optimization_filter {
class expression : detail::totally_ordered<expression> {
public:
using types = caf::detail::type_list<caf::none_t, conjunction, disjunction,
negation, predicate>;
Expand All @@ -269,9 +260,8 @@ class expression : detail::totally_ordered<expression>, optimization_filter {
template <class T>
requires(detail::contains_type_v<types, std::decay_t<T>>)
expression(T&& x) : node_(std::forward<T>(x)) {
if constexpr (detail::is_any_v<std::decay_t<T>, conjunction, disjunction>) {
if constexpr (detail::is_any_v<std::decay_t<T>, conjunction, disjunction>)
TENZIR_ASSERT(!caf::get<std::decay_t<T>>(node_).empty());
}
}

/// @cond PRIVATE
Expand Down Expand Up @@ -339,11 +329,10 @@ struct predicate_transformer {
if constexpr (std::is_convertible_v<result_type, typename T::value_type>) {
result.push_back(std::move(x));
} else {
if (!x) {
if (!x)
return x;
} else {
else
result.push_back(std::move(*x));
}
}
}
return result;
Expand Down
20 changes: 2 additions & 18 deletions libtenzir/include/tenzir/pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <fmt/core.h>

#include <memory>
#include <optional>
#include <type_traits>
#include <variant>

Expand Down Expand Up @@ -318,21 +317,10 @@ class operator_base {
/// which is implied by `sink <=> OPT | sink`. If `order = schema`, this
/// resolves to `sink <=> interleave | pass | sink`, which follows from what
/// we may assume about `sink`.
// virtual auto optimize(expression const& filter, event_order order) const
// -> optimize_result
// = 0;

// virtual auto optimize(optimizati const& filter, event_order order) const
// -> optimize_result
// = 0;

virtual auto optimize(expression const& filter, event_order order) const
-> optimize_result
= 0;

virtual auto optimize(selection const& filter, event_order order) const
-> optimize_result;

/// Returns the location of the operator.
virtual auto location() const -> operator_location {
return operator_location::anywhere;
Expand Down Expand Up @@ -416,16 +404,12 @@ struct optimize_result {
std::optional<expression> filter;
event_order order;
operator_ptr replacement;
std::optional<std::vector<std::string>> selection;

optimize_result(std::optional<expression> filter, event_order order,
operator_ptr replacement,
std::optional<std::vector<std::string>> selection
= std::nullopt)
operator_ptr replacement)
: filter{std::move(filter)},
order{order},
replacement{std::move(replacement)},
selection{std::move(selection)} {
replacement{std::move(replacement)} {
}

/// Always valid if the transformation performed by the operator does not
Expand Down
9 changes: 0 additions & 9 deletions libtenzir/src/exec_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ auto format_metric(const metric& metric) -> std::string {

auto add_implicit_source_and_sink(pipeline pipe, exec_config const& config)
-> caf::expected<pipeline> {
for (auto &&i: pipe.operators()) {
TENZIR_WARN("xxx {}", i->name());
}
if (pipe.infer_type<void>()) {
// Don't add implicit source.
} else if (pipe.infer_type<chunk_ptr>()
Expand Down Expand Up @@ -177,9 +174,6 @@ auto add_implicit_source_and_sink(pipeline pipe, exec_config const& config)
fmt::format("expected pipeline to be closed after "
"adding implicit source and sink"));
}
for (auto &&i: pipe.operators()) {
TENZIR_WARN("raw {}", i->name());
}
return pipe;
}

Expand All @@ -204,15 +198,12 @@ auto exec_pipeline(std::string content,
return {};
}
auto pipe = tql::to_pipeline(std::move(*parsed));
TENZIR_WARN("unoptimized: {:#?}", pipe);
auto implicit_pipe = add_implicit_source_and_sink(std::move(pipe), cfg);
if (not implicit_pipe) {
return std::move(implicit_pipe.error());
}
pipe = std::move(*implicit_pipe);

pipe = pipe.optimize_if_closed();
TENZIR_WARN("optimized: {:#?}", pipe);
auto self = caf::scoped_actor{sys};
auto result = caf::expected<void>{};
auto metrics = std::vector<metric>{};
Expand Down
28 changes: 1 addition & 27 deletions libtenzir/src/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@

#include "tenzir/collect.hpp"
#include "tenzir/diagnostics.hpp"
#include "tenzir/expression.hpp"
#include "tenzir/modules.hpp"
#include "tenzir/plugin.hpp"
#include "tenzir/tql/parser.hpp"

#include <caf/detail/stringification_inspector.hpp>
#include <caf/fwd.hpp>

#include <optional>

namespace tenzir {

class local_control_plane final : public operator_control_plane {
Expand Down Expand Up @@ -114,9 +111,8 @@ auto pipeline::internal_parse(std::string_view repr)
auto pipeline::internal_parse_as_operator(std::string_view repr)
-> caf::expected<operator_ptr> {
auto result = internal_parse(repr);
if (not result) {
if (not result)
return std::move(result.error());
}
return std::make_unique<pipeline>(std::move(*result));
}

Expand Down Expand Up @@ -153,7 +149,6 @@ auto pipeline::optimize_if_closed() const -> pipeline {
return *this;
}
auto [filter, pipe] = optimize_into_filter();

if (filter != trivially_true_expression()) {
// This could also be an assertion as it always points to an error in the
// operator implementation, but we try to continue with the original
Expand Down Expand Up @@ -194,20 +189,12 @@ auto pipeline::optimize(expression const& filter, event_order order) const
-> optimize_result {
auto current_filter = filter;
auto current_order = order;
auto current_selection = selection();
// Collect the optimized pipeline in reversed order.
auto result = std::vector<operator_ptr>{};
for (auto it = operators_.rbegin(); it != operators_.rend(); ++it) {
TENZIR_ASSERT(*it);
auto const& op = **it;
TENZIR_WARN("operation: {}", op.name());

auto opt = op.optimize(current_filter, current_order);

if (current_selection.fields.has_value()) {
auto opt = op.optimize(current_filter, current_order);
}

if (opt.filter) {
current_filter = std::move(*opt.filter);
} else if (current_filter != trivially_true_expression()) {
Expand All @@ -220,20 +207,12 @@ auto pipeline::optimize(expression const& filter, event_order order) const
result.push_back(std::move(ops[0]));
current_filter = trivially_true_expression();
}
if (opt.selection) {
current_selection.fields = opt.selection;
}
if (opt.replacement) {
result.push_back(std::move(opt.replacement));
}
current_order = opt.order;
}
std::reverse(result.begin(), result.end());
for (auto it = result.rbegin(); it != result.rend(); ++it) {
TENZIR_ASSERT(*it);
auto const& op = **it;
TENZIR_WARN("operation: {}", op.name());
}
return optimize_result{current_filter, current_order,
std::make_unique<pipeline>(std::move(result))};
}
Expand Down Expand Up @@ -353,11 +332,6 @@ auto operator_base::infer_type_impl(operator_type input) const
*output);
}

auto operator_base::optimize(selection const& filter, event_order order) const
-> optimize_result {
return optimize_result{trivially_true_expression(), order, nullptr};
}

auto pipeline::is_closed() const -> bool {
return !!check_type<void, void>();
}
Expand Down

0 comments on commit 9c32232

Please sign in to comment.