Skip to content

Commit 5cb0c79

Browse files
committed
Planner: support LATERAL subqueries
1 parent cc60278 commit 5cb0c79

File tree

5 files changed

+180
-33
lines changed

5 files changed

+180
-33
lines changed

datafusion/sql/src/planner.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
1919
use std::collections::HashMap;
20+
use std::ops::Deref;
2021
use std::sync::Arc;
2122
use std::vec;
2223

2324
use arrow_schema::*;
2425
use datafusion_common::{
25-
field_not_found, internal_err, plan_datafusion_err, SchemaError,
26+
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
2627
};
2728
use datafusion_expr::WindowUDF;
2829
use sqlparser::ast::TimezoneInfo;
@@ -149,6 +150,9 @@ pub struct PlannerContext {
149150
ctes: HashMap<String, Arc<LogicalPlan>>,
150151
/// The query schema of the outer query plan, used to resolve the columns in subquery
151152
outer_query_schema: Option<DFSchema>,
153+
/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
154+
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
155+
outer_from_schema: Option<DFSchema>,
152156
}
153157

154158
impl Default for PlannerContext {
@@ -164,6 +168,7 @@ impl PlannerContext {
164168
prepare_param_data_types: vec![],
165169
ctes: HashMap::new(),
166170
outer_query_schema: None,
171+
outer_from_schema: None,
167172
}
168173
}
169174

@@ -191,6 +196,25 @@ impl PlannerContext {
191196
schema
192197
}
193198

