Skip to content

Commit 965b7ab

Browse files
committed
chore: try_eliminate_in_subquery move to decorrelate.rs
Signed-off-by: Kould <[email protected]>
1 parent 50cdea1 commit 965b7ab

File tree

4 files changed

+199
-164
lines changed

4 files changed

+199
-164
lines changed

src/query/sql/src/planner/optimizer/decorrelate/decorrelate.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeSet;
16+
use std::collections::HashMap;
1617
use std::collections::HashSet;
1718
use std::sync::Arc;
1819

@@ -52,6 +53,9 @@ use crate::plans::RelOperator;
5253
use crate::plans::ScalarExpr;
5354
use crate::plans::SubqueryExpr;
5455
use crate::plans::SubqueryType;
56+
use crate::plans::VisitorMut;
57+
use crate::ColumnBinding;
58+
use crate::ColumnEntry;
5559
use crate::IndexType;
5660
use crate::MetadataRef;
5761

@@ -828,4 +832,178 @@ impl SubqueryRewriter {
828832

829833
Ok(None)
830834
}
835+
836+
pub(crate) fn try_eliminate_in_subquery(
837+
&self,
838+
left: &SExpr,
839+
subquery: &SubqueryExpr,
840+
is_conjunctive_predicate: bool,
841+
) -> Result<Option<ScalarExpr>> {
842+
#[derive(Debug)]
843+
struct BindingReplacer {
844+
new_column_bindings: HashMap<IndexType, ColumnBinding>,
845+
}
846+
847+
impl VisitorMut<'_> for BindingReplacer {
848+
fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> {
849+
if let Some(new_binding) = self.new_column_bindings.get(&col.column.index) {
850+
col.column = new_binding.clone();
851+
}
852+
Ok(())
853+
}
854+
}
855+
856+
// way to address such a pattern for main query.
857+
//
858+
// (1) Scan
859+
//
860+
// (2) Filter
861+
// \
862+
// Scan
863+
let main_query_matchers = [
864+
Matcher::MatchOp {
865+
op_type: RelOp::Scan,
866+
children: vec![],
867+
},
868+
Matcher::MatchOp {
869+
op_type: RelOp::Filter,
870+
children: vec![Matcher::MatchOp {
871+
op_type: RelOp::Scan,
872+
children: vec![],
873+
}],
874+
},
875+
];
876+
// way to address such a pattern for subquery.
877+
//
878+
// (1) EvalScalar
879+
// \
880+
// Scan
881+
//
882+
// (2) EvalScalar
883+
// \
884+
// Filter
885+
// \
886+
// Scan
887+
let subquery_matchers = [
888+
Matcher::MatchOp {
889+
op_type: RelOp::EvalScalar,
890+
children: vec![Matcher::MatchOp {
891+
op_type: RelOp::Scan,
892+
children: vec![],
893+
}],
894+
},
895+
Matcher::MatchOp {
896+
op_type: RelOp::EvalScalar,
897+
children: vec![Matcher::MatchOp {
898+
op_type: RelOp::Filter,
899+
children: vec![Matcher::MatchOp {
900+
op_type: RelOp::Scan,
901+
children: vec![],
902+
}],
903+
}],
904+
},
905+
];
906+
let right_expr_binding = &subquery.output_column;
907+
let (SubqueryType::Any, Some(box ScalarExpr::BoundColumnRef(left_column))) =
908+
(&subquery.typ, &subquery.child_expr)
909+
else {
910+
return Ok(None);
911+
};
912+
if !is_conjunctive_predicate {
913+
return Ok(None);
914+
}
915+
let (Some(left_table_index), Some(right_table_index)) = (
916+
left_column.column.table_index,
917+
right_expr_binding.table_index,
918+
) else {
919+
return Ok(None);
920+
};
921+
// restore possible aliases or duplicate loaded tables by `source_table_id` to determine whether they are the same columns of the same table
922+
let (Some(left_source_binding), Some(right_source_binding)) = (
923+
left_column.column.as_source(),
924+
right_expr_binding.as_source(),
925+
) else {
926+
return Ok(None);
927+
};
928+
if !main_query_matchers
929+
.iter()
930+
.any(|matcher| matcher.matches(left))
931+
|| !subquery_matchers
932+
.iter()
933+
.any(|matcher| matcher.matches(&subquery.subquery))
934+
|| left_source_binding != right_source_binding
935+
{
936+
return Ok(None);
937+
}
938+
let new_column_bindings = {
939+
let guard = self.metadata.read();
940+
941+
let left_columns = guard.columns_by_table_index(left_table_index);
942+
let right_columns = guard.columns_by_table_index(right_table_index);
943+
let left_table = guard.table(left_table_index);
944+
let left_source_table_index = guard
945+
.get_source_table_index(Some(left_table.database()), left_table.table().name());
946+
947+
if left_columns.len() != right_columns.len() {
948+
return Ok(None);
949+
}
950+
let mut new_column_bindings = HashMap::with_capacity(left_columns.len());
951+
for (left_entry, right_entry) in left_columns.into_iter().zip(right_columns.into_iter())
952+
{
953+
let (
954+
ColumnEntry::BaseTableColumn(left_column),
955+
ColumnEntry::BaseTableColumn(right_column),
956+
) = (left_entry, right_entry)
957+
else {
958+
return Ok(None);
959+
};
960+
new_column_bindings.insert(
961+
right_column.column_index,
962+
ColumnBindingBuilder::new(
963+
left_column.column_name,
964+
left_column.column_index,
965+
Box::new(DataType::from(&left_column.data_type)),
966+
Visibility::Visible,
967+
)
968+
.table_name(Some(left_table.table().name().to_string()))
969+
.database_name(Some(left_table.database().to_string()))
970+
.table_index(Some(left_table_index))
971+
.source_table_index(left_source_table_index)
972+
.column_position(left_column.column_position)
973+
.virtual_expr(left_column.virtual_expr)
974+
.build(),
975+
);
976+
}
977+
new_column_bindings
978+
};
979+
let mut scalar_expr = if left_column.column.data_type.is_nullable() {
980+
ScalarExpr::FunctionCall(FunctionCall {
981+
span: None,
982+
func_name: "is_not_null".to_string(),
983+
params: vec![],
984+
arguments: vec![ScalarExpr::BoundColumnRef(left_column.clone())],
985+
})
986+
} else {
987+
ScalarExpr::ConstantExpr(ConstantExpr {
988+
span: None,
989+
value: Scalar::Boolean(true),
990+
})
991+
};
992+
if let RelOperator::Filter(filter) = subquery.subquery.child(0)?.plan() {
993+
let mut replacer = BindingReplacer {
994+
new_column_bindings,
995+
};
996+
for mut expr in filter.predicates.iter().cloned() {
997+
// restore ColumnBinding of same table in Subquery to table in Left
998+
replacer.visit(&mut expr)?;
999+
scalar_expr = ScalarExpr::FunctionCall(FunctionCall {
1000+
span: None,
1001+
func_name: "and".to_string(),
1002+
params: vec![],
1003+
arguments: vec![scalar_expr, expr],
1004+
});
1005+
}
1006+
}
1007+
Ok(Some(scalar_expr))
1008+
}
8311009
}

