Skip to content

Commit 7f0a71f

Browse files
committed
refactor: Refactor Tuple's id to make Update support complex queries
1 parent 660dd14 commit 7f0a71f

37 files changed

+674
-198
lines changed

kite_sql_serde_macros/src/reference_serialization.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,26 @@ fn process_type(ty: &Type) -> TokenStream {
4747
}
4848
}
4949
}
50+
"BTreeMap" => {
51+
if let PathArguments::AngleBracketed(AngleBracketedGenericArguments {
52+
args, ..
53+
}) = &path.segments.last().unwrap().arguments
54+
{
55+
let mut iter = args.iter();
56+
if let (
57+
Some(GenericArgument::Type(inner_ty_0)),
58+
Some(GenericArgument::Type(inner_ty_1)),
59+
) = (iter.next(), iter.next())
60+
{
61+
let inner_processed_0 = process_type(inner_ty_0);
62+
let inner_processed_1 = process_type(inner_ty_1);
63+
64+
return quote! {
65+
#ident::<#inner_processed_0, #inner_processed_1>
66+
};
67+
}
68+
}
69+
}
5070
_ => {}
5171
}
5272

@@ -72,7 +92,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result<TokenStream, Error> {
7292

7393
let field_name = field_opts
7494
.ident
75-
.unwrap_or_else(|| Ident::new(&format!("filed_{}", i), Span::call_site()));
95+
.unwrap_or_else(|| Ident::new(&format!("field_{}", i), Span::call_site()));
7696
let ty = process_type(&field_opts.ty);
7797

7898
encode_fields.push(quote! {
@@ -135,7 +155,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result<TokenStream, Error> {
135155

136156
let field_name = field_opts
137157
.ident
138-
.unwrap_or_else(|| Ident::new(&format!("filed_{}", i), Span::call_site()));
158+
.unwrap_or_else(|| Ident::new(&format!("field_{}", i), Span::call_site()));
139159
let ty = process_type(&field_opts.ty);
140160

141161
encode_fields.push(quote! {

src/binder/alter_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
3030
if_not_exists,
3131
column_def,
3232
} => {
33-
let plan = TableScanOperator::build(table_name.clone(), table);
33+
let plan = TableScanOperator::build(table_name.clone(), table, true);
3434
let column = self.bind_column(column_def, None)?;
3535

3636
if !is_valid_identifier(column.name()) {
@@ -52,7 +52,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
5252
if_exists,
5353
..
5454
} => {
55-
let plan = TableScanOperator::build(table_name.clone(), table);
55+
let plan = TableScanOperator::build(table_name.clone(), table, true);
5656
let column_name = column_name.value.clone();
5757

5858
LogicalPlan::new(

src/binder/analyze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
2626
.ok_or(DatabaseError::TableNotFound)?;
2727
let index_metas = table.indexes.clone();
2828

29-
let scan_op = TableScanOperator::build(table_name.clone(), table);
29+
let scan_op = TableScanOperator::build(table_name.clone(), table, false);
3030
Ok(LogicalPlan::new(
3131
Operator::Analyze(AnalyzeOperator {
3232
table_name,

src/binder/copy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
105105
target: ext_source,
106106
schema_ref,
107107
}),
108-
Childrens::Only(TableScanOperator::build(table_name, table)),
108+
Childrens::Only(TableScanOperator::build(table_name, table, false)),
109109
))
110110
} else {
111111
// COPY <dest_table> FROM <source_file>

src/binder/create_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
3535
.source_and_bind(table_name.clone(), None, None, false)?
3636
.ok_or(DatabaseError::SourceNotFound)?;
3737
let plan = match source {
38-
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
38+
Source::Table(table) => TableScanOperator::build(table_name.clone(), table, true),
3939
Source::View(view) => LogicalPlan::clone(&view.plan),
4040
};
4141
let mut columns = Vec::with_capacity(exprs.len());

src/binder/delete.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
3737
.iter()
3838
.map(|(_, column)| column.clone())
3939
.collect_vec();
40-
let mut plan = TableScanOperator::build(table_name.clone(), table);
40+
let mut plan = TableScanOperator::build(table_name.clone(), table, true);
4141

4242
if let Some(alias_idents) = alias_idents {
4343
plan =

src/binder/expr.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,13 +538,12 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
538538
if args.len() != 1 {
539539
return Err(DatabaseError::MisMatch("number of avg() parameters", "1"));
540540
}
541-
let ty = args[0].return_type();
542541

543542
return Ok(ScalarExpression::AggCall {
544543
distinct: func.distinct,
545544
kind: AggKind::Avg,
546545
args,
547-
ty,
546+
ty: LogicalType::Double,
548547
});
549548
}
550549
"if" => {

src/binder/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ pub struct Binder<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>>
317317
context: BinderContext<'a, T>,
318318
table_schema_buf: HashMap<TableName, Option<SchemaOutput>>,
319319
args: &'a A,
320+
with_pk: Option<TableName>,
320321
pub(crate) parent: Option<&'b Binder<'a, 'b, T, A>>,
321322
}
322323

@@ -330,10 +331,22 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
330331
context,
331332
table_schema_buf: Default::default(),
332333
args,
334+
with_pk: None,
333335
parent,
334336
}
335337
}
336338

339+
pub fn with_pk(&mut self, table_name: TableName) {
340+
self.with_pk = Some(table_name);
341+
}
342+
343+
pub fn is_scan_with_pk(&self, table_name: &TableName) -> bool {
344+
if let Some(with_pk_table) = self.with_pk.as_ref() {
345+
return with_pk_table == table_name;
346+
}
347+
false
348+
}
349+
337350
pub fn bind(&mut self, stmt: &Statement) -> Result<LogicalPlan, DatabaseError> {
338351
let plan = match stmt {
339352
Statement::Query(query) => self.bind_query(query)?,

src/binder/select.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,13 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
389389
alias_idents = Some(columns);
390390
}
391391

392+
let with_pk = self.is_scan_with_pk(&table_name);
392393
let source = self
393394
.context
394395
.source_and_bind(table_name.clone(), table_alias.as_ref(), join_type, false)?
395396
.ok_or(DatabaseError::SourceNotFound)?;
396397
let mut plan = match source {
397-
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
398+
Source::Table(table) => TableScanOperator::build(table_name.clone(), table, with_pk),
398399
Source::View(view) => LogicalPlan::clone(&view.plan),
399400
};
400401

src/binder/update.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
2121
self.context.allow_default = true;
2222
if let TableFactor::Table { name, .. } = &to.relation {
2323
let table_name = Arc::new(lower_case_name(name)?);
24+
self.with_pk(table_name.clone());
2425

2526
let mut plan = self.bind_table_ref(to)?;
2627

0 commit comments

Comments
 (0)