feat: pipeline dispatcher part 2: execution #5409

Open
wants to merge 39 commits into
base: main
39 commits
0e45744
fmt: correct format
sunng87 Jan 14, 2025
aa63b87
test: add negative tests
sunng87 Jan 15, 2025
6c226b5
feat: Add pipeline dispatching and execution output handling
sunng87 Jan 16, 2025
63f7909
refactor: Enhance ingest function to correctly process original data …
sunng87 Jan 17, 2025
9fd8535
refactor: call greptime_identity with intermediate values
sunng87 Jan 20, 2025
a9bc720
fix: typo
sunng87 Jan 20, 2025
81e57a3
test: port tests to refactored apis
sunng87 Jan 20, 2025
d37b59d
refactor: adapt dryrun api call
sunng87 Jan 20, 2025
dd40c09
refactor: move pipeline execution code to a separated module
sunng87 Jan 21, 2025
233d57f
refactor: update otlp pipeline execution path
sunng87 Jan 21, 2025
af64c06
fmt: format imports
sunng87 Jan 21, 2025
a980314
fix: compilation
sunng87 Jan 21, 2025
5bd8798
fix: resolve residual issues
sunng87 Jan 21, 2025
b43a6c8
refactor: address review comments
sunng87 Jan 22, 2025
ad05a39
chore: use btreemap as pipeline intermediate status trait modify
paomian Jan 23, 2025
3b5682b
Merge branch 'main' into feature/pipeline-dispatcher-2
sunng87 Jan 23, 2025
82ada54
Merge pull request #2 from paomian/feature/btreemap-pipeline
sunng87 Jan 23, 2025
13268f9
refactor: update dispatcher to accept BTreeMap
sunng87 Jan 23, 2025
a214812
refactor: update identity pipeline
sunng87 Jan 23, 2025
c7e08eb
refactor: use new input for pipeline
sunng87 Jan 23, 2025
eb6e8d2
chore: wip
paomian Jan 24, 2025
fe3290d
Merge pull request #3 from paomian/feature/btreemap-pipeline
sunng87 Jan 24, 2025
3f08d15
refactor: use updated prepare api
sunng87 Jan 24, 2025
daa9ec1
refactor: improve error and header name
sunng87 Jan 25, 2025
885cf2e
Merge branch 'main' into feature/pipeline-dispatcher-2
sunng87 Jan 26, 2025
df5c35d
feat: port flatten to new api
sunng87 Jan 26, 2025
8e1b6e9
chore: update pipeline api
paomian Jan 26, 2025
c803209
chore: fix transform and some pipeline test
paomian Jan 26, 2025
3a2c17f
Merge pull request #4 from paomian/feature/btreemap-pipeline
sunng87 Jan 26, 2025
9de70d3
refactor: reimplement cmcd
sunng87 Jan 26, 2025
f289318
refactor: update csv processor
sunng87 Jan 26, 2025
60a6421
fmt: update format
sunng87 Jan 26, 2025
1a29cfe
chore: fix regex and dissect processor
paomian Jan 27, 2025
448f94d
chore: fix test
paomian Jan 27, 2025
99b4db8
Merge pull request #5 from paomian/feature/btreemap-pipeline
sunng87 Jan 27, 2025
b5d2969
test: add integration test for http pipeline
sunng87 Jan 27, 2025
fc4b3f1
refactor: improve regex pipeline
sunng87 Jan 28, 2025
592f2f4
refactor: improve required field check
sunng87 Jan 28, 2025
0cc6ccd
Merge branch 'main' into feature/pipeline-dispatcher-2
sunng87 Jan 31, 2025
612 changes: 435 additions & 177 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions src/frontend/src/instance/otlp.rs
@@ -20,11 +20,11 @@ use common_telemetry::tracing;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::PipelineWay;
+use pipeline::{GreptimePipelineParams, PipelineWay};
 use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
-use servers::query_handler::OpenTelemetryProtocolHandler;
+use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
 use session::context::QueryContextRef;
 use snafu::ResultExt;

@@ -112,8 +112,10 @@ impl OpenTelemetryProtocolHandler for Instance {
     #[tracing::instrument(skip_all)]
     async fn logs(
         &self,
+        pipeline_handler: PipelineHandlerRef,
         request: ExportLogsServiceRequest,
         pipeline: PipelineWay,
+        pipeline_params: GreptimePipelineParams,
         table_name: String,
         ctx: QueryContextRef,
     ) -> ServerResult<Output> {
@@ -128,7 +130,15 @@ impl OpenTelemetryProtocolHandler for Instance {
             .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
         interceptor_ref.pre_execute(ctx.clone())?;

-        let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
+        let (requests, rows) = otlp::logs::to_grpc_insert_requests(
+            request,
+            pipeline,
+            pipeline_params,
+            table_name,
+            &ctx,
+            pipeline_handler,
+        )
+        .await?;

         let _guard = if let Some(limiter) = &self.limiter {
             let result = limiter.limit_row_inserts(&requests);
11 changes: 6 additions & 5 deletions src/pipeline/benches/processor.rs
@@ -13,21 +13,22 @@
 // limitations under the License.

 use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Result};
+use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result};
 use serde_json::{Deserializer, Value};

 fn processor_mut(
     pipeline: &Pipeline<GreptimeTransformer>,
     input_values: Vec<Value>,
 ) -> Result<Vec<greptime_proto::v1::Row>> {
-    let mut payload = pipeline.init_intermediate_state();
     let mut result = Vec::with_capacity(input_values.len());

     for v in input_values {
-        pipeline.prepare(v, &mut payload)?;
-        let r = pipeline.exec_mut(&mut payload)?;
+        let mut payload = json_to_intermediate_state(v).unwrap();
+        let r = pipeline
+            .exec_mut(&mut payload)?
+            .into_transformed()
+            .expect("expect transformed result ");
         result.push(r);
-        pipeline.reset_intermediate_state(&mut payload);
     }

     Ok(result)
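
Note on the benchmark change above: each input value now gets its own intermediate state, and `exec_mut` returns a value that must be unwrapped with `into_transformed()`. The following is a minimal, std-only sketch of that shape, not the crate's actual types. `IntermediateState`, `ExecOutput`, its variants, the `"type"` dispatch field, and `process_all` are all hypothetical names chosen for illustration; the real pipeline types may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the pipeline's intermediate state:
/// one input record held as an ordered key/value map.
type IntermediateState = BTreeMap<String, String>;

/// Hypothetical result of running a pipeline on a single record.
#[derive(Debug, PartialEq)]
enum ExecOutput {
    /// The record was transformed into a row (here: a list of cell values).
    Transformed(Vec<String>),
    /// A dispatcher rule matched, so the record belongs to another table.
    Dispatched { table_part: String },
}

impl ExecOutput {
    /// Returns the row if the record was transformed, `None` if it was dispatched.
    fn into_transformed(self) -> Option<Vec<String>> {
        match self {
            ExecOutput::Transformed(row) => Some(row),
            ExecOutput::Dispatched { .. } => None,
        }
    }
}

/// Toy pipeline: records whose "type" is "http" are dispatched,
/// everything else is transformed by collecting its values in key order.
fn exec(state: &IntermediateState) -> ExecOutput {
    match state.get("type") {
        Some(v) if v == "http" => ExecOutput::Dispatched {
            table_part: "http".to_string(),
        },
        _ => ExecOutput::Transformed(state.values().cloned().collect()),
    }
}

/// Runs every record through `exec` and keeps only the transformed rows.
fn process_all(records: Vec<IntermediateState>) -> Vec<Vec<String>> {
    records
        .iter()
        .map(exec)
        .filter_map(ExecOutput::into_transformed)
        .collect()
}
```

In this sketch a dispatched record is silently skipped by `process_all`, whereas the benchmark in the diff calls `.expect(...)` and would panic in that case. Which behaviour is appropriate depends on whether the pipeline under test has a dispatcher configured.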
27 changes: 24 additions & 3 deletions src/pipeline/src/dispatcher.rs
@@ -12,12 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::BTreeMap;
+
+use common_telemetry::debug;
 use snafu::OptionExt;
 use yaml_rust::Yaml;

-use crate::etl::error::{Error, Result};
-use crate::etl_error::{
-    FieldRequiredForDispatcherSnafu, TablePartRequiredForDispatcherRuleSnafu,
+use crate::etl::error::{
+    Error, FieldRequiredForDispatcherSnafu, Result, TablePartRequiredForDispatcherRuleSnafu,
     ValueRequiredForDispatcherRuleSnafu,
 };
 use crate::Value;
@@ -84,6 +86,7 @@ impl TryFrom<&Yaml> for Dispatcher {
                     .as_str()
                     .map(|s| s.to_string())
                     .context(TablePartRequiredForDispatcherRuleSnafu)?;
+
                 let pipeline = rule[PIPELINE].as_str().map(|s| s.to_string());

                 if rule[VALUE].is_badvalue() {
@@ -105,3 +108,21 @@ impl TryFrom<&Yaml> for Dispatcher {
         Ok(Dispatcher { field, rules })
     }
 }
+
+impl Dispatcher {
+    /// execute dispatcher and returns matched rule if any
+    pub(crate) fn exec(&self, data: &BTreeMap<String, Value>) -> Option<&Rule> {
+        if let Some(value) = data.get(&self.field) {
+            for rule in &self.rules {
+                if rule.value == *value {
+                    return Some(rule);
+                }
+            }
+
+            None
+        } else {
+            debug!("field {} not found in keys {:?}", &self.field, data.keys());
+            None
+        }
+    }
+}
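
Note on `Dispatcher::exec` above: the method looks up the dispatch field in the record and returns the first rule whose value equals it. A self-contained sketch of the same matching logic follows, written with `?` and `Iterator::find` instead of the nested `if`/`for`. The `Value` enum, the `Rule` field names `table_part` and `pipeline`, and the `sample_dispatcher` helper are simplified assumptions for illustration and are not taken from the crate; the sketch also omits the `debug!` log emitted when the field is missing.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for the crate's `Value` type (assumption).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    String(String),
    Int(i64),
}

/// Simplified dispatcher rule; field names other than `value` are assumed.
#[derive(Debug, PartialEq)]
struct Rule {
    value: Value,
    table_part: String,
    pipeline: Option<String>,
}

struct Dispatcher {
    field: String,
    rules: Vec<Rule>,
}

impl Dispatcher {
    /// Returns the first rule whose value equals the record's dispatch field,
    /// or `None` if the field is absent or no rule matches.
    fn exec(&self, data: &BTreeMap<String, Value>) -> Option<&Rule> {
        let value = data.get(&self.field)?;
        self.rules.iter().find(|rule| rule.value == *value)
    }
}

/// Hypothetical dispatcher keyed on a "type" field with two rules.
fn sample_dispatcher() -> Dispatcher {
    Dispatcher {
        field: "type".to_string(),
        rules: vec![
            Rule {
                value: Value::String("http".to_string()),
                table_part: "http".to_string(),
                pipeline: Some("http_pipeline".to_string()),
            },
            Rule {
                value: Value::String("db".to_string()),
                table_part: "db".to_string(),
                pipeline: None,
            },
        ],
    }
}
```

Matching is by strict equality on the typed value, so under these assumptions a record carrying `Value::Int(1)` never matches a rule declared with a string value, and the record falls through to the non-dispatched path.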