src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs

Lines changed: 0 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use databend_common_functions::aggregates::AggregateCountFunction;
2828
use crate::binder::wrap_cast;
2929
use crate::binder::ColumnBindingBuilder;
3030
use crate::binder::Visibility;
31-
use crate::optimizer::extract::Matcher;
3231
use crate::optimizer::RelExpr;
3332
use crate::optimizer::SExpr;
3433
use crate::plans::Aggregate;
@@ -44,7 +43,6 @@ use crate::plans::Join;
4443
use crate::plans::JoinEquiCondition;
4544
use crate::plans::JoinType;
4645
use crate::plans::Limit;
47-
use crate::plans::RelOp;
4846
use crate::plans::RelOperator;
4947
use crate::plans::ScalarExpr;
5048
use crate::plans::ScalarItem;
@@ -53,11 +51,8 @@ use crate::plans::SubqueryType;
5351
use crate::plans::UDAFCall;
5452
use crate::plans::UDFCall;
5553
use crate::plans::UDFLambdaCall;
56-
use crate::plans::VisitorMut;
5754
use crate::plans::WindowFuncType;
5855
use crate::Binder;
59-
use crate::ColumnBinding;
60-
use crate::ColumnEntry;
6156
use crate::IndexType;
6257
use crate::MetadataRef;
6358

