|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
15 | 15 | use std::collections::BTreeSet;
|
| 16 | +use std::collections::HashMap; |
16 | 17 | use std::collections::HashSet;
|
17 | 18 | use std::sync::Arc;
|
18 | 19 |
|
@@ -52,6 +53,9 @@ use crate::plans::RelOperator;
|
52 | 53 | use crate::plans::ScalarExpr;
|
53 | 54 | use crate::plans::SubqueryExpr;
|
54 | 55 | use crate::plans::SubqueryType;
|
| 56 | +use crate::plans::VisitorMut; |
| 57 | +use crate::ColumnBinding; |
| 58 | +use crate::ColumnEntry; |
55 | 59 | use crate::IndexType;
|
56 | 60 | use crate::MetadataRef;
|
57 | 61 |
|
@@ -828,4 +832,157 @@ impl SubqueryRewriter {
|
828 | 832 |
|
829 | 833 | Ok(None)
|
830 | 834 | }
|
| 835 | + |
| 836 | + pub(crate) fn try_eliminate_in_subquery( |
| 837 | + &self, |
| 838 | + left: &SExpr, |
| 839 | + subquery: &SubqueryExpr, |
| 840 | + is_conjunctive_predicate: bool, |
| 841 | + ) -> Result<Option<ScalarExpr>> { |
| 842 | + #[derive(Debug)] |
| 843 | + struct BindingReplacer { |
| 844 | + new_column_bindings: HashMap<IndexType, ColumnBinding>, |
| 845 | + } |
| 846 | + |
| 847 | + impl VisitorMut<'_> for BindingReplacer { |
| 848 | + fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> { |
| 849 | + if let Some(new_binding) = self.new_column_bindings.get(&col.column.index) { |
| 850 | + col.column = new_binding.clone(); |
| 851 | + } |
| 852 | + Ok(()) |
| 853 | + } |
| 854 | + } |
| 855 | + |
| 856 | + // way to address such a pattern for subquery. |
| 857 | + // |
| 858 | + // (1) EvalScalar |
| 859 | + // \ |
| 860 | + // Scan |
| 861 | + // |
| 862 | + // (2) EvalScalar |
| 863 | + // \ |
| 864 | + // Filter |
| 865 | + // \ |
| 866 | + // Scan |
| 867 | + let subquery_matchers = [ |
| 868 | + Matcher::MatchOp { |
| 869 | + op_type: RelOp::EvalScalar, |
| 870 | + children: vec![Matcher::MatchOp { |
| 871 | + op_type: RelOp::Scan, |
| 872 | + children: vec![], |
| 873 | + }], |
| 874 | + }, |
| 875 | + Matcher::MatchOp { |
| 876 | + op_type: RelOp::EvalScalar, |
| 877 | + children: vec![Matcher::MatchOp { |
| 878 | + op_type: RelOp::Filter, |
| 879 | + children: vec![Matcher::MatchOp { |
| 880 | + op_type: RelOp::Scan, |
| 881 | + children: vec![], |
| 882 | + }], |
| 883 | + }], |
| 884 | + }, |
| 885 | + ]; |
| 886 | + let right_expr_binding = &subquery.output_column; |
| 887 | + let (SubqueryType::Any, Some(box ScalarExpr::BoundColumnRef(left_column))) = |
| 888 | + (&subquery.typ, &subquery.child_expr) |
| 889 | + else { |
| 890 | + return Ok(None); |
| 891 | + }; |
| 892 | + if !is_conjunctive_predicate { |
| 893 | + return Ok(None); |
| 894 | + } |
| 895 | + let (Some(left_table_index), Some(right_table_index)) = ( |
| 896 | + left_column.column.table_index, |
| 897 | + right_expr_binding.table_index, |
| 898 | + ) else { |
| 899 | + return Ok(None); |
| 900 | + }; |
| 901 | + // restore possible aliases or duplicate loaded tables by `source_table_id` to determine whether they are the same columns of the same table |
| 902 | + let (Some(left_source_binding), Some(right_source_binding)) = ( |
| 903 | + left_column.column.as_source(), |
| 904 | + right_expr_binding.as_source(), |
| 905 | + ) else { |
| 906 | + return Ok(None); |
| 907 | + }; |
| 908 | + if !matches!(left.plan(), RelOperator::Scan(_)) |
| 909 | + || left_column.column.data_type.is_nullable() |
| 910 | + || !subquery_matchers |
| 911 | + .iter() |
| 912 | + .any(|matcher| matcher.matches(&subquery.subquery)) |
| 913 | + || left_source_binding != right_source_binding |
| 914 | + { |
| 915 | + return Ok(None); |
| 916 | + } |
| 917 | + let new_column_bindings = { |
| 918 | + let guard = self.metadata.read(); |
| 919 | + |
| 920 | + let left_columns = guard.columns_by_table_index(left_table_index); |
| 921 | + let right_columns = guard.columns_by_table_index(right_table_index); |
| 922 | + let left_table = guard.table(left_table_index); |
| 923 | + let left_source_table_index = guard |
| 924 | + .get_source_table_index(Some(left_table.database()), left_table.table().name()); |
| 925 | + |
| 926 | + if left_columns.len() != right_columns.len() { |
| 927 | + return Ok(None); |
| 928 | + } |
| 929 | + let mut new_column_bindings = HashMap::with_capacity(left_columns.len()); |
| 930 | + for (left_entry, right_entry) in left_columns.into_iter().zip(right_columns.into_iter()) |
| 931 | + { |
| 932 | + let ( |
| 933 | + ColumnEntry::BaseTableColumn(left_column), |
| 934 | + ColumnEntry::BaseTableColumn(right_column), |
| 935 | + ) = (left_entry, right_entry) |
| 936 | + else { |
| 937 | + return Ok(None); |
| 938 | + }; |
| 939 | + new_column_bindings.insert( |
| 940 | + right_column.column_index, |
| 941 | + ColumnBindingBuilder::new( |
| 942 | + left_column.column_name, |
| 943 | + left_column.column_index, |
| 944 | + Box::new(DataType::from(&left_column.data_type)), |
| 945 | + Visibility::Visible, |
| 946 | + ) |
| 947 | + .table_name(Some(left_table.table().name().to_string())) |
| 948 | + .database_name(Some(left_table.database().to_string())) |
| 949 | + .table_index(Some(left_table_index)) |
| 950 | + .source_table_index(left_source_table_index) |
| 951 | + .column_position(left_column.column_position) |
| 952 | + .virtual_expr(left_column.virtual_expr) |
| 953 | + .build(), |
| 954 | + ); |
| 955 | + } |
| 956 | + new_column_bindings |
| 957 | + }; |
| 958 | + let mut scalar_expr = if left_column.column.data_type.is_nullable() { |
| 959 | + ScalarExpr::FunctionCall(FunctionCall { |
| 960 | + span: None, |
| 961 | + func_name: "is_not_null".to_string(), |
| 962 | + params: vec![], |
| 963 | + arguments: vec![ScalarExpr::BoundColumnRef(left_column.clone())], |
| 964 | + }) |
| 965 | + } else { |
| 966 | + ScalarExpr::ConstantExpr(ConstantExpr { |
| 967 | + span: None, |
| 968 | + value: Scalar::Boolean(true), |
| 969 | + }) |
| 970 | + }; |
| 971 | + if let RelOperator::Filter(filter) = subquery.subquery.child(0)?.plan() { |
| 972 | + let mut replacer = BindingReplacer { |
| 973 | + new_column_bindings, |
| 974 | + }; |
| 975 | + for mut expr in filter.predicates.iter().cloned() { |
| 976 | + // restore ColumnBinding of same table in Subquery to table in Left |
| 977 | + replacer.visit(&mut expr)?; |
| 978 | + scalar_expr = ScalarExpr::FunctionCall(FunctionCall { |
| 979 | + span: None, |
| 980 | + func_name: "and".to_string(), |
| 981 | + params: vec![], |
| 982 | + arguments: vec![scalar_expr, expr], |
| 983 | + }); |
| 984 | + } |
| 985 | + } |
| 986 | + Ok(Some(scalar_expr)) |
| 987 | + } |
831 | 988 | }
|
0 commit comments