Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fnck_sql::db::DataBaseBuilder;
use fnck_sql::errors::DatabaseError;
use fnck_sql::implement_from_tuple;
use fnck_sql::types::tuple::Tuple;
use fnck_sql::types::tuple::{SchemaRef, Tuple};
use fnck_sql::types::value::DataValue;
use fnck_sql::types::LogicalType;
use itertools::Itertools;
Expand Down Expand Up @@ -38,11 +38,10 @@ async fn main() -> Result<(), DatabaseError> {
let _ = database
.run("insert into my_struct values(0, 0), (1, 1)")
.await?;
let tuples = database
.run("select * from my_struct")
.await?
let (schema, tuples) = database.run("select * from my_struct").await?;
let tuples = tuples
.into_iter()
.map(MyStruct::from)
.map(|tuple| MyStruct::from((&schema, tuple)))
.collect_vec();

println!("{:#?}", tuples);
Expand Down
11 changes: 5 additions & 6 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Database};
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::kip::KipStorage;
use fnck_sql::types::tuple::Tuple;
use fnck_sql::types::tuple::{Schema, Tuple};
use fnck_sql::types::LogicalType;
use futures::stream;
use log::{error, info, LevelFilter};
Expand Down Expand Up @@ -146,28 +146,27 @@ impl SimpleQueryHandler for SessionBackend {
_ => {
let mut guard = self.tx.lock().await;

let tuples = if let Some(transaction) = guard.as_mut() {
let (schema, tuples) = if let Some(transaction) = guard.as_mut() {
transaction.run(query).await
} else {
self.inner.run(query).await
}
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

Ok(vec![Response::Query(encode_tuples(tuples)?)])
Ok(vec![Response::Query(encode_tuples(&schema, tuples)?)])
}
}
}
}

fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
if tuples.is_empty() {
return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty()));
}

let mut results = Vec::with_capacity(tuples.len());
let schema = Arc::new(
tuples[0]
.schema_ref
schema
.iter()
.map(|column| {
let pg_type = into_pg_type(column.datatype())?;
Expand Down
2 changes: 1 addition & 1 deletion src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
return Ok(());
}
if matches!(expr, ScalarExpression::Alias { .. }) {
return self.validate_having_orderby(expr.unpack_alias());
return self.validate_having_orderby(expr.unpack_alias_ref());
}

Err(DatabaseError::AggMiss(
Expand Down
86 changes: 62 additions & 24 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::catalog::ColumnCatalog;
use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::errors::DatabaseError;
use crate::expression;
use crate::expression::agg::AggKind;
use itertools::Itertools;
use sqlparser::ast::{
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator,
BinaryOperator, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident, Query,
UnaryOperator,
};
use std::slice;
use std::sync::Arc;

use super::{lower_ident, Binder, QueryBindStep};
use super::{lower_ident, Binder, QueryBindStep, SubQueryType};
use crate::expression::function::{FunctionSummary, ScalarFunction};
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::value::DataValue;
use crate::types::LogicalType;
Expand Down Expand Up @@ -99,33 +101,40 @@ impl<'a, T: Transaction> Binder<'a, T> {
from_expr,
})
}
Expr::Subquery(query) => {
let mut sub_query = self.bind_query(query)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned",
"the expression returned by the subquery",
));
}
let column = sub_query_schema[0].clone();
self.context.sub_query(sub_query);
Expr::Subquery(subquery) => {
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context.sub_query(SubQueryType::SubQuery(sub_query));

if self.context.is_step(&QueryBindStep::Where) {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

Ok(ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
})
Ok(self.bind_temp_column(column))
} else {
Ok(ScalarExpression::ColumnRef(column))
}
}
Expr::InSubquery {
expr,
subquery,
negated,
} => {
let (sub_query, column) = self.bind_subquery(subquery)?;
self.context
.sub_query(SubQueryType::InSubQuery(*negated, sub_query));

if !self.context.is_step(&QueryBindStep::Where) {
return Err(DatabaseError::UnsupportedStmt(
"`in subquery` can only appear in `Where`".to_string(),
));
}

let alias_expr = self.bind_temp_column(column);

Ok(ScalarExpression::Binary {
op: expression::BinaryOperator::Eq,
left_expr: Box::new(self.bind_expr(expr)?),
right_expr: Box::new(alias_expr),
ty: LogicalType::Boolean,
})
}
Expr::Tuple(exprs) => {
let mut bond_exprs = Vec::with_capacity(exprs.len());

Expand Down Expand Up @@ -187,6 +196,35 @@ impl<'a, T: Transaction> Binder<'a, T> {
}
}

fn bind_temp_column(&mut self, column: ColumnRef) -> ScalarExpression {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_table_name(self.context.temp_table());

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(Arc::new(
alias_column,
)))),
}
}

fn bind_subquery(
&mut self,
subquery: &Query,
) -> Result<(LogicalPlan, Arc<ColumnCatalog>), DatabaseError> {
let mut sub_query = self.bind_query(subquery)?;
let sub_query_schema = sub_query.output_schema();

if sub_query_schema.len() != 1 {
return Err(DatabaseError::MisMatch(
"expects only one expression to be returned",
"the expression returned by the subquery",
));
}
let column = sub_query_schema[0].clone();
Ok((sub_query, column))
}

pub fn bind_like(
&mut self,
negated: bool,
Expand Down
16 changes: 13 additions & 3 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub enum QueryBindStep {
Limit,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum SubQueryType {
SubQuery(LogicalPlan),
InSubQuery(bool, LogicalPlan),
}

#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
functions: &'a Functions,
Expand All @@ -60,7 +66,7 @@ pub struct BinderContext<'a, T: Transaction> {
pub(crate) agg_calls: Vec<ScalarExpression>,

bind_step: QueryBindStep,
sub_queries: HashMap<QueryBindStep, Vec<LogicalPlan>>,
sub_queries: HashMap<QueryBindStep, Vec<SubQueryType>>,

temp_table_id: usize,
pub(crate) allow_default: bool,
Expand Down Expand Up @@ -96,14 +102,18 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
&self.bind_step == bind_step
}

pub fn sub_query(&mut self, sub_query: LogicalPlan) {
pub fn step_now(&self) -> QueryBindStep {
self.bind_step
}

pub fn sub_query(&mut self, sub_query: SubQueryType) {
self.sub_queries
.entry(self.bind_step)
.or_default()
.push(sub_query)
}

pub fn sub_queries_at_now(&mut self) -> Option<Vec<LogicalPlan>> {
pub fn sub_queries_at_now(&mut self) -> Option<Vec<SubQueryType>> {
self.sub_queries.remove(&self.bind_step)
}

Expand Down
Loading