Skip to content

Commit

Permalink
feat: refine log query AST (#5316)
Browse files Browse the repository at this point in the history
* draft

Signed-off-by: Ruihang Xia <[email protected]>

* impl planner part

Signed-off-by: Ruihang Xia <[email protected]>

* feat: tweak aggr func

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* todo about context

Signed-off-by: Ruihang Xia <[email protected]>

* fix test

Signed-off-by: Ruihang Xia <[email protected]>

* rename log expr

Signed-off-by: Ruihang Xia <[email protected]>

* sign todo

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Jan 15, 2025
1 parent 121ec79 commit 4f29e50
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 27 deletions.
78 changes: 73 additions & 5 deletions src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,85 @@ use crate::error::{
/// GreptimeDB's log query request.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogQuery {
// Global query parameters
/// A fully qualified table name to query logs from.
pub table: TableName,
/// Specifies the time range for the log query. See [`TimeFilter`] for more details.
pub time_filter: TimeFilter,
/// Columns with filters to query.
pub columns: Vec<ColumnFilters>,
/// Controls row skipping and fetch count for logs.
/// Controls row skipping and fetch on the result set.
pub limit: Limit,
/// Adjacent lines to return.
/// Columns to return in the result set.
///
/// The columns can be either from the original log or derived from processing exprs.
/// Default (empty) means all columns.
///
/// TODO(ruihang): Do we need negative select?
pub columns: Vec<String>,

// Filters
/// Conjunction of filters to apply for the raw logs.
///
/// Filters here can only refer to the columns from the original log.
pub filters: Vec<ColumnFilters>,
/// Adjacent lines to return. Applies to all filters above.
///
/// TODO(ruihang): Do we need per-filter context?
pub context: Context,

// Processors
/// Expressions to calculate after filter.
pub exprs: Vec<LogExpr>,
}

/// Expression to calculate on log after filtering.
#[derive(Debug, Serialize, Deserialize)]
pub enum LogExpr {
NamedIdent(String),
PositionalIdent(usize),
Literal(String),
ScalarFunc {
name: String,
args: Vec<LogExpr>,
},
AggrFunc {
name: String,
args: Vec<LogExpr>,
/// Optional range function parameter. Stands for the time range for both step and align.
range: Option<String>,
by: Vec<LogExpr>,
},
Decompose {
expr: Box<LogExpr>,
/// JSON, CSV, etc.
schema: String,
/// Fields with type name to extract from the decomposed value.
fields: Vec<(String, String)>,
},
BinaryOp {
left: Box<LogExpr>,
op: String,
right: Box<LogExpr>,
},
Alias {
expr: Box<LogExpr>,
alias: String,
},
Filter {
expr: Box<LogExpr>,
filter: ContentFilter,
},
}

impl Default for LogQuery {
fn default() -> Self {
Self {
table: TableName::new("", "", ""),
time_filter: Default::default(),
columns: vec![],
filters: vec![],
limit: Limit::default(),
context: Default::default(),
columns: vec![],
exprs: vec![],
}
}
}
Expand Down Expand Up @@ -232,6 +291,7 @@ pub struct ColumnFilters {

#[derive(Debug, Serialize, Deserialize)]
pub enum ContentFilter {
// Search-based filters
/// Only match the exact content.
///
/// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
Expand All @@ -246,6 +306,14 @@ pub enum ContentFilter {
Contains(String),
/// Match the content with a regex pattern. The pattern should be a valid Rust regex.
Regex(String),

// Value-based filters
/// Content exists, a.k.a. not null.
Exist,
Between(String, String),
// TODO(ruihang): arithmetic operations

// Compound filters
Compound(Vec<ContentFilter>, BinaryOperator),
}

Expand Down
59 changes: 37 additions & 22 deletions src/query/src/log_query/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,11 @@ impl LogQueryPlanner {
// Time filter
filters.push(self.build_time_filter(&query.time_filter, &schema)?);

// Column filters and projections
let mut projected_columns = Vec::new();
for column_filter in &query.columns {
// Column filters
for column_filter in &query.filters {
if let Some(expr) = self.build_column_filter(column_filter)? {
filters.push(expr);
}
projected_columns.push(col(&column_filter.column_name));
}

// Apply filters
Expand All @@ -87,9 +85,12 @@ impl LogQueryPlanner {
}

// Apply projections
plan_builder = plan_builder
.project(projected_columns)
.context(DataFusionPlanningSnafu)?;
if !query.columns.is_empty() {
let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
plan_builder = plan_builder
.project(projected_columns)
.context(DataFusionPlanningSnafu)?;
}

// Apply limit
plan_builder = plan_builder
Expand Down Expand Up @@ -159,6 +160,17 @@ impl LogQueryPlanner {
}
.build(),
),
log_query::ContentFilter::Exist => {
Ok(col(&column_filter.column_name).is_not_null())
}
log_query::ContentFilter::Between(lower, upper) => {
Ok(col(&column_filter.column_name)
.gt_eq(lit(ScalarValue::Utf8(Some(escape_like_pattern(lower)))))
.and(
col(&column_filter.column_name)
.lt_eq(lit(ScalarValue::Utf8(Some(escape_like_pattern(upper))))),
))
}
log_query::ContentFilter::Compound(..) => Err::<Expr, _>(
UnimplementedSnafu {
feature: "compound filter",
Expand Down Expand Up @@ -267,7 +279,7 @@ mod tests {
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
columns: vec![ColumnFilters {
filters: vec![ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
Expand All @@ -276,13 +288,14 @@ mod tests {
fetch: Some(100),
},
context: Context::None,
columns: vec![],
exprs: vec![],
};

let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=0, fetch=100 [message:Utf8]\
\n Projection: greptime.public.test_table.message [message:Utf8]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
Expand Down Expand Up @@ -380,7 +393,7 @@ mod tests {
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
columns: vec![ColumnFilters {
filters: vec![ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
Expand All @@ -389,13 +402,14 @@ mod tests {
fetch: None,
},
context: Context::None,
columns: vec![],
exprs: vec![],
};

let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=10, fetch=1000 [message:Utf8]\
\n Projection: greptime.public.test_table.message [message:Utf8]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
Expand All @@ -413,7 +427,7 @@ mod tests {
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
columns: vec![ColumnFilters {
filters: vec![ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
Expand All @@ -422,13 +436,14 @@ mod tests {
fetch: None,
},
context: Context::None,
columns: vec![],
exprs: vec![],
};

let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=0, fetch=1000 [message:Utf8]\
\n Projection: greptime.public.test_table.message [message:Utf8]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
Expand Down

0 comments on commit 4f29e50

Please sign in to comment.