199+
/// sets the FROM schema, returning the existing one, if
200+
/// any
201+
pub fn set_outer_from_schema(
202+
&mut self,
203+
mut schema: Option<DFSchema>,
204+
) -> Option<DFSchema> {
205+
std::mem::swap(&mut self.outer_from_schema, &mut schema);
206+
schema
207+
}
208+
209+
/// extends the FROM schema, returning the existing one, if any
210+
pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
211+
self.outer_from_schema = match self.outer_from_schema.as_ref() {
212+
Some(from_schema) => Some(from_schema.join(schema)?),
213+
None => Some(schema.deref().clone()),
214+
};
215+
Ok(())
216+
}
217+
194218
/// Return the types of parameters (`$1`, `$2`, etc) if known
195219
pub fn prepare_param_data_types(&self) -> &[DataType] {
196220
&self.prepare_param_data_types

datafusion/sql/src/relation/join.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
1919
use datafusion_common::{not_impl_err, Column, Result};
2020
use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
21-
use sqlparser::ast::{Join, JoinConstraint, JoinOperator, TableWithJoins};
21+
use sqlparser::ast::{Join, JoinConstraint, JoinOperator, TableFactor, TableWithJoins};
2222
use std::collections::HashSet;
2323

2424
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
@@ -33,28 +33,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
3333
// So always use original global CTEs to plan CTEs in from clause.
3434
// Btw, don't need to add CTEs in from to global CTEs.
3535
let origin_planner_context = planner_context.clone();
36-
let left = self.create_relation(t.relation, planner_context)?;
37-
match t.joins.len() {
38-
0 => {
39-
*planner_context = origin_planner_context;
40-
Ok(left)
41-
}
42-
_ => {
43-
let mut joins = t.joins.into_iter();
36+
let mut left = if is_lateral_factor(&t.relation) {
37+
self.create_relation_subquery(t.relation, planner_context)?
38+
} else {
39+
self.create_relation(t.relation, planner_context)?
40+
};
41+
if !t.joins.is_empty() {
42+
for join in t.joins {
4443
*planner_context = origin_planner_context.clone();
45-
let mut left = self.parse_relation_join(
46-
left,
47-
joins.next().unwrap(), // length of joins > 0
48-
planner_context,
49-
)?;
50-
for join in joins {
51-
*planner_context = origin_planner_context.clone();
52-
left = self.parse_relation_join(left, join, planner_context)?;
53-
}
54-
*planner_context = origin_planner_context;
55-
Ok(left)
44+
planner_context.extend_outer_from_schema(left.schema())?;
45+
left = self.parse_relation_join(left, join, planner_context)?;
5646
}
5747
}
48+
*planner_context = origin_planner_context;
49+
Ok(left)
5850
}
5951

6052
fn parse_relation_join(
@@ -63,7 +55,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
6355
join: Join,
6456
planner_context: &mut PlannerContext,
6557
) -> Result<LogicalPlan> {
66-
let right = self.create_relation(join.relation, planner_context)?;
58+
let right = if is_lateral_join(&join)? {
59+
self.create_relation_subquery(join.relation, planner_context)?
60+
} else {
61+
self.create_relation(join.relation, planner_context)?
62+
};
6763
match join.join_operator {
6864
JoinOperator::LeftOuter(constraint) => {
6965
self.parse_join(left, right, constraint, JoinType::Left, planner_context)
@@ -167,3 +163,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
167163
}
168164
}
169165
}
166+
167+
/// Return `true` iff the given [`TableFactor`] is lateral.
168+
pub(crate) fn is_lateral_factor(factor: &TableFactor) -> bool {
169+
match factor {
170+
TableFactor::Derived { lateral, .. } => *lateral,
171+
TableFactor::Function { lateral, .. } => *lateral,
172+
_ => false,
173+
}
174+
}
175+
176+
/// Return `true` iff the given [`Join`] is lateral.
177+
pub(crate) fn is_lateral_join(join: &Join) -> Result<bool> {
178+
let is_lateral_syntax = is_lateral_factor(&join.relation);
179+
let is_apply_syntax = match join.join_operator {
180+
JoinOperator::FullOuter(..)
181+
| JoinOperator::RightOuter(..)
182+
| JoinOperator::RightAnti(..)
183+
| JoinOperator::RightSemi(..)
184+
if is_lateral_syntax =>
185+
{
186+
return not_impl_err!("NONE constraint is not supported");
187+
}
188+
JoinOperator::CrossApply | JoinOperator::OuterApply => true,
189+
_ => false,
190+
};
191+
Ok(is_lateral_syntax || is_apply_syntax)
192+
}

datafusion/sql/src/relation/mod.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
1921
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result, TableReference};
22+
use datafusion_expr::builder::subquery_alias;
2023
use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
24+
use datafusion_expr::{Subquery, SubqueryAlias};
2125
use sqlparser::ast::{FunctionArg, FunctionArgExpr, TableFactor};
2226

2327
mod join;
@@ -143,4 +147,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
143147
Ok(plan)
144148
}
145149
}
150+
151+
pub(crate) fn create_relation_subquery(
152+
&self,
153+
subquery: TableFactor,
154+
planner_context: &mut PlannerContext,
155+
) -> Result<LogicalPlan> {
156+
let old_from_sch = planner_context.set_outer_from_schema(None).unwrap(); // set in plan_table_with_joins
157+
let new_outer_sch = match planner_context.outer_query_schema() {
158+
Some(lhs) => Some(lhs.join(&old_from_sch)?),
159+
None => Some(old_from_sch.clone()),
160+
};
161+
let old_outer_sch = planner_context.set_outer_query_schema(new_outer_sch);
162+
163+
let plan = self.create_relation(subquery, planner_context)?;
164+
let outer_ref_columns = plan.all_out_ref_exprs();
165+
166+
planner_context.set_outer_query_schema(old_outer_sch);
167+
planner_context.set_outer_from_schema(Some(old_from_sch));
168+
169+
match plan {
170+
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
171+
subquery_alias(
172+
LogicalPlan::Subquery(Subquery {
173+
subquery: input,
174+
outer_ref_columns,
175+
}),
176+
alias,
177+
)
178+
}
179+
plan => Ok(LogicalPlan::Subquery(Subquery {
180+
subquery: Arc::new(plan),
181+
outer_ref_columns,
182+
})),
183+
}
184+
}
146185
}

datafusion/sql/src/select.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::utils::{
2525
resolve_positions_to_exprs,
2626
};
2727

