From 495bf50e918d049aaf17e8e37bbe90974b476a60 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Fri, 15 Mar 2024 21:00:40 +0800 Subject: [PATCH 1/7] add support for nested loop join --- src/binder/select.rs | 7 +- src/execution/volcano/dql/join/mod.rs | 1 + .../volcano/dql/join/nested_loop_join.rs | 662 ++++++++++++++++++ src/execution/volcano/mod.rs | 9 +- tests/slt/sql_2016/E061_09.slt | 41 +- 5 files changed, 697 insertions(+), 23 deletions(-) create mode 100644 src/execution/volcano/dql/join/nested_loop_join.rs diff --git a/src/binder/select.rs b/src/binder/select.rs index 7e2da9d7..15227fec 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -791,7 +791,12 @@ impl<'a, T: Transaction> Binder<'a, T> { )?; } BinaryOperator::Or => { - todo!("`NestLoopJoin` is not supported yet") + accum_filter.push(ScalarExpression::Binary { + left_expr, + right_expr, + op, + ty, + }); } _ => { if left_expr.referenced_columns(true).iter().all(|column| { diff --git a/src/execution/volcano/dql/join/mod.rs b/src/execution/volcano/dql/join/mod.rs index 33781bd0..907dddcb 100644 --- a/src/execution/volcano/dql/join/mod.rs +++ b/src/execution/volcano/dql/join/mod.rs @@ -1,6 +1,7 @@ use crate::planner::operator::join::JoinType; pub(crate) mod hash_join; +pub(crate) mod nested_loop_join; pub fn joins_nullable(join_type: &JoinType) -> (bool, bool) { match join_type { diff --git a/src/execution/volcano/dql/join/nested_loop_join.rs b/src/execution/volcano/dql/join/nested_loop_join.rs new file mode 100644 index 00000000..a4ec2141 --- /dev/null +++ b/src/execution/volcano/dql/join/nested_loop_join.rs @@ -0,0 +1,662 @@ +use std::sync::Arc; + +use crate::catalog::{ColumnCatalog, ColumnRef}; +use crate::errors::DatabaseError; +use crate::execution::volcano::{build_read, BoxedExecutor, ReadExecutor}; +use crate::expression::ScalarExpression; +use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; +use crate::planner::LogicalPlan; +use crate::storage::Transaction; +use crate::types::tuple::{Schema, SchemaRef, Tuple}; +use crate::types::value::{DataValue, ValueRef, NULL_VALUE}; +use futures_async_stream::try_stream; +use itertools::Itertools; + +use super::joins_nullable; + +/// Equivalent condition +struct EqualCondition { + on_left_keys: Vec, + on_right_keys: Vec, + left_schema: SchemaRef, + right_schema: SchemaRef, +} + +impl EqualCondition { + /// Constructs a new `EqualCondition` + /// If the `on_left_keys` and `on_right_keys` are empty, it means no equivalent condition + /// Note: `on_left_keys` and `on_right_keys` are either all empty or none of them. + fn new( + on_left_keys: Vec, + on_right_keys: Vec, + left_schema: Arc, + right_schema: Arc, + ) -> EqualCondition { + if !on_left_keys.is_empty() && on_left_keys.len() != on_right_keys.len() { + unreachable!("Unexpected join on condition.") + } + EqualCondition { + on_left_keys, + on_right_keys, + left_schema, + right_schema, + } + } + + fn equals(&self, left_tuple: &Tuple, right_tuple: &Tuple) -> Result { + if self.on_left_keys.is_empty() { + return Ok(true); + } + let eval_keys = |on_keys: &[ScalarExpression], + tuple: &Tuple, + schema: &SchemaRef| + -> Result, DatabaseError> { + let mut values = Vec::with_capacity(on_keys.len()); + for expr in on_keys { + values.push(expr.eval(tuple, schema)?); + } + Ok(values) + }; + + let left_values = eval_keys(&self.on_left_keys, left_tuple, &self.left_schema)?; + let right_values = eval_keys(&self.on_right_keys, right_tuple, &self.right_schema)?; + Ok(left_values == right_values) + } +} + +/// NestedLoopJoin using nested loop join algorithm to execute a join operation. +/// One input will be selected to be the inner table and the other will be the outer +/// +------------------------------------------------------------------+ +/// | JoinType | Inner-table | Outer-table | +/// |--------------------------------|----------------|----------------| +/// | Inner/Left/LeftSemi/LeftAnti | right | left | +/// |--------------------------------|----------------|----------------| +/// | Right/RightSemi/RightAnti/Full | left | right | +/// |--------------------------------|----------------|----------------| +/// | Full | not supported | not supported | +/// +------------------------------------------------------------------+ +pub struct NestedLoopJoin { + left_input: LogicalPlan, + right_input: LogicalPlan, + full_schema_ref: SchemaRef, + ty: JoinType, + filter: Option, + eq_cond: EqualCondition, +} + +impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for NestedLoopJoin { + fn from( + (JoinOperator { on, join_type, .. }, left_input, right_input): ( + JoinOperator, + LogicalPlan, + LogicalPlan, + ), + ) -> Self { + let ((mut on_left_keys, mut on_right_keys), filter) = match on { + JoinCondition::On { on, filter } => (on.into_iter().unzip(), filter), + JoinCondition::None => ((vec![], vec![]), None), + }; + + let (mut left_input, mut right_input) = (left_input, right_input); + if matches!(join_type, JoinType::RightOuter) { + std::mem::swap(&mut left_input, &mut right_input); + std::mem::swap(&mut on_left_keys, &mut on_right_keys); + } + + let left_schema = left_input.output_schema().clone(); + let right_schema = right_input.output_schema().clone(); + let full_schema_ref = Self::merge_schema(&left_schema, &right_schema, join_type); + + let eq_cond = EqualCondition::new( + on_left_keys, + on_right_keys, + left_schema.clone(), + right_schema.clone(), + ); + + NestedLoopJoin { + ty: join_type, + left_input, + right_input, + full_schema_ref, + filter, + eq_cond, + } + } +} + +impl ReadExecutor for NestedLoopJoin { + fn execute(self, transaction: &T) -> BoxedExecutor { + self._execute(transaction) + } +} + +impl NestedLoopJoin { + #[try_stream(boxed, ok = Tuple, error = DatabaseError)] + pub async fn _execute(self, transaction: &T) { + let NestedLoopJoin { + ty, + left_input, + right_input, + full_schema_ref, + filter, + eq_cond, + .. + } = self; + + if matches!(self.ty, JoinType::Full) { + unreachable!("{} cannot be handled in nested loop join", self.ty) + } + + #[for_await] + for tuple in build_read(left_input, transaction) { + let left_tuple: Tuple = tuple?; + let mut has_matched = false; + + #[for_await] + for right_tuple in build_read(right_input.clone(), transaction) { + let right_tuple: Tuple = right_tuple?; + + let tuple = match (filter.as_ref(), eq_cond.equals(&left_tuple, &right_tuple)?) { + (None, true) if matches!(ty, JoinType::RightOuter) => { + Self::emit_tuple(&right_tuple, &left_tuple, ty, true) + } + (None, true) => Self::emit_tuple(&left_tuple, &right_tuple, ty, true), + (Some(filter), true) => { + let new_tuple = Tuple { + id: None, + values: left_tuple + .values + .iter() + .cloned() + .chain(right_tuple.clone().values) + .collect_vec(), + }; + let value = filter.eval(&new_tuple, &full_schema_ref)?; + match value.as_ref() { + DataValue::Boolean(Some(true)) => { + let tuple = match ty { + JoinType::LeftAnti => None, + JoinType::LeftSemi if has_matched => None, + JoinType::RightOuter => { + Self::emit_tuple(&right_tuple, &left_tuple, ty, true) + } + _ => Self::emit_tuple(&left_tuple, &right_tuple, ty, true), + }; + has_matched = true; + tuple + } + DataValue::Boolean(Some(_) | None) => None, + _ => unreachable!(), + } + } + _ => None, + }; + + if let Some(tuple) = tuple { + yield tuple; + if matches!(ty, JoinType::LeftSemi) { + break; + } + } + if matches!(ty, JoinType::LeftAnti) && has_matched { + break; + } + } + + // handle no matched tuple case + let tuple = match ty { + JoinType::LeftAnti if !has_matched => Some(left_tuple.clone()), + JoinType::LeftOuter | JoinType::LeftSemi | JoinType::RightOuter if !has_matched => { + Self::emit_tuple(&left_tuple, &left_tuple, ty, false) + } + _ => None, + }; + if let Some(tuple) = tuple { + yield tuple + } + } + } + + /// Emit a tuple according to the join type. + /// + /// `left_tuple`: left tuple to be included. + /// `right_tuple` right tuple to be included. + /// `ty`: the type of join + /// `is_match`: whether `left_tuple` and `right_tuple` are matched + fn emit_tuple( + left_tuple: &Tuple, + right_tuple: &Tuple, + ty: JoinType, + is_matched: bool, + ) -> Option { + let left_len = left_tuple.values.len(); + let mut values = left_tuple + .values + .iter() + .cloned() + .chain(right_tuple.values.clone()) + .collect_vec(); + match ty { + JoinType::Inner | JoinType::Cross | JoinType::LeftSemi if !is_matched => values.clear(), + JoinType::LeftOuter if !is_matched => { + values + .iter_mut() + .skip(left_len) + .for_each(|v| *v = NULL_VALUE.clone()); + } + JoinType::RightOuter if !is_matched => { + (0..left_len).for_each(|i| { + values[i] = NULL_VALUE.clone(); + }); + } + JoinType::LeftSemi => values.truncate(left_len), + JoinType::LeftAnti => { + if is_matched { + values.clear(); + } else { + values.truncate(left_len); + } + } + JoinType::Full => todo!("Not support now."), + _ => (), + }; + + if values.is_empty() { + return None; + } + + Some(Tuple { id: None, values }) + } + + fn merge_schema( + left_schema: &[ColumnRef], + right_schema: &[ColumnRef], + ty: JoinType, + ) -> Arc> { + let (left_force_nullable, right_force_nullable) = joins_nullable(&ty); + + let mut join_schema = vec![]; + for column in left_schema.iter() { + let mut temp = ColumnCatalog::clone(column); + temp.nullable = left_force_nullable; + join_schema.push(Arc::new(temp)); + } + for column in right_schema.iter() { + let mut temp = ColumnCatalog::clone(column); + temp.nullable = right_force_nullable; + join_schema.push(Arc::new(temp)); + } + Arc::new(join_schema) + } +} + +#[cfg(test)] +mod test { + + use super::*; + use crate::catalog::{ColumnCatalog, ColumnDesc}; + use crate::execution::volcano::dql::test::build_integers; + use crate::execution::volcano::{try_collect, ReadExecutor}; + use crate::expression::ScalarExpression; + use crate::planner::operator::values::ValuesOperator; + use crate::planner::operator::Operator; + use crate::storage::kip::KipStorage; + use crate::storage::Storage; + use crate::types::value::DataValue; + use crate::types::LogicalType; + use std::collections::HashSet; + use std::sync::Arc; + use tempfile::TempDir; + + fn build_join_values( + eq: bool, + ) -> ( + Vec<(ScalarExpression, ScalarExpression)>, + LogicalPlan, + LogicalPlan, + ScalarExpression, + ) { + let desc = ColumnDesc::new(LogicalType::Integer, false, false, None); + + let t1_columns = vec![ + Arc::new(ColumnCatalog::new("c1".to_string(), true, desc.clone())), + Arc::new(ColumnCatalog::new("c2".to_string(), true, desc.clone())), + Arc::new(ColumnCatalog::new("c3".to_string(), true, desc.clone())), + ]; + + let t2_columns = vec![ + Arc::new(ColumnCatalog::new("c4".to_string(), true, desc.clone())), + Arc::new(ColumnCatalog::new("c5".to_string(), true, desc.clone())), + Arc::new(ColumnCatalog::new("c6".to_string(), true, desc.clone())), + ]; + + let on_keys = if eq { + vec![( + ScalarExpression::ColumnRef(t1_columns[1].clone()), + ScalarExpression::ColumnRef(t2_columns[1].clone()), + )] + } else { + vec![] + }; + + let values_t1 = LogicalPlan { + operator: Operator::Values(ValuesOperator { + rows: vec![ + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Int32(Some(4))), + ], + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Int32(Some(5))), + ], + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(3))), + Arc::new(DataValue::Int32(Some(5))), + ], + vec![ + Arc::new(DataValue::Int32(Some(3))), + Arc::new(DataValue::Int32(Some(5))), + Arc::new(DataValue::Int32(Some(7))), + ], + ], + schema_ref: Arc::new(t1_columns), + }), + childrens: vec![], + physical_option: None, + _output_schema_ref: None, + }; + + let values_t2 = LogicalPlan { + operator: Operator::Values(ValuesOperator { + rows: vec![ + vec![ + Arc::new(DataValue::Int32(Some(0))), + Arc::new(DataValue::Int32(Some(2))), + Arc::new(DataValue::Int32(Some(4))), + ], + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(3))), + Arc::new(DataValue::Int32(Some(5))), + ], + vec![ + Arc::new(DataValue::Int32(Some(4))), + Arc::new(DataValue::Int32(Some(6))), + Arc::new(DataValue::Int32(Some(8))), + ], + vec![ + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(1))), + Arc::new(DataValue::Int32(Some(1))), + ], + ], + schema_ref: Arc::new(t2_columns), + }), + childrens: vec![], + physical_option: None, + _output_schema_ref: None, + }; + + let filter = ScalarExpression::Binary { + op: crate::expression::BinaryOperator::Gt, + left_expr: Box::new(ScalarExpression::ColumnRef(Arc::new(ColumnCatalog::new( + "c1".to_owned(), + true, + desc.clone(), + )))), + right_expr: Box::new(ScalarExpression::ColumnRef(Arc::new(ColumnCatalog::new( + "c4".to_owned(), + true, + desc.clone(), + )))), + ty: LogicalType::Integer, + }; + + (on_keys, values_t1, values_t2, filter) + } + + fn valid_result(expected: &mut HashSet>>, actual: &[Tuple]) { + assert_eq!(actual.len(), expected.len()); + + for tuple in actual { + let values = tuple + .values + .iter() + .map(|v| { + if matches!(v.as_ref(), DataValue::Null) { + Arc::new(DataValue::Int32(None)) + } else { + v.clone() + } + }) + .collect_vec(); + assert!(expected.remove(&values)); + } + + assert!(expected.is_empty()); + } + + #[tokio::test] + async fn test_nested_inner_join() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::Inner, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(1); + let tuple = build_integers(vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_left_out_join() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::LeftOuter, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + assert_eq!( + tuples[0].values, + build_integers(vec![Some(0), Some(2), Some(4), None, None, None]) + ); + + let mut expected_set = HashSet::with_capacity(4); + let tuple = build_integers(vec![Some(0), Some(2), Some(4), None, None, None]); + expected_set.insert(tuple); + let tuple = build_integers(vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + + let tuple = build_integers(vec![Some(1), Some(3), Some(5), None, None, None]); + expected_set.insert(tuple); + let tuple = build_integers(vec![Some(3), Some(5), Some(7), None, None, None]); + expected_set.insert(tuple); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_cross_join_with_on() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::Cross, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(1); + + let tuple = build_integers(vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_cross_join_without_filter() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, _) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: None, + }, + join_type: JoinType::Cross, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(3); + + let tuple = build_integers(vec![Some(0), Some(2), Some(4), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + let tuple = build_integers(vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + let tuple = build_integers(vec![Some(1), Some(3), Some(5), Some(1), Some(3), Some(5)]); + expected_set.insert(tuple); + + valid_result(&mut expected_set, &tuples); + Ok(()) + } + + #[tokio::test] + async fn test_nested_cross_join_without_on() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, _) = build_join_values(false); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: None, + }, + join_type: JoinType::Cross, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + assert_eq!(tuples.len(), 16); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_left_semi_join() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::LeftSemi, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(1); + expected_set.insert(build_integers(vec![Some(1), Some(2), Some(5)])); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_left_anti_join() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::LeftAnti, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(3); + expected_set.insert(build_integers(vec![Some(0), Some(2), Some(4)])); + expected_set.insert(build_integers(vec![Some(1), Some(3), Some(5)])); + expected_set.insert(build_integers(vec![Some(3), Some(5), Some(7)])); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } + + #[tokio::test] + async fn test_nested_right_out_join() -> Result<(), DatabaseError> { + let temp_dir = TempDir::new().expect("unable to create temporary working directory"); + let storage = KipStorage::new(temp_dir.path()).await?; + let transaction = storage.transaction().await?; + let (keys, left, right, filter) = build_join_values(true); + let op = JoinOperator { + on: JoinCondition::On { + on: keys, + filter: Some(filter), + }, + join_type: JoinType::RightOuter, + }; + let mut executor = NestedLoopJoin::from((op, left, right)).execute(&transaction); + let tuples = try_collect(&mut executor).await?; + + let mut expected_set = HashSet::with_capacity(4); + let tuple = build_integers(vec![Some(1), Some(2), Some(5), Some(0), Some(2), Some(4)]); + expected_set.insert(tuple); + let tuple = build_integers(vec![None, None, None, Some(1), Some(3), Some(5)]); + expected_set.insert(tuple); + let tuple = build_integers(vec![None, None, None, Some(1), Some(1), Some(1)]); + expected_set.insert(tuple); + let tuple = build_integers(vec![None, None, None, Some(4), Some(6), Some(8)]); + expected_set.insert(tuple); + + valid_result(&mut expected_set, &tuples); + + Ok(()) + } +} diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs index b7b4ed46..d228ed1d 100644 --- a/src/execution/volcano/mod.rs +++ b/src/execution/volcano/mod.rs @@ -28,6 +28,7 @@ use crate::execution::volcano::dql::show_table::ShowTables; use crate::execution::volcano::dql::sort::Sort; use crate::execution::volcano::dql::union::Union; use crate::execution::volcano::dql::values::Values; +use crate::planner::operator::join::JoinCondition; use crate::planner::operator::{Operator, PhysicalOption}; use crate::planner::LogicalPlan; use crate::storage::Transaction; @@ -37,6 +38,7 @@ use futures::stream::BoxStream; use futures::TryStreamExt; use self::ddl::add_column::AddColumn; +use self::dql::join::nested_loop_join::NestedLoopJoin; pub type BoxedExecutor<'a> = BoxStream<'a, Result>; @@ -75,7 +77,12 @@ pub fn build_read(plan: LogicalPlan, transaction: &T) -> BoxedEx let right_input = childrens.pop().unwrap(); let left_input = childrens.pop().unwrap(); - HashJoin::from((op, left_input, right_input)).execute(transaction) + match &op.on { + JoinCondition::On { on, .. } if !on.is_empty() => { + HashJoin::from((op, left_input, right_input)).execute(transaction) + } + _ => NestedLoopJoin::from((op, left_input, right_input)).execute(transaction), + } } Operator::Project(op) => { let input = childrens.pop().unwrap(); diff --git a/tests/slt/sql_2016/E061_09.slt b/tests/slt/sql_2016/E061_09.slt index b02d81ff..47b8d535 100644 --- a/tests/slt/sql_2016/E061_09.slt +++ b/tests/slt/sql_2016/E061_09.slt @@ -1,24 +1,23 @@ # E061-09: Subqueries in comparison predicate -# TODO: NestLoopJoin -# statement ok -# CREATE TABLE TABLE_E061_09_01_01 ( ID INT PRIMARY KEY, A INT ); +statement ok +CREATE TABLE TABLE_E061_09_01_01 ( ID INT PRIMARY KEY, A INT ); -# query I -# SELECT A FROM TABLE_E061_09_01_01 WHERE A < ( SELECT 1 ) +query I +SELECT A FROM TABLE_E061_09_01_01 WHERE A < ( SELECT 1 ) -# statement ok -# CREATE TABLE TABLE_E061_09_01_02 ( ID INT PRIMARY KEY, A INT ); +statement ok +CREATE TABLE TABLE_E061_09_01_02 ( ID INT PRIMARY KEY, A INT ); -# query I -# SELECT A FROM TABLE_E061_09_01_02 WHERE A <= ( SELECT 1 ) +query I +SELECT A FROM TABLE_E061_09_01_02 WHERE A <= ( SELECT 1 ) -# statement ok -# CREATE TABLE TABLE_E061_09_01_03 ( ID INT PRIMARY KEY, A INT ); +statement ok +CREATE TABLE TABLE_E061_09_01_03 ( ID INT PRIMARY KEY, A INT ); -# query I -# SELECT A FROM TABLE_E061_09_01_03 WHERE A <> ( SELECT 1 ) +query I +SELECT A FROM TABLE_E061_09_01_03 WHERE A <> ( SELECT 1 ) statement ok CREATE TABLE TABLE_E061_09_01_04 ( ID INT PRIMARY KEY, A INT ); @@ -26,14 +25,14 @@ CREATE TABLE TABLE_E061_09_01_04 ( ID INT PRIMARY KEY, A INT ); query I SELECT A FROM TABLE_E061_09_01_04 WHERE A = ( SELECT 1 ) -# statement ok -# CREATE TABLE TABLE_E061_09_01_05 ( ID INT PRIMARY KEY, A INT ); +statement ok +CREATE TABLE TABLE_E061_09_01_05 ( ID INT PRIMARY KEY, A INT ); -# query I -# SELECT A FROM TABLE_E061_09_01_05 WHERE A > ( SELECT 1 ) +query I +SELECT A FROM TABLE_E061_09_01_05 WHERE A > ( SELECT 1 ) -# statement ok -# CREATE TABLE TABLE_E061_09_01_06 ( ID INT PRIMARY KEY, A INT ); +statement ok +CREATE TABLE TABLE_E061_09_01_06 ( ID INT PRIMARY KEY, A INT ); -# query I -# SELECT A FROM TABLE_E061_09_01_06 WHERE A >= ( SELECT 1 ) +query I +SELECT A FROM TABLE_E061_09_01_06 WHERE A >= ( SELECT 1 ) From 2f2df06d814136845a133078b681ae74e48172dc Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Fri, 15 Mar 2024 23:22:00 +0800 Subject: [PATCH 2/7] fix:keep the order of tuple and schema --- .../volcano/dql/join/nested_loop_join.rs | 64 +++++++++++++------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/execution/volcano/dql/join/nested_loop_join.rs b/src/execution/volcano/dql/join/nested_loop_join.rs index a4ec2141..bdb62ce8 100644 --- a/src/execution/volcano/dql/join/nested_loop_join.rs +++ b/src/execution/volcano/dql/join/nested_loop_join.rs @@ -1,3 +1,7 @@ +//! Defines the nested loop join executor, it supports [`JoinType::Inner`], [`JoinType::LeftOuter`], +//! [`JoinType::LeftSemi`], [`JoinType::LeftAnti`], [`JoinType::RightOuter`], [`JoinType::Cross`]. +//! But [`JoinType::Full`] is not supported. + use std::sync::Arc; use crate::catalog::{ColumnCatalog, ColumnRef}; @@ -43,6 +47,9 @@ impl EqualCondition { } } + /// Compare left tuple and right tuple on equivalent condition + /// `left_tuple` must be from the [`NestedLoopJoin::left_input`] + /// `right_tuple` must be from the [`NestedLoopJoin::right_input`] fn equals(&self, left_tuple: &Tuple, right_tuple: &Tuple) -> Result { if self.on_left_keys.is_empty() { return Ok(true); @@ -66,7 +73,6 @@ impl EqualCondition { /// NestedLoopJoin using nested loop join algorithm to execute a join operation. /// One input will be selected to be the inner table and the other will be the outer -/// +------------------------------------------------------------------+ /// | JoinType | Inner-table | Outer-table | /// |--------------------------------|----------------|----------------| /// | Inner/Left/LeftSemi/LeftAnti | right | left | @@ -74,11 +80,10 @@ impl EqualCondition { /// | Right/RightSemi/RightAnti/Full | left | right | /// |--------------------------------|----------------|----------------| /// | Full | not supported | not supported | -/// +------------------------------------------------------------------+ pub struct NestedLoopJoin { left_input: LogicalPlan, right_input: LogicalPlan, - full_schema_ref: SchemaRef, + output_schema_ref: SchemaRef, ty: JoinType, filter: Option, eq_cond: EqualCondition, @@ -98,15 +103,16 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for NestedLoopJoin { }; let (mut left_input, mut right_input) = (left_input, right_input); + let mut left_schema = left_input.output_schema().clone(); + let mut right_schema = right_input.output_schema().clone(); + let output_schema_ref = Self::merge_schema(&left_schema, &right_schema, join_type); + if matches!(join_type, JoinType::RightOuter) { std::mem::swap(&mut left_input, &mut right_input); std::mem::swap(&mut on_left_keys, &mut on_right_keys); + std::mem::swap(&mut left_schema, &mut right_schema); } - let left_schema = left_input.output_schema().clone(); - let right_schema = right_input.output_schema().clone(); - let full_schema_ref = Self::merge_schema(&left_schema, &right_schema, join_type); - let eq_cond = EqualCondition::new( on_left_keys, on_right_keys, @@ -118,7 +124,7 @@ impl From<(JoinOperator, LogicalPlan, LogicalPlan)> for NestedLoopJoin { ty: join_type, left_input, right_input, - full_schema_ref, + output_schema_ref, filter, eq_cond, } @@ -138,7 +144,7 @@ impl NestedLoopJoin { ty, left_input, right_input, - full_schema_ref, + output_schema_ref, filter, eq_cond, .. @@ -163,16 +169,8 @@ impl NestedLoopJoin { } (None, true) => Self::emit_tuple(&left_tuple, &right_tuple, ty, true), (Some(filter), true) => { - let new_tuple = Tuple { - id: None, - values: left_tuple - .values - .iter() - .cloned() - .chain(right_tuple.clone().values) - .collect_vec(), - }; - let value = filter.eval(&new_tuple, &full_schema_ref)?; + let new_tuple = Self::merge_tuple(&left_tuple, &right_tuple, &ty); + let value = filter.eval(&new_tuple, &output_schema_ref)?; match value.as_ref() { DataValue::Boolean(Some(true)) => { let tuple = match ty { @@ -223,7 +221,7 @@ impl NestedLoopJoin { /// `left_tuple`: left tuple to be included. /// `right_tuple` right tuple to be included. /// `ty`: the type of join - /// `is_match`: whether `left_tuple` and `right_tuple` are matched + /// `is_match`: whether [`NestedLoopJoin::left_input`] and [`NestedLoopJoin::right_input`] are matched fn emit_tuple( left_tuple: &Tuple, right_tuple: &Tuple, @@ -269,6 +267,32 @@ impl NestedLoopJoin { Some(Tuple { id: None, values }) } + /// Merge the two tuples. + /// `left_tuple` must be from the `NestedLoopJoin.left_input` + /// `right_tuple` must be from the `NestedLoopJoin.right_input` + fn merge_tuple(left_tuple: &Tuple, right_tuple: &Tuple, ty: &JoinType) -> Tuple { + match ty { + JoinType::RightOuter => Tuple { + id: None, + values: right_tuple + .values + .iter() + .cloned() + .chain(left_tuple.clone().values) + .collect_vec(), + }, + _ => Tuple { + id: None, + values: left_tuple + .values + .iter() + .cloned() + .chain(right_tuple.clone().values) + .collect_vec(), + }, + } + } + fn merge_schema( left_schema: &[ColumnRef], right_schema: &[ColumnRef], From 8f21af76a00a4708a10cd8f57457412a61e6a743 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Sat, 16 Mar 2024 14:29:09 +0800 Subject: [PATCH 3/7] fix:right join not matched case --- .../volcano/dql/join/nested_loop_join.rs | 33 ++++++----- tests/slt/join.slt | 58 +++++++++++++++++++ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/src/execution/volcano/dql/join/nested_loop_join.rs b/src/execution/volcano/dql/join/nested_loop_join.rs index bdb62ce8..01baf31d 100644 --- a/src/execution/volcano/dql/join/nested_loop_join.rs +++ b/src/execution/volcano/dql/join/nested_loop_join.rs @@ -6,13 +6,14 @@ use std::sync::Arc; use crate::catalog::{ColumnCatalog, ColumnRef}; use crate::errors::DatabaseError; +use crate::execution::volcano::dql::projection::Projection; use crate::execution::volcano::{build_read, BoxedExecutor, ReadExecutor}; use crate::expression::ScalarExpression; use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType}; use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::tuple::{Schema, SchemaRef, Tuple}; -use crate::types::value::{DataValue, ValueRef, NULL_VALUE}; +use crate::types::value::{DataValue, NULL_VALUE}; use futures_async_stream::try_stream; use itertools::Itertools; @@ -54,19 +55,11 @@ impl EqualCondition { if self.on_left_keys.is_empty() { return Ok(true); } - let eval_keys = |on_keys: &[ScalarExpression], - tuple: &Tuple, - schema: &SchemaRef| - -> Result, DatabaseError> { - let mut values = Vec::with_capacity(on_keys.len()); - for expr in on_keys { - values.push(expr.eval(tuple, schema)?); - } - Ok(values) - }; + let left_values = + Projection::projection(left_tuple, &self.on_left_keys, &self.left_schema)?; + let right_values = + Projection::projection(right_tuple, &self.on_right_keys, &self.right_schema)?; - let left_values = eval_keys(&self.on_left_keys, left_tuple, &self.left_schema)?; - let right_values = eval_keys(&self.on_right_keys, right_tuple, &self.right_schema)?; Ok(left_values == right_values) } } @@ -154,6 +147,8 @@ impl NestedLoopJoin { unreachable!("{} cannot be handled in nested loop join", self.ty) } + let right_schema_len = eq_cond.right_schema.len(); + #[for_await] for tuple in build_read(left_input, transaction) { let left_tuple: Tuple = tuple?; @@ -185,7 +180,7 @@ impl NestedLoopJoin { tuple } DataValue::Boolean(Some(_) | None) => None, - _ => unreachable!(), + _ => return Err(DatabaseError::InvalidType), } } _ => None, @@ -206,7 +201,15 @@ impl NestedLoopJoin { let tuple = match ty { JoinType::LeftAnti if !has_matched => Some(left_tuple.clone()), JoinType::LeftOuter | JoinType::LeftSemi | JoinType::RightOuter if !has_matched => { - Self::emit_tuple(&left_tuple, &left_tuple, ty, false) + let right_tuple = Tuple { + id: None, + values: vec![NULL_VALUE.clone(); right_schema_len], + }; + if matches!(ty, JoinType::RightOuter) { + Self::emit_tuple(&right_tuple, &left_tuple, ty, false) + } else { + Self::emit_tuple(&left_tuple, &right_tuple, ty, false) + } } _ => None, }; diff --git a/tests/slt/join.slt b/tests/slt/join.slt index 9cf96475..06bc9781 100644 --- a/tests/slt/join.slt +++ b/tests/slt/join.slt @@ -128,3 +128,61 @@ select a.*, c.* from a inner join a as c using (id) 0 1 1 0 1 1 1 2 2 1 2 2 2 3 3 2 3 3 + +query IIIIIII rowsort +select a.*, b.* from a join b on v1 > v3 +---- +1 2 2 0 1 1 1 +1 2 2 3 1 1 5 +2 3 3 0 1 1 1 +2 3 3 1 2 2 2 +2 3 3 3 1 1 5 + +query IIIIIII rowsort +select a.*, b.* from a join b on v1 = 1 +---- +0 1 1 0 1 1 1 +0 1 1 1 2 2 2 +0 1 1 2 3 3 4 +0 1 1 3 1 1 5 + +query IIIIIII rowsort +select a.*, b.* from a left join b on v1 < v3 or v2 < v4 +---- +0 1 1 1 2 2 2 +0 1 1 2 3 3 4 +1 2 2 2 3 3 4 +2 3 3 null null null null + +query IIIIIII rowsort +select a.*, b.* from a right join b on v1 <> v3 and v2 < v4 +---- +0 1 1 1 2 2 2 +0 1 1 2 3 3 4 +1 2 2 2 3 3 4 +null null null 0 1 1 1 +null null null 3 1 1 5 + +query IIIIIII rowsort +select a.*, b.* from a cross join b +---- +0 1 1 0 1 1 1 +0 1 1 1 2 2 2 +0 1 1 2 3 3 4 +0 1 1 3 1 1 5 +1 2 2 0 1 1 1 +1 2 2 1 2 2 2 +1 2 2 2 3 3 4 +1 2 2 3 1 1 5 +2 3 3 0 1 1 1 +2 3 3 1 2 2 2 +2 3 3 2 3 3 4 +2 3 3 3 1 1 5 + +query III rowsort +select a.* from a where v1 >= (select 1) +---- +0 1 1 +1 2 2 +2 3 3 + From 35c0cabf77d9acccbdce6819a32ccaea78d83ada Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 16 Mar 2024 20:34:38 +0800 Subject: [PATCH 4/7] fix: expression ownership in the temp table of subquery --- src/binder/expr.rs | 30 ++++++++++++++++++------------ src/binder/select.rs | 7 +++++-- src/expression/mod.rs | 12 ++++++++++++ 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/binder/expr.rs b/src/binder/expr.rs index b021c904..84eedb8f 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -110,13 +110,13 @@ impl<'a, T: Transaction> Binder<'a, T> { }), Expr::Subquery(subquery) => { let (sub_query, column) = self.bind_subquery(subquery)?; - self.context.sub_query(SubQueryType::SubQuery(sub_query)); - - if self.context.is_step(&QueryBindStep::Where) { - Ok(self.bind_temp_column(column)) + let (expr, sub_query) = if !self.context.is_step(&QueryBindStep::Where) { + self.bind_temp_table(column, sub_query)? } else { - Ok(ScalarExpression::ColumnRef(column)) - } + (ScalarExpression::ColumnRef(column), sub_query) + }; + self.context.sub_query(SubQueryType::SubQuery(sub_query)); + Ok(expr) } Expr::InSubquery { expr, @@ -124,8 +124,6 @@ impl<'a, T: Transaction> Binder<'a, T> { negated, } => { let (sub_query, column) = self.bind_subquery(subquery)?; - self.context - .sub_query(SubQueryType::InSubQuery(*negated, sub_query)); if !self.context.is_step(&QueryBindStep::Where) { return Err(DatabaseError::UnsupportedStmt( @@ -133,7 +131,9 @@ impl<'a, T: Transaction> Binder<'a, T> { )); } - let alias_expr = self.bind_temp_column(column); + let (alias_expr, sub_query) = self.bind_temp_table(column, sub_query)?; + self.context + .sub_query(SubQueryType::InSubQuery(*negated, sub_query)); Ok(ScalarExpression::Binary { op: expression::BinaryOperator::Eq, @@ -203,16 +203,22 @@ impl<'a, T: Transaction> Binder<'a, T> { } } - fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression { + fn bind_temp_table( + &mut self, + column: ColumnRef, + sub_query: LogicalPlan, + ) -> Result<(ScalarExpression, LogicalPlan), DatabaseError> { let mut alias_column = ColumnCatalog::clone(&column); alias_column.set_table_name(self.context.temp_table()); - ScalarExpression::Alias { + let alias_expr = ScalarExpression::Alias { expr: Box::new(ScalarExpression::ColumnRef(column)), alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new( alias_column, )))), - } + }; + let alias_plan = self.bind_project(sub_query, vec![alias_expr.clone()])?; + Ok((alias_expr, alias_plan)) } fn bind_subquery( diff --git a/src/binder/select.rs b/src/binder/select.rs index 15227fec..3fa78df4 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -510,7 +510,7 @@ impl<'a, T: Transaction> Binder<'a, T> { Ok(FilterOperator::build(having, children, true)) } - fn bind_project( + pub(crate) fn bind_project( &mut self, children: LogicalPlan, select_list: Vec, @@ -722,7 +722,10 @@ impl<'a, T: Transaction> Binder<'a, T> { } => { match op { BinaryOperator::Eq => { - match (left_expr.unpack_alias_ref(), right_expr.unpack_alias_ref()) { + match ( + left_expr.unpack_alias_inner_ref().unpack_alias_ref(), + right_expr.unpack_alias_inner_ref().unpack_alias_ref(), + ) { // example: foo = bar (ScalarExpression::ColumnRef(l), ScalarExpression::ColumnRef(r)) => { // reorder left and right joins keys to pattern: (left, right) diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 7c9cb659..ca833c4f 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -136,6 +136,18 @@ impl ScalarExpression { } } + pub fn unpack_alias_inner_ref(&self) -> &ScalarExpression { + if let ScalarExpression::Alias { + alias: AliasType::Expr(expr), + .. + } = self + { + expr.unpack_alias_inner_ref() + } else { + self + } + } + pub fn try_reference(&mut self, output_exprs: &[ScalarExpression]) { let fn_output_column = |expr: &ScalarExpression| expr.unpack_alias_ref().output_column(); let self_column = fn_output_column(self); From 6a0fcc208f23922e7cce80f6e27da46611bb1529 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sat, 16 Mar 2024 23:17:58 +0800 Subject: [PATCH 5/7] fix: bind error of the same fields in different tables in `in subquery` --- src/binder/expr.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 84eedb8f..a286f7e4 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -123,6 +123,7 @@ impl<'a, T: Transaction> Binder<'a, T> { subquery, negated, } => { + let left_expr = Box::new(self.bind_expr(expr)?); let (sub_query, column) = self.bind_subquery(subquery)?; if !self.context.is_step(&QueryBindStep::Where) { @@ -137,7 +138,7 @@ impl<'a, T: Transaction> Binder<'a, T> { Ok(ScalarExpression::Binary { op: expression::BinaryOperator::Eq, - left_expr: Box::new(self.bind_expr(expr)?), + left_expr, right_expr: Box::new(alias_expr), ty: LogicalType::Boolean, }) @@ -295,7 +296,7 @@ impl<'a, T: Transaction> Binder<'a, T> { } else { // handle col syntax let mut got_column = None; - for table_catalog in self.context.bind_table.values() { + for table_catalog in self.context.bind_table.values().rev() { if let Some(column_catalog) = table_catalog.get_column_by_name(&column_name) { got_column = Some(column_catalog); } From b35bf0894569fd6c9bcc249d430d80befd1f6307 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Sun, 17 Mar 2024 02:30:15 +0800 Subject: [PATCH 6/7] fix: combine_operators.rs deletes the temp table alias of subquery --- src/binder/select.rs | 11 +++++----- src/expression/mod.rs | 22 +++++++++---------- .../rule/normalization/combine_operators.rs | 2 -- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/binder/select.rs b/src/binder/select.rs index 3fa78df4..35e3cb3f 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -76,7 +76,7 @@ impl<'a, T: Transaction> Binder<'a, T> { // Resolve scalar function call. // TODO support SRF(Set-Returning Function). - let mut select_list = self.normalize_select_item(&select.projection)?; + let mut select_list = self.normalize_select_item(&select.projection, &plan)?; if let Some(predicate) = &select.selection { plan = self.bind_where(plan, predicate)?; @@ -341,6 +341,7 @@ impl<'a, T: Transaction> Binder<'a, T> { fn normalize_select_item( &mut self, items: &[SelectItem], + plan: &LogicalPlan, ) -> Result, DatabaseError> { let mut select_items = vec![]; @@ -359,6 +360,9 @@ impl<'a, T: Transaction> Binder<'a, T> { }); } SelectItem::Wildcard(_) => { + if let Operator::Project(op) = &plan.operator { + return Ok(op.exprs.clone()); + } for (table_name, _) in self.context.bind_table.keys() { self.bind_table_column_refs(&mut select_items, table_name.clone())?; } @@ -722,10 +726,7 @@ impl<'a, T: Transaction> Binder<'a, T> { } => { match op { BinaryOperator::Eq => { - match ( - left_expr.unpack_alias_inner_ref().unpack_alias_ref(), - right_expr.unpack_alias_inner_ref().unpack_alias_ref(), - ) { + match (left_expr.unpack_alias_ref(), right_expr.unpack_alias_ref()) { // example: foo = bar (ScalarExpression::ColumnRef(l), ScalarExpression::ColumnRef(r)) => { // reorder left and right joins keys to pattern: (left, right) diff --git a/src/expression/mod.rs b/src/expression/mod.rs index ca833c4f..78672a4c 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -121,7 +121,13 @@ pub enum ScalarExpression { impl ScalarExpression { pub fn unpack_alias(self) -> ScalarExpression { - if let ScalarExpression::Alias { expr, .. } = self { + if let ScalarExpression::Alias { + alias: AliasType::Expr(expr), + .. + } = self + { + expr.unpack_alias() + } else if let ScalarExpression::Alias { expr, .. } = self { expr.unpack_alias() } else { self @@ -129,27 +135,21 @@ impl ScalarExpression { } pub fn unpack_alias_ref(&self) -> &ScalarExpression { - if let ScalarExpression::Alias { expr, .. } = self { - expr.unpack_alias_ref() - } else { - self - } - } - - pub fn unpack_alias_inner_ref(&self) -> &ScalarExpression { if let ScalarExpression::Alias { alias: AliasType::Expr(expr), .. } = self { - expr.unpack_alias_inner_ref() + expr.unpack_alias_ref() + } else if let ScalarExpression::Alias { expr, .. } = self { + expr.unpack_alias_ref() } else { self } } pub fn try_reference(&mut self, output_exprs: &[ScalarExpression]) { - let fn_output_column = |expr: &ScalarExpression| expr.unpack_alias_ref().output_column(); + let fn_output_column = |expr: &ScalarExpression| expr.output_column(); let self_column = fn_output_column(self); if let Some((pos, _)) = output_exprs .iter() diff --git a/src/optimizer/rule/normalization/combine_operators.rs b/src/optimizer/rule/normalization/combine_operators.rs index 785b3106..2b59760b 100644 --- a/src/optimizer/rule/normalization/combine_operators.rs +++ b/src/optimizer/rule/normalization/combine_operators.rs @@ -61,8 +61,6 @@ impl NormalizationRule for CollapseProject { if let Operator::Project(child_op) = graph.operator(child_id) { if is_subset_exprs(&op.exprs, &child_op.exprs) { graph.remove_node(child_id, false); - } else { - graph.remove_node(node_id, false); } } } From 0a20b2255df858decdf8c2ac35619afc5d6ba691 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Wed, 20 Mar 2024 00:02:54 +0800 Subject: [PATCH 7/7] test: add `F041_08.slt`/`sqlite.slt`/`subquery.slt` --- src/execution/volcano/mod.rs | 4 +- src/optimizer/rule/implementation/dql/join.rs | 40 +++++++++--- src/optimizer/rule/implementation/mod.rs | 6 +- src/planner/operator/mod.rs | 2 + tests/slt/crdb/sqlite.slt | 19 +++--- tests/slt/sql_2016/F041_08.slt | 62 +++++++++---------- tests/slt/subquery.slt | 9 ++- 7 files changed, 84 insertions(+), 58 deletions(-) diff --git a/src/execution/volcano/mod.rs b/src/execution/volcano/mod.rs index d228ed1d..472ace14 100644 --- a/src/execution/volcano/mod.rs +++ b/src/execution/volcano/mod.rs @@ -78,7 +78,9 @@ pub fn build_read(plan: LogicalPlan, transaction: &T) -> BoxedEx let left_input = childrens.pop().unwrap(); match &op.on { - JoinCondition::On { on, .. } if !on.is_empty() => { + JoinCondition::On { on, .. } + if !on.is_empty() && plan.physical_option == Some(PhysicalOption::HashJoin) => + { HashJoin::from((op, left_input, right_input)).execute(transaction) } _ => NestedLoopJoin::from((op, left_input, right_input)).execute(transaction), diff --git a/src/optimizer/rule/implementation/dql/join.rs b/src/optimizer/rule/implementation/dql/join.rs index a85d92c0..caf2edf4 100644 --- a/src/optimizer/rule/implementation/dql/join.rs +++ b/src/optimizer/rule/implementation/dql/join.rs @@ -3,8 +3,8 @@ use crate::optimizer::core::memo::{Expression, GroupExpression}; use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; use crate::optimizer::core::rule::{ImplementationRule, MatchPattern}; use crate::optimizer::core::statistics_meta::StatisticMetaLoader; +use crate::planner::operator::join::{JoinCondition, JoinOperator}; use crate::planner::operator::{Operator, PhysicalOption}; -use crate::single_mapping; use crate::storage::Transaction; use lazy_static::lazy_static; @@ -18,10 +18,36 @@ lazy_static! { } #[derive(Clone)] -pub struct HashJoinImplementation; +pub struct JoinImplementation; -single_mapping!( - HashJoinImplementation, - JOIN_PATTERN, - PhysicalOption::HashJoin -); +impl MatchPattern for JoinImplementation { + fn pattern(&self) -> &Pattern { + &JOIN_PATTERN + } +} + +impl ImplementationRule for JoinImplementation { + fn to_expression( + &self, + op: &Operator, + _: &StatisticMetaLoader<'_, T>, + group_expr: &mut GroupExpression, + ) -> Result<(), DatabaseError> { + let mut physical_option = PhysicalOption::NestLoopJoin; + + if let Operator::Join(JoinOperator { + on: JoinCondition::On { on, .. }, + .. + }) = op + { + if !on.is_empty() { + physical_option = PhysicalOption::HashJoin; + } + } + group_expr.append_expr(Expression { + op: physical_option, + cost: None, + }); + Ok(()) + } +} diff --git a/src/optimizer/rule/implementation/mod.rs b/src/optimizer/rule/implementation/mod.rs index fd0efbf3..9f35b245 100644 --- a/src/optimizer/rule/implementation/mod.rs +++ b/src/optimizer/rule/implementation/mod.rs @@ -24,7 +24,7 @@ use crate::optimizer::rule::implementation::dql::aggregate::{ }; use crate::optimizer::rule::implementation::dql::dummy::DummyImplementation; use crate::optimizer::rule::implementation::dql::filter::FilterImplementation; -use crate::optimizer::rule::implementation::dql::join::HashJoinImplementation; +use crate::optimizer::rule::implementation::dql::join::JoinImplementation; use crate::optimizer::rule::implementation::dql::limit::LimitImplementation; use crate::optimizer::rule::implementation::dql::projection::ProjectionImplementation; use crate::optimizer::rule::implementation::dql::scan::{ @@ -71,7 +71,7 @@ impl MatchPattern for ImplementationRuleImpl { ImplementationRuleImpl::SimpleAggregate => SimpleAggregateImplementation.pattern(), ImplementationRuleImpl::Dummy => DummyImplementation.pattern(), ImplementationRuleImpl::Filter => FilterImplementation.pattern(), - ImplementationRuleImpl::HashJoin => HashJoinImplementation.pattern(), + ImplementationRuleImpl::HashJoin => JoinImplementation.pattern(), ImplementationRuleImpl::Limit => LimitImplementation.pattern(), ImplementationRuleImpl::Projection => ProjectionImplementation.pattern(), ImplementationRuleImpl::SeqScan => SeqScanImplementation.pattern(), @@ -114,7 +114,7 @@ impl ImplementationRule for ImplementationRuleImpl { FilterImplementation.to_expression(operator, loader, group_expr)? } ImplementationRuleImpl::HashJoin => { - HashJoinImplementation.to_expression(operator, loader, group_expr)? + JoinImplementation.to_expression(operator, loader, group_expr)? } ImplementationRuleImpl::Limit => { LimitImplementation.to_expression(operator, loader, group_expr)? diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index bc2ba605..e7ae73bd 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -88,6 +88,7 @@ pub enum PhysicalOption { HashAggregate, Filter, HashJoin, + NestLoopJoin, Project, SeqScan, IndexScan(IndexInfo), @@ -270,6 +271,7 @@ impl fmt::Display for PhysicalOption { PhysicalOption::HashAggregate => write!(f, "HashAggregate"), PhysicalOption::Filter => write!(f, "Filter"), PhysicalOption::HashJoin => write!(f, "HashJoin"), + PhysicalOption::NestLoopJoin => write!(f, "NestLoopJoin"), PhysicalOption::Project => write!(f, "Project"), PhysicalOption::SeqScan => write!(f, "SeqScan"), PhysicalOption::IndexScan(index) => write!(f, "IndexScan By {}", index), diff --git a/tests/slt/crdb/sqlite.slt b/tests/slt/crdb/sqlite.slt index cc5a7337..b37840ea 100644 --- a/tests/slt/crdb/sqlite.slt +++ b/tests/slt/crdb/sqlite.slt @@ -30,16 +30,15 @@ CREATE TABLE tab64784(pk INTEGER primary key, col0 INTEGER, col1 FLOAT, col2 VAR statement ok INSERT INTO tab64784 VALUES(0,212,202.62,'nshdy',212,208.79,'wsxfc'),(1,213,203.64,'xwfuo',213,209.26,'lyswz'),(2,214,204.82,'jnued',216,210.48,'qczzf'),(3,215,205.40,'jtijf',217,211.96,'dpugl'),(4,216,206.3,'dpdzk',219,212.43,'xfirg'),(5,218,207.43,'qpwyw',220,213.50,'fmgky'),(6,219,208.3,'uooxb',221,215.30,'xpmdy'),(7,220,209.54,'ndtbb',225,218.8,'ivqyw'),(8,221,210.65,'zjpts',226,219.82,'sezsm'),(9,222,211.57,'slaxq',227,220.91,'bdqyb') -# TODO: Support `NestLoopJoin` -# query II -# SELECT pk, col0 FROM tab64784 WHERE (col0 IN (SELECT col3 FROM tab64784 WHERE col3 IS NULL OR (col1 < 22.54) OR col4 > 85.74) OR ((col4 IS NULL)) AND col3 < 8 OR (col4 > 82.93 AND (col0 <= 61) AND col0 > 94 AND col0 > 15)) ORDER BY 2 DESC -# ---- -# 0 212 -# 1 213 -# 4 216 -# 6 219 -# 7 220 -# 8 221 +query II +SELECT pk, col0 FROM tab64784 WHERE (col0 IN (SELECT col3 FROM tab64784 WHERE col3 IS NULL OR (col1 < 22.54) OR col4 > 85.74) OR ((col4 IS NULL)) AND col3 < 8 OR (col4 > 82.93 AND (col0 <= 61) AND col0 > 94 AND col0 > 15)) ORDER BY 2 DESC +---- +0 212 +1 213 +4 216 +6 219 +7 220 +8 221 statement ok drop table tab64784 diff --git a/tests/slt/sql_2016/F041_08.slt b/tests/slt/sql_2016/F041_08.slt index d54dce2a..50e31c01 100644 --- a/tests/slt/sql_2016/F041_08.slt +++ b/tests/slt/sql_2016/F041_08.slt @@ -1,33 +1,31 @@ # F041-08: All comparison operators are supported (rather than just =) -# TODO: NestLoopJoin - -# statement ok -# CREATE TABLE TABLE_F041_08_01_011 ( ID INT PRIMARY KEY, A INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_011 ( ID INT PRIMARY KEY, A INTEGER ); -# statement ok -# CREATE TABLE TABLE_F041_08_01_012 ( ID INT PRIMARY KEY, B INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_012 ( ID INT PRIMARY KEY, B INTEGER ); -# query II -# SELECT TABLE_F041_08_01_011.A, TABLE_F041_08_01_012.B FROM TABLE_F041_08_01_011 JOIN TABLE_F041_08_01_012 ON TABLE_F041_08_01_011.A < TABLE_F041_08_01_012.B +query II +SELECT TABLE_F041_08_01_011.A, TABLE_F041_08_01_012.B FROM TABLE_F041_08_01_011 JOIN TABLE_F041_08_01_012 ON TABLE_F041_08_01_011.A < TABLE_F041_08_01_012.B -# statement ok -# CREATE TABLE TABLE_F041_08_01_021 ( ID INT PRIMARY KEY, A INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_021 ( ID INT PRIMARY KEY, A INTEGER ); -# statement ok -# CREATE TABLE TABLE_F041_08_01_022 ( ID INT PRIMARY KEY, B INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_022 ( ID INT PRIMARY KEY, B INTEGER ); -# query II -# SELECT TABLE_F041_08_01_021.A, TABLE_F041_08_01_022.B FROM TABLE_F041_08_01_021 JOIN TABLE_F041_08_01_022 ON TABLE_F041_08_01_021.A <= TABLE_F041_08_01_022.B +query II +SELECT TABLE_F041_08_01_021.A, TABLE_F041_08_01_022.B FROM TABLE_F041_08_01_021 JOIN TABLE_F041_08_01_022 ON TABLE_F041_08_01_021.A <= TABLE_F041_08_01_022.B -# statement ok -# CREATE TABLE TABLE_F041_08_01_031 ( ID INT PRIMARY KEY, A INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_031 ( ID INT PRIMARY KEY, A INTEGER ); -# statement ok -# CREATE TABLE TABLE_F041_08_01_032 ( ID INT PRIMARY KEY, B INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_032 ( ID INT PRIMARY KEY, B INTEGER ); -# query II -# SELECT TABLE_F041_08_01_031.A, TABLE_F041_08_01_032.B FROM TABLE_F041_08_01_031 JOIN TABLE_F041_08_01_032 ON TABLE_F041_08_01_031.A <> TABLE_F041_08_01_032.B +query II +SELECT TABLE_F041_08_01_031.A, TABLE_F041_08_01_032.B FROM TABLE_F041_08_01_031 JOIN TABLE_F041_08_01_032 ON TABLE_F041_08_01_031.A <> TABLE_F041_08_01_032.B statement ok CREATE TABLE TABLE_F041_08_01_041 ( ID INT PRIMARY KEY, A INTEGER ); @@ -38,20 +36,20 @@ CREATE TABLE TABLE_F041_08_01_042 ( ID INT PRIMARY KEY, B INTEGER ); query II SELECT TABLE_F041_08_01_041.A, TABLE_F041_08_01_042.B FROM TABLE_F041_08_01_041 JOIN TABLE_F041_08_01_042 ON TABLE_F041_08_01_041.A = TABLE_F041_08_01_042.B -# statement ok -# CREATE TABLE TABLE_F041_08_01_051 ( ID INT PRIMARY KEY, A INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_051 ( ID INT PRIMARY KEY, A INTEGER ); -# statement ok -# CREATE TABLE TABLE_F041_08_01_052 ( ID INT PRIMARY KEY, B INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_052 ( ID INT PRIMARY KEY, B INTEGER ); -# query II -# SELECT TABLE_F041_08_01_051.A, TABLE_F041_08_01_052.B FROM TABLE_F041_08_01_051 JOIN TABLE_F041_08_01_052 ON TABLE_F041_08_01_051.A > TABLE_F041_08_01_052.B +query II +SELECT TABLE_F041_08_01_051.A, TABLE_F041_08_01_052.B FROM TABLE_F041_08_01_051 JOIN TABLE_F041_08_01_052 ON TABLE_F041_08_01_051.A > TABLE_F041_08_01_052.B -# statement ok -# CREATE TABLE TABLE_F041_08_01_061 ( ID INT PRIMARY KEY, A INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_061 ( ID INT PRIMARY KEY, A INTEGER ); -# statement ok -# CREATE TABLE TABLE_F041_08_01_062 ( ID INT PRIMARY KEY, B INTEGER ); +statement ok +CREATE TABLE TABLE_F041_08_01_062 ( ID INT PRIMARY KEY, B INTEGER ); -# query II -# SELECT TABLE_F041_08_01_061.A, TABLE_F041_08_01_062.B FROM TABLE_F041_08_01_061 JOIN TABLE_F041_08_01_062 ON TABLE_F041_08_01_061.A >= TABLE_F041_08_01_062.B +query II +SELECT TABLE_F041_08_01_061.A, TABLE_F041_08_01_062.B FROM TABLE_F041_08_01_061 JOIN TABLE_F041_08_01_062 ON TABLE_F041_08_01_061.A >= TABLE_F041_08_01_062.B diff --git a/tests/slt/subquery.slt b/tests/slt/subquery.slt index c6f394d1..a90fe587 100644 --- a/tests/slt/subquery.slt +++ b/tests/slt/subquery.slt @@ -34,11 +34,10 @@ select x.a from (select -a as a from t1) as x; -1 -3 -# TODO: NestLoopJoin -# query III -# select * from t1 where a <= (select 4) and a > (select 1) -# ---- -# 1 3 4 +query III +select * from t1 where a <= (select 4) and a > (select 1) +---- +1 3 4 # query III # select * from t1 where a <= (select 4) and (-a + 1) < (select 1) - 1