diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs
index a79efc78eeca..988c9c27a9b4 100644
--- a/src/log-query/src/log_query.rs
+++ b/src/log-query/src/log_query.rs
@@ -24,16 +24,73 @@ use crate::error::{
 /// GreptimeDB's log query request.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct LogQuery {
+    // Global query parameters
     /// A fully qualified table name to query logs from.
     pub table: TableName,
     /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
     pub time_filter: TimeFilter,
-    /// Columns with filters to query.
-    pub columns: Vec<ColumnFilters>,
-    /// Controls row skipping and fetch count for logs.
+    /// Controls row skipping and fetch on the result set.
     pub limit: Limit,
-    /// Adjacent lines to return.
+    /// Columns to return in the result set.
+    ///
+    /// The columns can be either from the original log or derived from processing exprs.
+    /// Default (empty) means all columns.
+    ///
+    /// TODO(ruihang): Do we need negative select?
+    pub columns: Vec<String>,
+
+    // Filters
+    /// Conjunction of filters to apply for the raw logs.
+    ///
+    /// Filters here can only refer to the columns from the original log.
+    pub filters: Vec<ColumnFilters>,
+    /// Adjacent lines to return. Applies to all filters above.
+    ///
+    /// TODO(ruihang): Do we need per-filter context?
     pub context: Context,
+
+    // Processors
+    /// Expressions to calculate after filter.
+    pub exprs: Vec<LogExpr>,
+}
+
+/// Expression to calculate on log after filtering.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum LogExpr {
+    NamedIdent(String),
+    PositionalIdent(usize),
+    Literal(String),
+    ScalarFunc {
+        name: String,
+        args: Vec<LogExpr>,
+    },
+    AggrFunc {
+        name: String,
+        args: Vec<LogExpr>,
+        /// Optional range function parameter. Stands for the time range for both step and align.
+        range: Option<String>,
+        by: Vec<LogExpr>,
+    },
+    Decompose {
+        expr: Box<LogExpr>,
+        /// JSON, CSV, etc.
+        schema: String,
+        /// Fields with type name to extract from the decomposed value.
+        fields: Vec<(String, String)>,
+    },
+    BinaryOp {
+        left: Box<LogExpr>,
+        op: String,
+        right: Box<LogExpr>,
+    },
+    Alias {
+        expr: Box<LogExpr>,
+        alias: String,
+    },
+    Filter {
+        expr: Box<LogExpr>,
+        filter: ContentFilter,
+    },
 }
 
 impl Default for LogQuery {
@@ -41,9 +98,11 @@ impl Default for LogQuery {
         Self {
             table: TableName::new("", "", ""),
             time_filter: Default::default(),
-            columns: vec![],
+            filters: vec![],
             limit: Limit::default(),
             context: Default::default(),
+            columns: vec![],
+            exprs: vec![],
         }
     }
 }
@@ -232,6 +291,7 @@ pub struct ColumnFilters {
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ContentFilter {
+    // Search-based filters
     /// Only match the exact content.
     ///
     /// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
@@ -246,6 +306,14 @@ pub enum ContentFilter {
     Contains(String),
     /// Match the content with a regex pattern. The pattern should be a valid Rust regex.
     Regex(String),
+
+    // Value-based filters
+    /// Content exists, a.k.a. not null.
+    Exist,
+    Between(String, String),
+    // TODO(ruihang): arithmetic operations
+
+    // Compound filters
     Compound(Vec<ContentFilter>, BinaryOperator),
 }
 
diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs
index e19356e44400..79474fab53cb 100644
--- a/src/query/src/log_query/planner.rs
+++ b/src/query/src/log_query/planner.rs
@@ -69,13 +69,11 @@ impl LogQueryPlanner {
         // Time filter
         filters.push(self.build_time_filter(&query.time_filter, &schema)?);
 
-        // Column filters and projections
-        let mut projected_columns = Vec::new();
-        for column_filter in &query.columns {
+        // Column filters
+        for column_filter in &query.filters {
             if let Some(expr) = self.build_column_filter(column_filter)? {
                 filters.push(expr);
             }
-            projected_columns.push(col(&column_filter.column_name));
         }
 
         // Apply filters
@@ -87,9 +85,12 @@ impl LogQueryPlanner {
         }
 
         // Apply projections
-        plan_builder = plan_builder
-            .project(projected_columns)
-            .context(DataFusionPlanningSnafu)?;
+        if !query.columns.is_empty() {
+            let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
+            plan_builder = plan_builder
+                .project(projected_columns)
+                .context(DataFusionPlanningSnafu)?;
+        }
 
         // Apply limit
         plan_builder = plan_builder
@@ -159,6 +160,17 @@ impl LogQueryPlanner {
                 }
                 .build(),
             ),
+            log_query::ContentFilter::Exist => {
+                Ok(col(&column_filter.column_name).is_not_null())
+            }
+            log_query::ContentFilter::Between(lower, upper) => {
+                Ok(col(&column_filter.column_name)
+                    .gt_eq(lit(ScalarValue::Utf8(Some(escape_like_pattern(lower)))))
+                    .and(
+                        col(&column_filter.column_name)
+                            .lt_eq(lit(ScalarValue::Utf8(Some(escape_like_pattern(upper))))),
+                    ))
+            }
             log_query::ContentFilter::Compound(..) => Err::<Expr, _>(
                 UnimplementedSnafu {
                     feature: "compound filter",
@@ -267,7 +279,7 @@ mod tests {
                 end: Some("2021-01-02T00:00:00Z".to_string()),
                 span: None,
             },
-            columns: vec![ColumnFilters {
+            filters: vec![ColumnFilters {
                 column_name: "message".to_string(),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
@@ -276,13 +288,14 @@ mod tests {
                 fetch: Some(100),
             },
             context: Context::None,
+            columns: vec![],
+            exprs: vec![],
         };
 
         let plan = planner.query_to_plan(log_query).await.unwrap();
-        let expected = "Limit: skip=0, fetch=100 [message:Utf8]\
-\n  Projection: greptime.public.test_table.message [message:Utf8]\
-\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
-\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
+        let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
@@ -380,7 +393,7 @@ mod tests {
                 end: Some("2021-01-02T00:00:00Z".to_string()),
                 span: None,
             },
-            columns: vec![ColumnFilters {
+            filters: vec![ColumnFilters {
                 column_name: "message".to_string(),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
@@ -389,13 +402,14 @@ mod tests {
                 fetch: None,
             },
             context: Context::None,
+            columns: vec![],
+            exprs: vec![],
         };
 
         let plan = planner.query_to_plan(log_query).await.unwrap();
-        let expected = "Limit: skip=10, fetch=1000 [message:Utf8]\
-\n  Projection: greptime.public.test_table.message [message:Utf8]\
-\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
-\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
+        let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
@@ -413,7 +427,7 @@ mod tests {
                 end: Some("2021-01-02T00:00:00Z".to_string()),
                 span: None,
             },
-            columns: vec![ColumnFilters {
+            filters: vec![ColumnFilters {
                 column_name: "message".to_string(),
                 filters: vec![ContentFilter::Contains("error".to_string())],
            }],
@@ -422,13 +436,14 @@ mod tests {
                 fetch: None,
             },
             context: Context::None,
+            columns: vec![],
+            exprs: vec![],
         };
 
         let plan = planner.query_to_plan(log_query).await.unwrap();
-        let expected = "Limit: skip=0, fetch=1000 [message:Utf8]\
-\n  Projection: greptime.public.test_table.message [message:Utf8]\
-\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
-\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
+        let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
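Note: the sketch below is illustrative only and is not part of the diff. It shows how a caller might populate the reworked `LogQuery`, exercising the renamed `filters` field, the new `columns` projection list, the `exprs` processors, and the added `ContentFilter::Exist` variant. Field and variant names are taken from the structs above; the import paths, table name, and concrete values are assumptions.

```rust
// Illustrative sketch only (not part of the diff). Import paths and values are
// assumptions; the real location of `TableName` in the workspace may differ.
use log_query::log_query::{
    ColumnFilters, ContentFilter, Context, Limit, LogExpr, LogQuery, TimeFilter,
};
use table::table_name::TableName; // assumed path

fn example_log_query() -> LogQuery {
    LogQuery {
        // Fully qualified table to read logs from.
        table: TableName::new("greptime", "public", "test_table"),
        // Absolute time range, same shape as in the tests above.
        time_filter: TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        },
        // Conjunction of filters over raw log columns: keep rows whose `message`
        // column is non-null and contains "error".
        filters: vec![ColumnFilters {
            column_name: "message".to_string(),
            filters: vec![
                ContentFilter::Exist,
                ContentFilter::Contains("error".to_string()),
            ],
        }],
        limit: Limit::default(),
        context: Context::None,
        // Projection list; an empty vec means "all columns".
        columns: vec!["message".to_string(), "timestamp".to_string()],
        // Post-filter processing expressions.
        exprs: vec![LogExpr::Alias {
            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            alias: "msg".to_string(),
        }],
    }
}
```

With `columns` left empty the planner now skips the projection step entirely, which is why the expected plans in the updated tests no longer contain a `Projection` node.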