28-
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
28+
use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
2929
use datafusion_common::{Column, UnnestOptions};
3030
use datafusion_expr::expr::Alias;
3131
use datafusion_expr::expr_rewriter::{
@@ -380,19 +380,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
380380
match from.len() {
381381
0 => Ok(LogicalPlanBuilder::empty(true).build()?),
382382
1 => {
383-
let from = from.remove(0);
384-
self.plan_table_with_joins(from, planner_context)
383+
let input = from.remove(0);
384+
self.plan_table_with_joins(input, planner_context)
385385
}
386386
_ => {
387-
let mut plans = from
388-
.into_iter()
389-
.map(|t| self.plan_table_with_joins(t, planner_context));
390-
391-
let mut left = LogicalPlanBuilder::from(plans.next().unwrap()?);
392-
393-
for right in plans {
394-
left = left.cross_join(right?)?;
387+
let mut from = from.into_iter();
388+
389+
let mut left = LogicalPlanBuilder::from({
390+
let input = from.next().unwrap();
391+
self.plan_table_with_joins(input, planner_context)?
392+
});
393+
let old_outer_from_schema = {
394+
let left_schema = Some(DFSchema::clone(left.schema()));
395+
planner_context.set_outer_from_schema(left_schema)
396+
};
397+
for input in from {
398+
// Join `input` with the current result (`left`).
399+
let right = self.plan_table_with_joins(input, planner_context)?;
400+
left = left.cross_join(right)?;
401+
// Update the outer FROM schema.
402+
let left_schema = Some(DFSchema::clone(left.schema()));
403+
planner_context.set_outer_from_schema(left_schema);
395404
}
405+
planner_context.set_outer_from_schema(old_outer_from_schema);
406+
396407
Ok(left.build()?)
397408
}
398409
}

datafusion/sql/tests/sql_integration.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3188,6 +3188,56 @@ fn join_on_complex_condition() {
31883188
quick_test(sql, expected);
31893189
}
31903190

3191+
#[test]
3192+
fn lateral_comma_join() {
3193+
let sql = "SELECT j1_string, j2_string FROM
3194+
j1, \
3195+
LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2";
3196+
let expected = "Projection: j1.j1_string, j2.j2_string\
3197+
\n CrossJoin:\
3198+
\n TableScan: j1\
3199+
\n SubqueryAlias: j2\
3200+
\n Subquery:\
3201+
\n Projection: j2.j2_id, j2.j2_string\
3202+
\n Filter: outer_ref(j1.j1_id) < j2.j2_id\
3203+
\n TableScan: j2";
3204+
quick_test(sql, expected);
3205+
}
3206+
3207+
#[test]
3208+
fn lateral_left_join() {
3209+
let sql = "SELECT j1_string, j2_string FROM
3210+
j1 \
3211+
LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true);";
3212+
let expected = "Projection: j1.j1_string, j2.j2_string\
3213+
\n Left Join: Filter: Boolean(true)\
3214+
\n TableScan: j1\
3215+
\n SubqueryAlias: j2\
3216+
\n Subquery:\
3217+
\n Projection: j2.j2_id, j2.j2_string\
3218+
\n Filter: outer_ref(j1.j1_id) < j2.j2_id\
3219+
\n TableScan: j2";
3220+
quick_test(sql, expected);
3221+
}
3222+
3223+
#[test]
3224+
fn lateral_nested_left_join() {
3225+
let sql = "SELECT * FROM
3226+
j1, \
3227+
(j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true))";
3228+
let expected = "Projection: j1.j1_id, j1.j1_string, j2.j2_id, j2.j2_string, j3.j3_id, j3.j3_string\
3229+
\n CrossJoin:\
3230+
\n TableScan: j1\
3231+
\n Left Join: Filter: Boolean(true)\
3232+
\n TableScan: j2\
3233+
\n SubqueryAlias: j3\
3234+
\n Subquery:\
3235+
\n Projection: j3.j3_id, j3.j3_string\
3236+
\n Filter: outer_ref(j1.j1_id) + outer_ref(j2.j2_id) = j3.j3_id\
3237+
\n TableScan: j3";
3238+
quick_test(sql, expected);
3239+
}
3240+
31913241
#[test]
31923242
fn hive_aggregate_with_filter() -> Result<()> {
31933243
let dialect = &HiveDialect {};

0 commit comments

Comments
 (0)