|
| 1 | +// Copyright 2021 Datafuse Labs |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +use std::sync::Arc; |
| 16 | + |
| 17 | +use databend_common_ast::ast::Engine; |
| 18 | +use databend_common_ast::parser::parse_sql; |
| 19 | +use databend_common_ast::parser::tokenize_sql; |
| 20 | +use databend_common_ast::parser::Dialect; |
| 21 | +use databend_common_catalog::catalog::CatalogManager; |
| 22 | +use databend_common_catalog::table_context::TableContext; |
| 23 | +use databend_common_exception::Result; |
| 24 | +use databend_common_expression::types::NumberDataType; |
| 25 | +use databend_common_expression::TableDataType; |
| 26 | +use databend_common_expression::TableField; |
| 27 | +use databend_common_expression::TableSchemaRefExt; |
| 28 | +use databend_common_meta_app::schema::CreateOption; |
| 29 | +use databend_common_sql::optimizer::SExpr; |
| 30 | +use databend_common_sql::optimizer::SubqueryRewriter; |
| 31 | +use databend_common_sql::plans::CreateTablePlan; |
| 32 | +use databend_common_sql::plans::Plan; |
| 33 | +use databend_common_sql::Binder; |
| 34 | +use databend_common_sql::Metadata; |
| 35 | +use databend_common_sql::MetadataRef; |
| 36 | +use databend_common_sql::NameResolutionContext; |
| 37 | +use databend_query::interpreters::CreateTableInterpreter; |
| 38 | +use databend_query::interpreters::Interpreter; |
| 39 | +use databend_query::test_kits::TestFixture; |
| 40 | +use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; |
| 41 | +use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT; |
| 42 | +use parking_lot::RwLock; |
| 43 | + |
| 44 | +struct TestSuite { |
| 45 | + name: &'static str, |
| 46 | + // Test cases |
| 47 | + query: &'static str, |
| 48 | + // Expected results |
| 49 | + explain: &'static str, |
| 50 | +} |
| 51 | + |
| 52 | +fn get_test_suites() -> Vec<TestSuite> { |
| 53 | + vec![ |
| 54 | + // Elimination Successful |
| 55 | + TestSuite { |
| 56 | + name: "base", |
| 57 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a = 5);", |
| 58 | + explain: "EvalScalar |
| 59 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 60 | +└── Filter |
| 61 | + ├── filters: [and(true, eq(t1.a (#0), 5))] |
| 62 | + └── Scan |
| 63 | + ├── table: default.t1 |
| 64 | + ├── filters: [] |
| 65 | + ├── order by: [] |
| 66 | + └── limit: NONE\n", |
| 67 | + }, |
| 68 | + TestSuite { |
| 69 | + name: "filter merge", |
| 70 | + query: "select t3.* from t1 as t3 where t3.c = 'D' and t3.b in (select t4.b from t1 as t4 where t4.a = 7);", |
| 71 | + explain: "EvalScalar |
| 72 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 73 | +└── Filter |
| 74 | + ├── filters: [eq(t3.c (#2), 'D'), and(true, eq(t1.a (#0), 7))] |
| 75 | + └── Scan |
| 76 | + ├── table: default.t1 |
| 77 | + ├── filters: [] |
| 78 | + ├── order by: [] |
| 79 | + └── limit: NONE\n", |
| 80 | + }, |
| 81 | + TestSuite { |
| 82 | + name: "order by exists in subquery", |
| 83 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a > 10 ORDER BY t4.c);", |
| 84 | + explain: "EvalScalar |
| 85 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 86 | +└── Filter |
| 87 | + ├── filters: [and(true, gt(t1.a (#0), 10))] |
| 88 | + └── Scan |
| 89 | + ├── table: default.t1 |
| 90 | + ├── filters: [] |
| 91 | + ├── order by: [] |
| 92 | + └── limit: NONE\n", |
| 93 | + }, |
| 94 | + TestSuite { |
| 95 | + name: "complex predicate(and & or)", |
| 96 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.c = 'X' AND t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a = 3 OR t4.c = 'Y');", |
| 97 | + explain: "EvalScalar |
| 98 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 99 | +└── Filter |
| 100 | + ├── filters: [eq(t3.c (#2), 'X'), and(true, or(eq(t1.a (#0), 3), eq(t1.c (#2), 'Y')))] |
| 101 | + └── Scan |
| 102 | + ├── table: default.t1 |
| 103 | + ├── filters: [] |
| 104 | + ├── order by: [] |
| 105 | + └── limit: NONE\n", |
| 106 | + }, |
| 107 | + TestSuite { |
| 108 | + name: "complex predicate(between & is not null)", |
| 109 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a BETWEEN 5 AND 10 AND t4.c IS NOT NULL);", |
| 110 | + explain: "EvalScalar |
| 111 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 112 | +└── Filter |
| 113 | + ├── filters: [and(and(and(true, gte(t1.a (#0), 5)), lte(t1.a (#0), 10)), true)] |
| 114 | + └── Scan |
| 115 | + ├── table: default.t1 |
| 116 | + ├── filters: [] |
| 117 | + ├── order by: [] |
| 118 | + └── limit: NONE\n", |
| 119 | + }, |
| 120 | + |
| 121 | + // Eliminate Failure |
| 122 | + TestSuite { |
| 123 | + name: "join in the main query", |
| 124 | + query: "SELECT t3.* FROM t1 t3 JOIN t2 ON t3.a = t2.a WHERE t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a = 5);", |
| 125 | + explain: "EvalScalar |
| 126 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 127 | +└── Filter |
| 128 | + ├── filters: [9 (#9)] |
| 129 | + └── Join(RightMark) |
| 130 | + ├── build keys: [subquery_7 (#7)] |
| 131 | + ├── probe keys: [t3.b (#1)] |
| 132 | + ├── other filters: [] |
| 133 | + ├── Join(Inner) |
| 134 | + │ ├── build keys: [t2.a (#3)] |
| 135 | + │ ├── probe keys: [t3.a (#0)] |
| 136 | + │ ├── other filters: [] |
| 137 | + │ ├── Scan |
| 138 | + │ │ ├── table: default.t1 |
| 139 | + │ │ ├── filters: [] |
| 140 | + │ │ ├── order by: [] |
| 141 | + │ │ └── limit: NONE |
| 142 | + │ └── Scan |
| 143 | + │ ├── table: default.t2 |
| 144 | + │ ├── filters: [] |
| 145 | + │ ├── order by: [] |
| 146 | + │ └── limit: NONE |
| 147 | + └── EvalScalar |
| 148 | + ├── scalars: [t4.b (#7) AS (#7)] |
| 149 | + └── Filter |
| 150 | + ├── filters: [eq(t4.a (#6), 5)] |
| 151 | + └── Scan |
| 152 | + ├── table: default.t1 |
| 153 | + ├── filters: [] |
| 154 | + ├── order by: [] |
| 155 | + └── limit: NONE\n", |
| 156 | + }, |
| 157 | + TestSuite { |
| 158 | + name: "group by in the subquery", |
| 159 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 GROUP BY t4.b HAVING COUNT(*) > 1);", |
| 160 | + explain: "EvalScalar |
| 161 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 162 | +└── Filter |
| 163 | + ├── filters: [7 (#7)] |
| 164 | + └── Join(RightMark) |
| 165 | + ├── build keys: [subquery_4 (#4)] |
| 166 | + ├── probe keys: [t3.b (#1)] |
| 167 | + ├── other filters: [] |
| 168 | + ├── Scan |
| 169 | + │ ├── table: default.t1 |
| 170 | + │ ├── filters: [] |
| 171 | + │ ├── order by: [] |
| 172 | + │ └── limit: NONE |
| 173 | + └── EvalScalar |
| 174 | + ├── scalars: [t4.b (#4) AS (#4)] |
| 175 | + └── Filter |
| 176 | + ├── filters: [gt(COUNT(*) (#6), 1)] |
| 177 | + └── Aggregate(Initial) |
| 178 | + ├── group items: [t4.b (#4)] |
| 179 | + ├── aggregate functions: [COUNT(*) (#6)] |
| 180 | + └── EvalScalar |
| 181 | + ├── scalars: [t4.b (#4) AS (#4)] |
| 182 | + └── Scan |
| 183 | + ├── table: default.t1 |
| 184 | + ├── filters: [] |
| 185 | + ├── order by: [] |
| 186 | + └── limit: NONE\n", |
| 187 | + }, |
| 188 | + TestSuite { |
| 189 | + name: "different tables", |
| 190 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t2.b FROM t2 WHERE t2.a = 3);", |
| 191 | + explain: "EvalScalar |
| 192 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 193 | +└── Filter |
| 194 | + ├── filters: [6 (#6)] |
| 195 | + └── Join(RightMark) |
| 196 | + ├── build keys: [subquery_4 (#4)] |
| 197 | + ├── probe keys: [t3.b (#1)] |
| 198 | + ├── other filters: [] |
| 199 | + ├── Scan |
| 200 | + │ ├── table: default.t1 |
| 201 | + │ ├── filters: [] |
| 202 | + │ ├── order by: [] |
| 203 | + │ └── limit: NONE |
| 204 | + └── EvalScalar |
| 205 | + ├── scalars: [t2.b (#4) AS (#4)] |
| 206 | + └── Filter |
| 207 | + ├── filters: [eq(t2.a (#3), 3)] |
| 208 | + └── Scan |
| 209 | + ├── table: default.t2 |
| 210 | + ├── filters: [] |
| 211 | + ├── order by: [] |
| 212 | + └── limit: NONE\n", |
| 213 | + }, |
| 214 | + TestSuite { |
| 215 | + name: "limit in the subquery", |
| 216 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 WHERE t4.a = 5 LIMIT 1);", |
| 217 | + explain: "EvalScalar |
| 218 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 219 | +└── Filter |
| 220 | + ├── filters: [6 (#6)] |
| 221 | + └── Join(RightMark) |
| 222 | + ├── build keys: [subquery_4 (#4)] |
| 223 | + ├── probe keys: [t3.b (#1)] |
| 224 | + ├── other filters: [] |
| 225 | + ├── Scan |
| 226 | + │ ├── table: default.t1 |
| 227 | + │ ├── filters: [] |
| 228 | + │ ├── order by: [] |
| 229 | + │ └── limit: NONE |
| 230 | + └── Limit |
| 231 | + ├── limit: [1] |
| 232 | + ├── offset: [0] |
| 233 | + └── EvalScalar |
| 234 | + ├── scalars: [t4.b (#4) AS (#4)] |
| 235 | + └── Filter |
| 236 | + ├── filters: [eq(t4.a (#3), 5)] |
| 237 | + └── Scan |
| 238 | + ├── table: default.t1 |
| 239 | + ├── filters: [] |
| 240 | + ├── order by: [] |
| 241 | + └── limit: NONE\n", |
| 242 | + }, |
| 243 | + TestSuite { |
| 244 | + name: "join in the subquery", |
| 245 | + query: "SELECT t3.* FROM t1 t3 WHERE t3.b IN (SELECT t4.b FROM t1 t4 JOIN t2 ON t4.a = t2.a);", |
| 246 | + explain: "EvalScalar |
| 247 | +├── scalars: [t3.a (#0) AS (#0), t3.b (#1) AS (#1), t3.c (#2) AS (#2)] |
| 248 | +└── Filter |
| 249 | + ├── filters: [9 (#9)] |
| 250 | + └── Join(RightMark) |
| 251 | + ├── build keys: [subquery_4 (#4)] |
| 252 | + ├── probe keys: [t3.b (#1)] |
| 253 | + ├── other filters: [] |
| 254 | + ├── Scan |
| 255 | + │ ├── table: default.t1 |
| 256 | + │ ├── filters: [] |
| 257 | + │ ├── order by: [] |
| 258 | + │ └── limit: NONE |
| 259 | + └── EvalScalar |
| 260 | + ├── scalars: [t4.b (#4) AS (#4)] |
| 261 | + └── Join(Inner) |
| 262 | + ├── build keys: [t2.a (#6)] |
| 263 | + ├── probe keys: [t4.a (#3)] |
| 264 | + ├── other filters: [] |
| 265 | + ├── Scan |
| 266 | + │ ├── table: default.t1 |
| 267 | + │ ├── filters: [] |
| 268 | + │ ├── order by: [] |
| 269 | + │ └── limit: NONE |
| 270 | + └── Scan |
| 271 | + ├── table: default.t2 |
| 272 | + ├── filters: [] |
| 273 | + ├── order by: [] |
| 274 | + └── limit: NONE\n", |
| 275 | + }, |
| 276 | + ] |
| 277 | +} |
| 278 | + |
| 279 | +fn create_table_plan(fixture: &TestFixture, format: &str) -> Vec<CreateTablePlan> { |
| 280 | + vec![ |
| 281 | + CreateTablePlan { |
| 282 | + create_option: CreateOption::Create, |
| 283 | + tenant: fixture.default_tenant(), |
| 284 | + catalog: fixture.default_catalog_name(), |
| 285 | + database: "default".to_string(), |
| 286 | + table: "t1".to_string(), |
| 287 | + schema: TableSchemaRefExt::create(vec![ |
| 288 | + TableField::new( |
| 289 | + "a", |
| 290 | + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int32))), |
| 291 | + ), |
| 292 | + TableField::new("b", TableDataType::Number(NumberDataType::Int32)), |
| 293 | + TableField::new("c", TableDataType::String), |
| 294 | + ]), |
| 295 | + engine: Engine::Fuse, |
| 296 | + engine_options: Default::default(), |
| 297 | + storage_params: None, |
| 298 | + options: [ |
| 299 | + // database id is required for FUSE |
| 300 | + (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()), |
| 301 | + (OPT_KEY_STORAGE_FORMAT.to_owned(), format.to_owned()), |
| 302 | + ] |
| 303 | + .into(), |
| 304 | + field_comments: vec![], |
| 305 | + as_select: None, |
| 306 | + cluster_key: None, |
| 307 | + inverted_indexes: None, |
| 308 | + attached_columns: None, |
| 309 | + }, |
| 310 | + CreateTablePlan { |
| 311 | + create_option: CreateOption::Create, |
| 312 | + tenant: fixture.default_tenant(), |
| 313 | + catalog: fixture.default_catalog_name(), |
| 314 | + database: "default".to_string(), |
| 315 | + table: "t2".to_string(), |
| 316 | + schema: TableSchemaRefExt::create(vec![ |
| 317 | + TableField::new( |
| 318 | + "a", |
| 319 | + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int32))), |
| 320 | + ), |
| 321 | + TableField::new("b", TableDataType::Number(NumberDataType::Int32)), |
| 322 | + TableField::new("c", TableDataType::String), |
| 323 | + ]), |
| 324 | + engine: Engine::Fuse, |
| 325 | + engine_options: Default::default(), |
| 326 | + storage_params: None, |
| 327 | + options: [ |
| 328 | + // database id is required for FUSE |
| 329 | + (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()), |
| 330 | + (OPT_KEY_STORAGE_FORMAT.to_owned(), format.to_owned()), |
| 331 | + ] |
| 332 | + .into(), |
| 333 | + field_comments: vec![], |
| 334 | + as_select: None, |
| 335 | + cluster_key: None, |
| 336 | + inverted_indexes: None, |
| 337 | + attached_columns: None, |
| 338 | + }, |
| 339 | + ] |
| 340 | +} |
| 341 | + |
| 342 | +#[tokio::test(flavor = "multi_thread")] |
| 343 | +async fn test_query_rewrite() -> Result<()> { |
| 344 | + test_query_rewrite_impl("parquet").await?; |
| 345 | + test_query_rewrite_impl("native").await |
| 346 | +} |
| 347 | + |
| 348 | +async fn test_query_rewrite_impl(format: &str) -> Result<()> { |
| 349 | + let fixture = TestFixture::setup().await?; |
| 350 | + |
| 351 | + let ctx = fixture.new_query_ctx().await?; |
| 352 | + for create_table in create_table_plan(&fixture, format) { |
| 353 | + let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table)?; |
| 354 | + let _ = interpreter.execute(ctx.clone()).await?; |
| 355 | + } |
| 356 | + let test_suites = get_test_suites(); |
| 357 | + for suite in test_suites.into_iter() { |
| 358 | + let query = plan_sql(ctx.clone(), suite.query).await?; |
| 359 | + assert_eq!( |
| 360 | + query, suite.explain, |
| 361 | + "\n==== Case: {} ====\n\nQuery: \n{} \nExpected: \n{}", |
| 362 | + suite.name, query, suite.explain |
| 363 | + ); |
| 364 | + } |
| 365 | + |
| 366 | + Ok(()) |
| 367 | +} |
| 368 | + |
| 369 | +fn format_pretty(query: &SExpr, query_meta: &MetadataRef) -> Result<String> { |
| 370 | + let metadata = &*query_meta.read(); |
| 371 | + |
| 372 | + Ok(query.to_format_tree(metadata, false)?.format_pretty()?) |
| 373 | +} |
| 374 | + |
| 375 | +async fn plan_sql(ctx: Arc<dyn TableContext>, sql: &str) -> Result<String> { |
| 376 | + let settings = ctx.get_settings(); |
| 377 | + let metadata = Arc::new(RwLock::new(Metadata::default())); |
| 378 | + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; |
| 379 | + let binder = Binder::new( |
| 380 | + ctx.clone(), |
| 381 | + CatalogManager::instance(), |
| 382 | + name_resolution_ctx, |
| 383 | + metadata, |
| 384 | + ); |
| 385 | + let tokens = tokenize_sql(sql)?; |
| 386 | + let (stmt, _) = parse_sql(&tokens, Dialect::PostgreSQL)?; |
| 387 | + let plan = binder.bind(&stmt).await?; |
| 388 | + if let Plan::Query { |
| 389 | + s_expr, metadata, .. |
| 390 | + } = plan |
| 391 | + { |
| 392 | + let s_expr = SubqueryRewriter::new(ctx.clone(), metadata.clone(), None).rewrite(&s_expr)?; |
| 393 | + |
| 394 | + return format_pretty(&s_expr, &metadata); |
| 395 | + } |
| 396 | + unreachable!() |
| 397 | +} |
0 commit comments