Skip to content

Commit 8fc7b6c

Browse files
committed
refactor: change from rules to direct processing InSubquery in subquery rewriter to avoid incorrect matching plan
Signed-off-by: Kould <[email protected]>
1 parent 17ca5a2 commit 8fc7b6c

File tree

5 files changed

+194
-304
lines changed

5 files changed

+194
-304
lines changed

src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::plans::CastExpr;
3737
use crate::plans::ComparisonOp;
3838
use crate::plans::ConstantExpr;
3939
use crate::plans::EvalScalar;
40+
use crate::plans::Exchange;
4041
use crate::plans::Filter;
4142
use crate::plans::FunctionCall;
4243
use crate::plans::Join;
@@ -51,8 +52,11 @@ use crate::plans::SubqueryType;
5152
use crate::plans::UDAFCall;
5253
use crate::plans::UDFCall;
5354
use crate::plans::UDFLambdaCall;
55+
use crate::plans::VisitorMut;
5456
use crate::plans::WindowFuncType;
5557
use crate::Binder;
58+
use crate::ColumnBinding;
59+
use crate::ColumnEntry;
5660
use crate::IndexType;
5761
use crate::MetadataRef;
5862

@@ -271,6 +275,11 @@ impl SubqueryRewriter {
271275
from_count_func: false,
272276
};
273277
let (s_expr, result) = if prop.outer_columns.is_empty() {
278+
if let Some((scalar_expr, s_expr)) =
279+
self.try_eliminate_in_subquery(s_expr, &subquery)?
280+
{
281+
return Ok((scalar_expr, s_expr));
282+
}
274283
self.try_rewrite_uncorrelated_subquery(
275284
s_expr,
276285
&subquery,
@@ -453,6 +462,191 @@ impl SubqueryRewriter {
453462
}
454463
}
455464

465+
fn try_eliminate_in_subquery(
466+
&self,
467+
left: &SExpr,
468+
subquery: &SubqueryExpr,
469+
) -> Result<Option<(ScalarExpr, SExpr)>> {
470+
#[derive(Debug)]
471+
struct SubqueryBodyVisitor {
472+
replacer: BindingReplacer,
473+
failed: bool,
474+
new_s_expr: Option<SExpr>,
475+
}
476+
477+
impl SubqueryBodyVisitor {
478+
fn new(new_column_bindings: HashMap<IndexType, ColumnBinding>, expr: SExpr) -> Self {
479+
Self {
480+
replacer: BindingReplacer {
481+
new_column_bindings,
482+
},
483+
failed: false,
484+
new_s_expr: Some(expr),
485+
}
486+
}
487+
488+
fn visit(&mut self, s_expr: &SExpr) -> Result<()> {
489+
for visit in s_expr.children() {
490+
self.visit(visit)?;
491+
}
492+
if self.failed {
493+
return Ok(());
494+
}
495+
if !matches!(
496+
s_expr.plan(),
497+
RelOperator::Scan(_)
498+
| RelOperator::Filter(_)
499+
| RelOperator::EvalScalar(_)
500+
| RelOperator::Sort(_)
501+
| RelOperator::Exchange(_)
502+
) {
503+
self.failed = true;
504+
return Ok(());
505+
}
506+
match s_expr.plan() {
507+
RelOperator::Scan(_) | RelOperator::Sort(_) => (),
508+
RelOperator::EvalScalar(eval_scalar) => {
509+
let mut eval_scalar = eval_scalar.clone();
510+
511+
for scalar_item in eval_scalar.items.iter_mut() {
512+
self.replacer.visit(&mut scalar_item.scalar)?;
513+
}
514+
self.build_s_expr(RelOperator::EvalScalar(eval_scalar));
515+
}
516+
RelOperator::Filter(filter) => {
517+
let mut filter = filter.clone();
518+
519+
for scalar_expr in filter.predicates.iter_mut() {
520+
self.replacer.visit(scalar_expr)?;
521+
}
522+
self.build_s_expr(RelOperator::Filter(filter));
523+
}
524+
RelOperator::Exchange(exchange) => {
525+
let mut exchange = exchange.clone();
526+
if let Exchange::Hash(scalar_exprs) = &mut exchange {
527+
for scalar_expr in scalar_exprs {
528+
self.replacer.visit(scalar_expr)?;
529+
}
530+
}
531+
self.build_s_expr(RelOperator::Exchange(exchange));
532+
}
533+
_ => {
534+
self.failed = true;
535+
return Ok(());
536+
}
537+
}
538+
Ok(())
539+
}
540+
541+
fn build_s_expr(&mut self, op: RelOperator) {
542+
self.new_s_expr = Some(match self.new_s_expr.take() {
543+
None => SExpr::create_leaf(Arc::new(op)),
544+
Some(child_expr) => SExpr::create_unary(Arc::new(op), Arc::new(child_expr)),
545+
});
546+
}
547+
548+
fn result(self) -> Option<SExpr> {
549+
(!self.failed).then_some(self.new_s_expr).flatten()
550+
}
551+
}
552+
553+
#[derive(Debug)]
554+
struct BindingReplacer {
555+
new_column_bindings: HashMap<IndexType, ColumnBinding>,
556+
}
557+
558+
impl VisitorMut<'_> for BindingReplacer {
559+
fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> {
560+
if let Some(new_binding) = self.new_column_bindings.get(&col.column.index) {
561+
col.column = new_binding.clone();
562+
}
563+
Ok(())
564+
}
565+
}
566+
567+
let right_expr_binding = &subquery.output_column;
568+
let (SubqueryType::Any, Some(box ScalarExpr::BoundColumnRef(left_column))) =
569+
(&subquery.typ, &subquery.child_expr)
570+
else {
571+
return Ok(None);
572+
};
573+
let (Some(left_table_index), Some(right_table_index)) = (
574+
left_column.column.table_index,
575+
right_expr_binding.table_index,
576+
) else {
577+
return Ok(None);
578+
};
579+
// restore possible aliases or duplicate loaded tables by `source_table_id` to determine whether they are the same columns of the same table
580+
let (Some(left_source_binding), Some(right_source_binding)) = (
581+
left_column.column.as_source(),
582+
right_expr_binding.as_source(),
583+
) else {
584+
return Ok(None);
585+
};
586+
if left_source_binding != right_source_binding {
587+
return Ok(None);
588+
}
589+
let new_column_bindings = {
590+
let guard = self.metadata.read();
591+
592+
let left_columns = guard.columns_by_table_index(left_table_index);
593+
let right_columns = guard.columns_by_table_index(right_table_index);
594+
let left_table = guard.table(left_table_index);
595+
// filter table function
596+
if left_table.database() == "system"
597+
|| guard.table(right_table_index).database() == "system"
598+
{
599+
return Ok(None);
600+
}
601+
let left_source_table_index = guard
602+
.get_source_table_index(Some(left_table.database()), left_table.table().name());
603+
604+
if left_columns.len() != right_columns.len() {
605+
return Ok(None);
606+
}
607+
let mut new_column_bindings = HashMap::with_capacity(left_columns.len());
608+
for (left_entry, right_entry) in left_columns.into_iter().zip(right_columns.into_iter())
609+
{
610+
let (
611+
ColumnEntry::BaseTableColumn(left_column),
612+
ColumnEntry::BaseTableColumn(right_column),
613+
) = (left_entry, right_entry)
614+
else {
615+
return Ok(None);
616+
};
617+
new_column_bindings.insert(
618+
right_column.column_index,
619+
ColumnBindingBuilder::new(
620+
left_column.column_name,
621+
left_column.column_index,
622+
Box::new(DataType::from(&left_column.data_type)),
623+
Visibility::Visible,
624+
)
625+
.table_name(Some(left_table.table().name().to_string()))
626+
.database_name(Some(left_table.database().to_string()))
627+
.table_index(Some(left_table_index))
628+
.source_table_index(left_source_table_index)
629+
.column_position(left_column.column_position)
630+
.virtual_expr(left_column.virtual_expr)
631+
.build(),
632+
);
633+
}
634+
new_column_bindings
635+
};
636+
// restore ColumnBinding of same table in Subquery to table in Left
637+
let mut body_visitor = SubqueryBodyVisitor::new(new_column_bindings, left.clone());
638+
body_visitor.visit(subquery.subquery.as_ref())?;
639+
Ok(body_visitor.result().map(|s_expr| {
640+
(
641+
ScalarExpr::ConstantExpr(ConstantExpr {
642+
span: None,
643+
value: Scalar::Boolean(true),
644+
}),
645+
s_expr,
646+
)
647+
}))
648+
}
649+
456650
fn try_rewrite_uncorrelated_subquery(
457651
&mut self,
458652
left: &SExpr,

src/query/sql/src/planner/optimizer/rule/factory.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use super::transform::RuleEagerAggregation;
5252
use super::transform::RuleLeftExchangeJoin;
5353
use super::RuleID;
5454
use super::RulePtr;
55-
use crate::optimizer::rule::rewrite::RuleEliminateSubquery;
5655
use crate::optimizer::rule::rewrite::RuleMergeFilterIntoMutation;
5756
use crate::optimizer::OptimizerContext;
5857

@@ -63,7 +62,6 @@ impl RuleFactory {
6362
match id {
6463
RuleID::EliminateUnion => Ok(Box::new(RuleEliminateUnion::new(ctx.metadata))),
6564
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new(ctx.metadata))),
66-
RuleID::EliminateSubquery => Ok(Box::new(RuleEliminateSubquery::new(ctx.metadata))),
6765
RuleID::FilterNulls => Ok(Box::new(RuleFilterNulls::new(
6866
ctx.enable_distributed_optimization,
6967
))),

src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ mod rule_commute_join;
1818
mod rule_eliminate_eval_scalar;
1919
mod rule_eliminate_filter;
2020
mod rule_eliminate_sort;
21-
mod rule_eliminate_subquery;
2221
mod rule_eliminate_union;
2322
mod rule_filter_nulls;
2423
mod rule_fold_count_aggregate;
@@ -54,7 +53,6 @@ pub use rule_commute_join::RuleCommuteJoin;
5453
pub use rule_eliminate_eval_scalar::RuleEliminateEvalScalar;
5554
pub use rule_eliminate_filter::RuleEliminateFilter;
5655
pub use rule_eliminate_sort::RuleEliminateSort;
57-
pub use rule_eliminate_subquery::RuleEliminateSubquery;
5856
pub use rule_eliminate_union::RuleEliminateUnion;
5957
pub use rule_filter_nulls::RuleFilterNulls;
6058
pub use rule_fold_count_aggregate::RuleFoldCountAggregate;

0 commit comments

Comments
 (0)