Skip to content

Commit 5778712

Browse files
authored
Support Subquery on WHERE with IN/Not IN (#147)
* invalid: fix index's `tuple_ids` -> `tuple_id` * refactor: remove Tuple‘s Schema * fix: `TestFunction` * feat: support `LeftSemi` & `LeftAnti` * feat: support Subquery on `WHERE` with `IN/Not IN` * style: remove `Transaction::drop_column`'s `if_not_exists` * test: more case for `insubquery` * code fmt
1 parent e8fc5cd commit 5778712

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+796
-1008
lines changed

examples/hello_world.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use fnck_sql::db::DataBaseBuilder;
22
use fnck_sql::errors::DatabaseError;
33
use fnck_sql::implement_from_tuple;
4-
use fnck_sql::types::tuple::Tuple;
4+
use fnck_sql::types::tuple::{SchemaRef, Tuple};
55
use fnck_sql::types::value::DataValue;
66
use fnck_sql::types::LogicalType;
77
use itertools::Itertools;
@@ -38,11 +38,10 @@ async fn main() -> Result<(), DatabaseError> {
3838
let _ = database
3939
.run("insert into my_struct values(0, 0), (1, 1)")
4040
.await?;
41-
let tuples = database
42-
.run("select * from my_struct")
43-
.await?
41+
let (schema, tuples) = database.run("select * from my_struct").await?;
42+
let tuples = tuples
4443
.into_iter()
45-
.map(MyStruct::from)
44+
.map(|tuple| MyStruct::from((&schema, tuple)))
4645
.collect_vec();
4746

4847
println!("{:#?}", tuples);

src/bin/server.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use clap::Parser;
33
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Database};
44
use fnck_sql::errors::DatabaseError;
55
use fnck_sql::storage::kip::KipStorage;
6-
use fnck_sql::types::tuple::Tuple;
6+
use fnck_sql::types::tuple::{Schema, Tuple};
77
use fnck_sql::types::LogicalType;
88
use futures::stream;
99
use log::{error, info, LevelFilter};
@@ -146,28 +146,27 @@ impl SimpleQueryHandler for SessionBackend {
146146
_ => {
147147
let mut guard = self.tx.lock().await;
148148

149-
let tuples = if let Some(transaction) = guard.as_mut() {
149+
let (schema, tuples) = if let Some(transaction) = guard.as_mut() {
150150
transaction.run(query).await
151151
} else {
152152
self.inner.run(query).await
153153
}
154154
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
155155

156-
Ok(vec![Response::Query(encode_tuples(tuples)?)])
156+
Ok(vec![Response::Query(encode_tuples(&schema, tuples)?)])
157157
}
158158
}
159159
}
160160
}
161161

162-
fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
162+
fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
163163
if tuples.is_empty() {
164164
return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty()));
165165
}
166166