@@ -462,147 +457,6 @@ impl SubqueryRewriter {
462457
}
463458
}
464459

465-
fn try_eliminate_in_subquery(
466-
&self,
467-
left: &SExpr,
468-
subquery: &SubqueryExpr,
469-
is_conjunctive_predicate: bool,
470-
) -> Result<Option<ScalarExpr>> {
471-
#[derive(Debug)]
472-
struct BindingReplacer {
473-
new_column_bindings: HashMap<IndexType, ColumnBinding>,
474-
}
475-
476-
impl VisitorMut<'_> for BindingReplacer {
477-
fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> {
478-
if let Some(new_binding) = self.new_column_bindings.get(&col.column.index) {
479-
col.column = new_binding.clone();
480-
}
481-
Ok(())
482-
}
483-
}
484-
485-
let matchers = [
486-
Matcher::MatchOp {
487-
op_type: RelOp::EvalScalar,
488-
children: vec![Matcher::MatchOp {
489-
op_type: RelOp::Scan,
490-
children: vec![],
491-
}],
492-
},
493-
Matcher::MatchOp {
494-
op_type: RelOp::EvalScalar,
495-
children: vec![Matcher::MatchOp {
496-
op_type: RelOp::Filter,
497-
children: vec![Matcher::MatchOp {
498-
op_type: RelOp::Scan,
499-
children: vec![],
500-
}],
501-
}],
502-
},
503-
];
504-
let right_expr_binding = &subquery.output_column;
505-
let (SubqueryType::Any, Some(box ScalarExpr::BoundColumnRef(left_column))) =
506-
(&subquery.typ, &subquery.child_expr)
507-
else {
508-
return Ok(None);
509-
};
510-
if !is_conjunctive_predicate {
511-
return Ok(None);
512-
}
513-
let (Some(left_table_index), Some(right_table_index)) = (
514-
left_column.column.table_index,
515-
right_expr_binding.table_index,
516-
) else {
517-
return Ok(None);
518-
};
519-
// restore possible aliases or duplicate loaded tables by `source_table_id` to determine whether they are the same columns of the same table
520-
let (Some(left_source_binding), Some(right_source_binding)) = (
521-
left_column.column.as_source(),
522-
right_expr_binding.as_source(),
523-
) else {
524-
return Ok(None);
525-
};
526-
if !matches!(left.plan(), RelOperator::Scan(_))
527-
|| matchers
528-
.iter()
529-
.all(|matcher| !matcher.matches(&subquery.subquery))
530-
|| left_source_binding != right_source_binding
531-
{
532-
return Ok(None);
533-
}
534-
let new_column_bindings = {
535-
let guard = self.metadata.read();
536-
537-
let left_columns = guard.columns_by_table_index(left_table_index);
538-
let right_columns = guard.columns_by_table_index(right_table_index);
539-
let left_table = guard.table(left_table_index);
540-
let left_source_table_index = guard
541-
.get_source_table_index(Some(left_table.database()), left_table.table().name());
542-
543-
if left_columns.len() != right_columns.len() {
544-
return Ok(None);
545-
}
546-
let mut new_column_bindings = HashMap::with_capacity(left_columns.len());
547-
for (left_entry, right_entry) in left_columns.into_iter().zip(right_columns.into_iter())
548-
{
549-
let (
550-
ColumnEntry::BaseTableColumn(left_column),
551-
ColumnEntry::BaseTableColumn(right_column),
552-
) = (left_entry, right_entry)
553-
else {
554-
return Ok(None);
555-
};
556-
new_column_bindings.insert(
557-
right_column.column_index,
558-
ColumnBindingBuilder::new(
559-
left_column.column_name,
560-
left_column.column_index,
561-
Box::new(DataType::from(&left_column.data_type)),
562-
Visibility::Visible,
563-
)
564-
.table_name(Some(left_table.table().name().to_string()))
565-
.database_name(Some(left_table.database().to_string()))
566-
.table_index(Some(left_table_index))
567-
.source_table_index(left_source_table_index)
568-
.column_position(left_column.column_position)
569-
.virtual_expr(left_column.virtual_expr)
570-
.build(),
571-
);
572-
}
573-
new_column_bindings
574-
};
575-
let mut scalar_expr = if left_column.column.data_type.is_nullable() {
576-
ScalarExpr::FunctionCall(FunctionCall {
577-
span: None,
578-
func_name: "is_not_null".to_string(),
579-
params: vec![],
580-
arguments: vec![ScalarExpr::BoundColumnRef(left_column.clone())],
581-
})
582-
} else {
583-
ScalarExpr::ConstantExpr(ConstantExpr {
584-
span: None,
585-
value: Scalar::Boolean(true),
586-
})
587-
};
588-
if let RelOperator::Filter(filter) = subquery.subquery.child(0)?.plan() {
589-
let mut replacer = BindingReplacer {
590-
new_column_bindings,
591-
};
592-
for mut expr in filter.predicates.iter().cloned() {
593-
// restore ColumnBinding of same table in Subquery to table in Left
594-
replacer.visit(&mut expr)?;
595-
scalar_expr = ScalarExpr::FunctionCall(FunctionCall {
596-
span: None,
597-
func_name: "and".to_string(),
598-
params: vec![],
599-
arguments: vec![scalar_expr, expr],
600-
});
601-
}
602-
}
603-
Ok(Some(scalar_expr))
604-
}
605-
606460
fn try_rewrite_uncorrelated_subquery(
607461
&mut self,
608462
left: &SExpr,

tests/sqllogictests/suites/mode/standalone/explain/explain.test

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,24 +1335,6 @@ create OR REPLACE table t1(a int, b int, c varchar(20));
13351335
statement ok
13361336
create OR REPLACE table t2(a int, b int, c varchar(20));
13371337

1338-
# eliminate subquery
1339-
query T
1340-
explain select t3.* from t1 as t3 where t3.c = 'D' and t3.b in (select t4.b from t1 as t4 where t4.a = 7);
1341-
----
1342-
Filter
1343-
├── output columns: [t3.a (#0), t3.b (#1), t3.c (#2)]
1344-
├── filters: [is_true(t3.a (#0) = 7), is_true(t3.c (#2) = 'D')]
1345-
├── estimated rows: 0.00
1346-
└── TableScan
1347-
├── table: default.default.t1
1348-
├── output columns: [a (#0), b (#1), c (#2)]
1349-
├── read rows: 0
1350-
├── read size: 0
1351-
├── partitions total: 0
1352-
├── partitions scanned: 0
1353-
├── push downs: [filters: [and_filters(t1.a (#0) = 7, t1.c (#2) = 'D')], limit: NONE]
1354-
└── estimated rows: 0.00
1355-
13561338
# scalar subquery and sort plan contains count() agg function.
13571339
query T
13581340
explain select * from t2 where c > (select c from t1 where t1.a = t2.a group by c order by count(a));

0 commit comments

Comments
 (0)