Skip to content

Commit 2c071d5

Browse files
committed
feat: subquery to join on not in
Signed-off-by: Kould <[email protected]>
1 parent 83da17f commit 2c071d5

File tree

14 files changed

+511
-30
lines changed

14 files changed

+511
-30
lines changed

src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl Binder {
164164
{
165165
let change_type = get_change_type(&table_name_alias);
166166
if change_type.is_some() {
167-
let table_index = self.metadata.write().add_table(
167+
let (table_index, source_table_index) = self.metadata.write().add_table(
168168
catalog,
169169
database.clone(),
170170
table_meta,
@@ -178,6 +178,7 @@ impl Binder {
178178
bind_context,
179179
database.as_str(),
180180
table_index,
181+
source_table_index,
181182
change_type,
182183
sample,
183184
)?;
@@ -281,7 +282,7 @@ impl Binder {
281282
}
282283
}
283284
_ => {
284-
let table_index = self.metadata.write().add_table(
285+
let (table_index, source_table_index) = self.metadata.write().add_table(
285286
catalog.clone(),
286287
database.clone(),
287288
table_meta.clone(),
@@ -296,6 +297,7 @@ impl Binder {
296297
bind_context,
297298
database.as_str(),
298299
table_index,
300+
source_table_index,
299301
None,
300302
sample,
301303
)?;

src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Binder {
143143
} else {
144144
None
145145
};
146-
let table_index = self.metadata.write().add_table(
146+
let (table_index, source_table_index) = self.metadata.write().add_table(
147147
CATALOG_DEFAULT.to_string(),
148148
"system".to_string(),
149149
table.clone(),
@@ -154,8 +154,14 @@ impl Binder {
154154
None,
155155
);
156156

157-
let (s_expr, mut bind_context) =
158-
self.bind_base_table(bind_context, "system", table_index, None, sample)?;
157+
let (s_expr, mut bind_context) = self.bind_base_table(
158+
bind_context,
159+
"system",
160+
table_index,
161+
source_table_index,
162+
None,
163+
sample,
164+
)?;
159165
if let Some(alias) = alias {
160166
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
161167
}
@@ -205,7 +211,7 @@ impl Binder {
205211
None
206212
};
207213

208-
let table_index = self.metadata.write().add_table(
214+
let (table_index, source_table_index) = self.metadata.write().add_table(
209215
CATALOG_DEFAULT.to_string(),
210216
"system".to_string(),
211217
table.clone(),
@@ -216,8 +222,14 @@ impl Binder {
216222
None,
217223
);
218224

219-
let (s_expr, mut bind_context) =
220-
self.bind_base_table(bind_context, "system", table_index, None, &None)?;
225+
let (s_expr, mut bind_context) = self.bind_base_table(
226+
bind_context,
227+
"system",
228+
table_index,
229+
source_table_index,
230+
None,
231+
&None,
232+
)?;
221233
if let Some(alias) = alias {
222234
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
223235
}

src/query/sql/src/planner/binder/column_binding.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub struct ColumnBinding {
2828
pub column_position: Option<usize>,
2929
/// Table index of this `ColumnBinding` in current context
3030
pub table_index: Option<IndexType>,
31+
/// Source table index of this `ColumnBinding` in current context
32+
pub source_table_index: Option<IndexType>,
3133
/// Column name of this `ColumnBinding` in current context
3234
pub column_name: String,
3335
/// Column index of ColumnBinding
@@ -72,6 +74,7 @@ impl ColumnBinding {
7274
table_name: None,
7375
column_position: None,
7476
table_index: None,
77+
source_table_index: None,
7578
column_name: name,
7679
index,
7780
data_type,
@@ -83,6 +86,25 @@ impl ColumnBinding {
8386
pub fn is_dummy(&self) -> bool {
8487
self.index >= DummyColumnType::Other.type_identifier()
8588
}
89+
90+
// Only table_index and column_position are retained to determine whether two columns are the same column of the same table.
91+
// Avoid situations where aliases and other situations may cause inability to judge
92+
pub fn as_source(&self) -> Option<ColumnBinding> {
93+
self.source_table_index
94+
.or(self.table_index)
95+
.map(|table_index| ColumnBinding {
96+
database_name: None,
97+
table_name: None,
98+
column_position: self.column_position,
99+
table_index: Some(table_index),
100+
source_table_index: None,
101+
column_name: "".to_string(),
102+
index: 0,
103+
data_type: self.data_type.clone(),
104+
visibility: self.visibility.clone(),
105+
virtual_expr: None,
106+
})
107+
}
86108
}
87109

88110
impl ColumnIndex for ColumnBinding {}
@@ -96,6 +118,8 @@ pub struct ColumnBindingBuilder {
96118
pub column_position: Option<usize>,
97119
/// Table index of this `ColumnBinding` in current context
98120
pub table_index: Option<IndexType>,
121+
/// Source table index of this `ColumnBinding` in current context
122+
pub source_table_index: Option<IndexType>,
99123
/// Column name of this `ColumnBinding` in current context
100124
pub column_name: String,
101125
/// Column index of ColumnBinding
@@ -120,6 +144,7 @@ impl ColumnBindingBuilder {
120144
table_name: None,
121145
column_position: None,
122146
table_index: None,
147+
source_table_index: None,
123148
column_name,
124149
index,
125150
data_type,
@@ -148,6 +173,11 @@ impl ColumnBindingBuilder {
148173
self
149174
}
150175

176+
pub fn source_table_index(mut self, index: Option<IndexType>) -> ColumnBindingBuilder {
177+
self.source_table_index = index;
178+
self
179+
}
180+
151181
pub fn virtual_expr(mut self, virtual_expr: Option<String>) -> ColumnBindingBuilder {
152182
self.virtual_expr = virtual_expr;
153183
self
@@ -159,6 +189,7 @@ impl ColumnBindingBuilder {
159189
table_name: self.table_name,
160190
column_position: self.column_position,
161191
table_index: self.table_index,
192+
source_table_index: self.source_table_index,
162193
column_name: self.column_name,
163194
index: self.index,
164195
data_type: self.data_type,

src/query/sql/src/planner/binder/table.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl Binder {
132132
None
133133
};
134134

135-
let table_index = self.metadata.write().add_table(
135+
let (table_index, source_table_index) = self.metadata.write().add_table(
136136
CATALOG_DEFAULT.to_string(),
137137
"system".to_string(),
138138
table.clone(),
@@ -143,8 +143,14 @@ impl Binder {
143143
None,
144144
);
145145

146-
let (s_expr, mut bind_context) =
147-
self.bind_base_table(bind_context, "system", table_index, None, &None)?;
146+
let (s_expr, mut bind_context) = self.bind_base_table(
147+
bind_context,
148+
"system",
149+
table_index,
150+
source_table_index,
151+
None,
152+
&None,
153+
)?;
148154
if let Some(alias) = alias {
149155
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
150156
}
@@ -353,6 +359,7 @@ impl Binder {
353359
bind_context: &BindContext,
354360
database_name: &str,
355361
table_index: IndexType,
362+
source_table_index: Option<IndexType>,
356363
change_type: Option<ChangeType>,
357364
sample: &Option<SampleConfig>,
358365
) -> Result<(SExpr, BindContext)> {
@@ -388,6 +395,7 @@ impl Binder {
388395
.table_name(Some(table_name.to_string()))
389396
.database_name(Some(database_name.to_string()))
390397
.table_index(Some(*table_index))
398+
.source_table_index(source_table_index)
391399
.column_position(*column_position)
392400
.virtual_expr(virtual_expr.clone())
393401
.build();

src/query/sql/src/planner/dataframe.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl Dataframe {
9393
query_ctx.clone().get_abort_checker(),
9494
)?;
9595

96-
let table_index = metadata.write().add_table(
96+
let (table_index, source_table_index) = metadata.write().add_table(
9797
CATALOG_DEFAULT.to_owned(),
9898
database.to_string(),
9999
table_meta,
@@ -104,7 +104,14 @@ impl Dataframe {
104104
None,
105105
);
106106

107-
binder.bind_base_table(&bind_context, database, table_index, None, &None)
107+
binder.bind_base_table(
108+
&bind_context,
109+
database,
110+
table_index,
111+
source_table_index,
112+
None,
113+
&None,
114+
)
108115
} else {
109116
binder.bind_table_reference(&mut bind_context, &table)
110117
}?;

src/query/sql/src/planner/expression_parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use crate::Visibility;
6060
pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
6161
let mut bind_context = BindContext::new();
6262
let metadata = Arc::new(RwLock::new(Metadata::default()));
63-
let table_index = metadata.write().add_table(
63+
let (table_index, _) = metadata.write().add_table(
6464
CATALOG_DEFAULT.to_owned(),
6565
"default".to_string(),
6666
table_meta,

src/query/sql/src/planner/metadata.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,25 @@ impl Metadata {
123123
.map(|table| table.index)
124124
}
125125

126+
fn get_source_table_index(
127+
&self,
128+
database_name: Option<&str>,
129+
table_name: &str,
130+
) -> Option<IndexType> {
131+
self.tables
132+
.iter()
133+
.find(|table| match database_name {
134+
Some(database_name) => {
135+
table.database == database_name && table.name == table_name
136+
|| table.alias_name == Some(table_name.to_string())
137+
}
138+
None => {
139+
table.name == table_name || table.alias_name == Some(table_name.to_string())
140+
}
141+
})
142+
.map(|table| table.index)
143+
}
144+
126145
pub fn column(&self, index: IndexType) -> &ColumnEntry {
127146
self.columns
128147
.get(index)
@@ -358,9 +377,11 @@ impl Metadata {
358377
source_of_index: bool,
359378
source_of_stage: bool,
360379
cte_suffix_name: Option<String>,
361-
) -> IndexType {
380+
) -> (IndexType, Option<IndexType>) {
362381
let table_name = table_meta.name().to_string();
363382
let table_name = Self::remove_cte_suffix(table_name, cte_suffix_name);
383+
let source_table_index =
384+
self.get_source_table_index(Some(database.as_str()), table_name.as_str());
364385

365386
let table_index = self.tables.len();
366387
// If exists table alias name, use it instead of origin name
@@ -455,7 +476,7 @@ impl Metadata {
455476
}
456477
}
457478

458-
table_index
479+
(table_index, source_table_index)
459480
}
460481

461482
pub fn change_derived_column_alias(&mut self, index: IndexType, alias: String) {

src/query/sql/src/planner/optimizer/aggregate/normalize_aggregate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ impl RuleNormalizeAggregateOptimizer {
143143
column: ColumnBinding {
144144
table_name: None,
145145
table_index: None,
146+
source_table_index: None,
146147
database_name: None,
147148
column_position: None,
148149
index: work_index,

src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -601,20 +601,20 @@ impl SubqueryRewriter {
601601
SubqueryType::Any => {
602602
let output_column = subquery.output_column.clone();
603603
let column_name = format!("subquery_{}", output_column.index);
604-
let left_condition = wrap_cast(
605-
&ScalarExpr::BoundColumnRef(BoundColumnRef {
606-
span: subquery.span,
607-
column: ColumnBindingBuilder::new(
608-
column_name,
609-
output_column.index,
610-
output_column.data_type,
611-
Visibility::Visible,
612-
)
613-
.table_index(output_column.table_index)
614-
.build(),
615-
}),
616-
&subquery.data_type,
617-
);
604+
let mut left_condition = ScalarExpr::BoundColumnRef(BoundColumnRef {
605+
span: subquery.span,
606+
column: ColumnBindingBuilder::new(
607+
column_name,
608+
output_column.index,
609+
output_column.data_type,
610+
Visibility::Visible,
611+
)
612+
.table_index(output_column.table_index)
613+
.build(),
614+
});
615+
if !left_condition.data_type()?.eq(&subquery.data_type) {
616+
left_condition = wrap_cast(&left_condition, &subquery.data_type)
617+
}
618618
let child_expr = *subquery.child_expr.as_ref().unwrap().clone();
619619
let op = *subquery.compare_op.as_ref().unwrap();
620620
let (right_condition, is_non_equi_condition) =

src/query/sql/src/planner/optimizer/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use super::rewrite::RulePushDownSortEvalScalar;
4646
use super::rewrite::RulePushDownSortScan;
4747
use super::rewrite::RuleSemiToInnerJoin;
4848
use super::rewrite::RuleSplitAggregate;
49+
use super::rewrite::RuleSubqueryNotInToIn;
4950
use super::rewrite::RuleTryApplyAggIndex;
5051
use super::transform::RuleCommuteJoinBaseTable;
5152
use super::transform::RuleEagerAggregation;
@@ -111,6 +112,7 @@ impl RuleFactory {
111112
RuleID::MergeFilterIntoMutation => {
112113
Ok(Box::new(RuleMergeFilterIntoMutation::new(ctx.metadata)))
113114
}
115+
RuleID::SubqueryNotInToIn => Ok(Box::new(RuleSubqueryNotInToIn::new(ctx.metadata))),
114116
}
115117
}
116118
}

0 commit comments

Comments
 (0)