167167
let mut results = Vec::with_capacity(tuples.len());
168168
let schema = Arc::new(
169-
tuples[0]
170-
.schema_ref
169+
schema
171170
.iter()
172171
.map(|column| {
173172
let pg_type = into_pg_type(column.datatype())?;

src/binder/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
301301
return Ok(());
302302
}
303303
if matches!(expr, ScalarExpression::Alias { .. }) {
304-
return self.validate_having_orderby(expr.unpack_alias());
304+
return self.validate_having_orderby(expr.unpack_alias_ref());
305305
}
306306

307307
Err(DatabaseError::AggMiss(

src/binder/expr.rs

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
use crate::catalog::ColumnCatalog;
1+
use crate::catalog::{ColumnCatalog, ColumnRef};
22
use crate::errors::DatabaseError;
33
use crate::expression;
44
use crate::expression::agg::AggKind;
55
use itertools::Itertools;
66
use sqlparser::ast::{
7-
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator,
7+
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, Query,
8+
UnaryOperator,
89
};
910
use std::slice;
1011
use std::sync::Arc;
1112

12-
use super::{lower_ident, Binder, QueryBindStep};
13+
use super::{lower_ident, Binder, QueryBindStep, SubQueryType};
1314
use crate::expression::function::{FunctionSummary, ScalarFunction};
1415
use crate::expression::{AliasType, ScalarExpression};
16+
use crate::planner::LogicalPlan;
1517
use crate::storage::Transaction;
1618
use crate::types::value::DataValue;
1719
use crate::types::LogicalType;
@@ -99,33 +101,40 @@ impl<'a, T: Transaction> Binder<'a, T> {
99101
from_expr,
100102
})
101103
}
102-
Expr::Subquery(query) => {
103-
let mut sub_query = self.bind_query(query)?;
104-
let sub_query_schema = sub_query.output_schema();
105-
106-
if sub_query_schema.len() != 1 {
107-
return Err(DatabaseError::MisMatch(
108-
"expects only one expression to be returned",
109-
"the expression returned by the subquery",
110-
));
111-
}
112-
let column = sub_query_schema[0].clone();
113-
self.context.sub_query(sub_query);
104+
Expr::Subquery(subquery) => {
105+
let (sub_query, column) = self.bind_subquery(subquery)?;
106+
self.context.sub_query(SubQueryType::SubQuery(sub_query));
114107

115108
if self.context.is_step(&QueryBindStep::Where) {
116-
let mut alias_column = ColumnCatalog::clone(&column);
117-
alias_column.set_table_name(self.context.temp_table());
118-
119-
Ok(ScalarExpression::Alias {
120-
expr: Box::new(ScalarExpression::ColumnRef(column)),
121-
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
122-
alias_column,
123-
)))),
124-
})
109+
Ok(self.bind_temp_column(column))
125110
} else {
126111
Ok(ScalarExpression::ColumnRef(column))
127112
}
128113
}
114+
Expr::InSubquery {
115+
expr,
116+
subquery,
117+
negated,
118+
} => {
119+
let (sub_query, column) = self.bind_subquery(subquery)?;
120+
self.context
121+
.sub_query(SubQueryType::InSubQuery(*negated, sub_query));
122+
123+
if !self.context.is_step(&QueryBindStep::Where) {
124+
return Err(DatabaseError::UnsupportedStmt(
125+
"`in subquery` can only appear in `Where`".to_string(),
126+
));
127+
}
128+
129+
let alias_expr = self.bind_temp_column(column);
130+
131+
Ok(ScalarExpression::Binary {
132+
op: expression::BinaryOperator::Eq,
133+
left_expr: Box::new(self.bind_expr(expr)?),
134+
right_expr: Box::new(alias_expr),
135+
ty: LogicalType::Boolean,
136+
})
137+
}
129138
Expr::Tuple(exprs) => {
130139
let mut bond_exprs = Vec::with_capacity(exprs.len());
131140

@@ -187,6 +196,35 @@ impl<'a, T: Transaction> Binder<'a, T> {
187196
}
188197
}
189198

199+
fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression {
200+
let mut alias_column = ColumnCatalog::clone(&column);
201+
alias_column.set_table_name(self.context.temp_table());
202+
203+
ScalarExpression::Alias {
204+
expr: Box::new(ScalarExpression::ColumnRef(column)),
205+
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
206+
alias_column,
207+
)))),
208+
}
209+
}
210+
211+
fn bind_subquery(
212+
&mut self,
213+
subquery: &Query,
214+
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
215+
let mut sub_query = self.bind_query(subquery)?;
216+
let sub_query_schema = sub_query.output_schema();
217+
218+
if sub_query_schema.len() != 1 {
219+
return Err(DatabaseError::MisMatch(
220+
"expects only one expression to be returned",
221+
"the expression returned by the subquery",
222+
));
223+
}
224+
let column = sub_query_schema[0].clone();
225+
Ok((sub_query, column))
226+
}
227+
190228
pub fn bind_like(
191229
&mut self,
192230
negated: bool,

src/binder/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ pub enum QueryBindStep {
4646
Limit,
4747
}
4848

49+
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
50+
pub enum SubQueryType {
51+
SubQuery(LogicalPlan),
52+
InSubQuery(bool, LogicalPlan),
53+
}
54+
4955
#[derive(Clone)]
5056
pub struct BinderContext<'a, T: Transaction> {
5157
functions: &'a Functions,
@@ -60,7 +66,7 @@ pub struct BinderContext<'a, T: Transaction> {
6066
pub(crate) agg_calls: Vec<ScalarExpression>,
6167

6268
bind_step: QueryBindStep,
63-
sub_queries: HashMap<QueryBindStep, Vec<LogicalPlan>>,
69+
sub_queries: HashMap<QueryBindStep, Vec<SubQueryType>>,
6470

6571
temp_table_id: usize,
6672
pub(crate) allow_default: bool,
@@ -96,14 +102,18 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
96102
&self.bind_step == bind_step
97103
}
98104

99-
pub fn sub_query(&mut self, sub_query: LogicalPlan) {
105+
pub fn step_now(&self) -> QueryBindStep {
106+
self.bind_step
107+
}
108+
109+
pub fn sub_query(&mut self, sub_query: SubQueryType) {
100110
self.sub_queries
101111
.entry(self.bind_step)
102112
.or_default()
103113
.push(sub_query)
104114
}
105115

106-
pub fn sub_queries_at_now(&mut self) -> Option<Vec<LogicalPlan>> {
116+
pub fn sub_queries_at_now(&mut self) -> Option<Vec<SubQueryType>> {
107117
self.sub_queries.remove(&self.bind_step)
108118
}
109119

0 commit comments

Comments
 (0)