From b14a8b076c02d6c71393e5dad81d6ca9b60ac583 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Wed, 19 Mar 2025 16:35:39 +0800
Subject: [PATCH 1/4] feat(query): add iceberg table functions

---
 Cargo.lock                                    |   1 +
 src/query/catalog/src/table_args.rs           | 103 +++++++++++++++
 .../table_functions/table_function_factory.rs |  11 ++
 src/query/sql/src/planner/plans/scan.rs       |   8 +-
 .../hilbert_clustering_information.rs         |   2 +-
 .../storages/fuse/src/table_functions/mod.rs  |   4 +-
 .../fuse/src/table_functions/table_args.rs    | 123 ------------------
 src/query/storages/iceberg/Cargo.toml         |   1 +
 src/query/storages/iceberg/src/lib.rs         |   2 +
 src/query/storages/iceberg/src/table.rs       |   9 ++
 .../suites/tpch_iceberg/utils.test            |   1 +
 11 files changed, 136 insertions(+), 129 deletions(-)
 delete mode 100644 src/query/storages/fuse/src/table_functions/table_args.rs

diff --git a/Cargo.lock b/Cargo.lock
index fe49ac2627c3b..fc073ed73e49f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4255,6 +4255,7 @@ dependencies = [
  "databend-common-meta-store",
  "databend-common-meta-types",
  "databend-common-pipeline-core",
+ "databend-common-pipeline-sources",
  "databend-common-storage",
  "databend-common-storages-parquet",
  "databend-storages-common-table-meta",
diff --git a/src/query/catalog/src/table_args.rs b/src/query/catalog/src/table_args.rs
index 6878605e80f63..dccf3b5f8a814 100644
--- a/src/query/catalog/src/table_args.rs
+++ b/src/query/catalog/src/table_args.rs
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp::Ordering;
 use std::collections::HashMap;
 
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::types::NumberScalar;
 use databend_common_expression::Scalar;
 use log::debug;
 
@@ -99,3 +101,104 @@ impl TableArgs {
         }
     }
 }
+
+pub fn string_value(value: &Scalar) -> Result<String> {
+    match value {
+        Scalar::String(val) => Ok(val.clone()),
+        _ => Err(ErrorCode::BadArguments("invalid string.")),
+    }
+}
+
+pub fn bool_value(value: &Scalar) -> Result<bool> {
+    match value {
+        Scalar::Boolean(val) => Ok(*val),
+        _ => Err(ErrorCode::BadArguments("invalid boolean.")),
+    }
+}
+
+pub fn string_literal(val: &str) -> Scalar {
+    Scalar::String(val.to_string())
+}
+
+pub fn bool_literal(val: bool) -> Scalar {
+    Scalar::Boolean(val)
+}
+
+pub fn u64_literal(val: u64) -> Scalar {
+    Scalar::Number(NumberScalar::UInt64(val))
+}
+
+pub fn cmp_with_null(v1: &Scalar, v2: &Scalar) -> Ordering {
+    match (v1.is_null(), v2.is_null()) {
+        (true, true) => Ordering::Equal,
+        (true, false) => Ordering::Greater,
+        (false, true) => Ordering::Less,
+        (false, false) => v1.cmp(v2),
+    }
+}
+
+pub fn parse_sequence_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
+    let args = table_args.expect_all_positioned(func_name, Some(1))?;
+    let sequence = string_value(&args[0])?;
+    Ok(sequence)
+}
+
+pub fn parse_db_tb_args(table_args: &TableArgs, func_name: &str) -> Result<(String, String)> {
+    let args = table_args.expect_all_positioned(func_name, Some(2))?;
+    let db = string_value(&args[0])?;
+    let tbl = string_value(&args[1])?;
+    Ok((db, tbl))
+}
+
+pub fn parse_db_tb_opt_args(
+    table_args: &TableArgs,
+    func_name: &str,
+) -> Result<(String, String, Option<String>)> {
+    let args = table_args.expect_all_positioned(func_name, None)?;
+    match args.len() {
+        3 => {
+            let arg1 = string_value(&args[0])?;
+            let arg2 = string_value(&args[1])?;
+            let arg3 = string_value(&args[2])?;
+            Ok((arg1, arg2, Some(arg3)))
+        }
+        2 => {
+            let arg1 = string_value(&args[0])?;
+            let arg2 = string_value(&args[1])?;
+            Ok((arg1, arg2, None))
+        }
+        _ => Err(ErrorCode::BadArguments(format!(
+            "expecting <database>, <table_name> and <snapshot_id> (as string literals), but got {:?}",
+            args
+        ))),
+    }
+}
+
+pub fn parse_opt_opt_args(
+    table_args: &TableArgs,
+    func_name: &str,
+) -> Result<(Option<String>, Option<String>)> {
+    let args = table_args.expect_all_positioned(func_name, None)?;
+    match args.len() {
+        2 => {
+            let arg1 = string_value(&args[0])?;
+            let arg2 = string_value(&args[1])?;
+            Ok((Some(arg1), Some(arg2)))
+        }
+        1 => {
+            let arg1 = string_value(&args[0])?;
+            Ok((Some(arg1), None))
+        }
+        0 => Ok((None, None)),
+        _ => Err(ErrorCode::BadArguments(format!(
+            "expecting <opt_arg1> and <opt_arg2> (as string literals), but got {:?}",
+            args
+        ))),
+    }
+}
+
+pub fn parse_db_tb_col_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
+    let args = table_args.expect_all_positioned(func_name, Some(1))?;
+    let db = string_value(&args[0])?;
+    Ok(db)
+}
diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs
index 7df6715482e4d..cfd5b32f0a57e 100644
--- a/src/query/service/src/table_functions/table_function_factory.rs
+++ b/src/query/service/src/table_functions/table_function_factory.rs
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::table_functions::FuseVacuumTemporaryTable;
 use databend_common_storages_fuse::table_functions::HilbertClusteringInfoFunc;
 use databend_common_storages_fuse::table_functions::SetCacheCapacity;
 use databend_common_storages_fuse::table_functions::TableFunctionTemplate;
+use databend_common_storages_iceberg::IcebergInspectTable;
 use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
 use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUC_ID_END;
 use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUNC_ID_BEGIN;
@@ -354,6 +355,16 @@ impl TableFunctionFactory {
             (next_id(), Arc::new(ShowRoles::create)),
         );
 
+        creators.insert(
+            "iceberg_snapshot".to_string(),
+            (next_id(), Arc::new(IcebergInspectTable::create)),
+        );
+
+        creators.insert(
+            "iceberg_manifest".to_string(),
+            (next_id(), Arc::new(IcebergInspectTable::create)),
+        );
+
         TableFunctionFactory {
             creators: RwLock::new(creators),
         }
diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs
index aa3e189e7f161..bccc92ff0004d 100644
--- a/src/query/sql/src/planner/plans/scan.rs
+++ b/src/query/sql/src/planner/plans/scan.rs
@@ -236,8 +236,12 @@ impl Operator for Scan {
             }
             if let Some(col_stat) = v.clone() {
                 // Safe to unwrap: min, max are all `Some(_)`.
-                let min = col_stat.min.unwrap();
-                let max = col_stat.max.unwrap();
+                let Some(min) = col_stat.min.clone() else {
+                    continue;
+                };
+                let Some(max) = col_stat.max.clone() else {
+                    continue;
+                };
                 // ndv could be `None`, we will use `num_rows - null_count` as ndv instead.
                 //
                 // NOTE: don't touch the original num_rows, since it will be used in other places.
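
Note on the helpers moved into databend-common-catalog above: the literal constructors and extractors are pure functions, so they can be exercised without a query context. A minimal sketch (crate paths as in this patch; `Scalar::Null` comes from databend-common-expression):

    use std::cmp::Ordering;

    use databend_common_catalog::table_args::{
        bool_literal, bool_value, cmp_with_null, string_literal, string_value,
    };
    use databend_common_expression::Scalar;

    fn main() -> databend_common_exception::Result<()> {
        // Literals round-trip through their extractors.
        assert_eq!(string_value(&string_literal("tpch"))?, "tpch");
        assert!(bool_value(&bool_literal(true))?);
        // cmp_with_null sorts NULLs after any non-null scalar.
        assert_eq!(cmp_with_null(&Scalar::Null, &string_literal("a")), Ordering::Greater);
        Ok(())
    }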
diff --git a/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs b/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs
index a051d8b8dcf4a..194adc932c5b0 100644
--- a/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs
+++ b/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs
@@ -18,6 +18,7 @@ use chrono::Utc;
 use databend_common_catalog::catalog::CATALOG_DEFAULT;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::table::Table;
+use databend_common_catalog::table_args::string_literal;
 use databend_common_catalog::table_args::TableArgs;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -40,7 +41,6 @@ use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
 
 use crate::io::SegmentsIO;
 use crate::table_functions::parse_db_tb_args;
-use crate::table_functions::string_literal;
 use crate::table_functions::SimpleArgFunc;
 use crate::table_functions::SimpleArgFuncTemplate;
 use crate::FuseTable;
diff --git a/src/query/storages/fuse/src/table_functions/mod.rs b/src/query/storages/fuse/src/table_functions/mod.rs
index 6c42b04a44330..36ac2718d0b38 100644
--- a/src/query/storages/fuse/src/table_functions/mod.rs
+++ b/src/query/storages/fuse/src/table_functions/mod.rs
@@ -27,13 +27,12 @@ mod fuse_vacuum_drop_aggregating_index;
 mod fuse_vacuum_drop_inverted_index;
 mod fuse_vacuum_temporary_table;
 mod hilbert_clustering_information;
-mod table_args;
 mod set_cache_capacity;
 
 pub use clustering_information::ClusteringInformationFunc;
 pub use clustering_statistics::ClusteringStatisticsFunc;
-use databend_common_catalog::table_args::TableArgs;
+pub use databend_common_catalog::table_args::*;
 use databend_common_catalog::table_function::TableFunction;
 pub use function_template::SimpleTableFunc;
 pub use function_template::TableFunctionTemplate;
@@ -52,4 +51,3 @@ pub use fuse_vacuum_drop_inverted_index::FuseVacuumDropInvertedIndex;
 pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable;
 pub use hilbert_clustering_information::HilbertClusteringInfoFunc;
 pub use set_cache_capacity::SetCacheCapacity;
-pub use table_args::*;
diff --git a/src/query/storages/fuse/src/table_functions/table_args.rs b/src/query/storages/fuse/src/table_functions/table_args.rs
deleted file mode 100644
index df8d2fbbc3e1d..0000000000000
--- a/src/query/storages/fuse/src/table_functions/table_args.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2021 Datafuse Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::cmp::Ordering;
-
-use databend_common_exception::ErrorCode;
-use databend_common_exception::Result;
-use databend_common_expression::types::NumberScalar;
-use databend_common_expression::Scalar;
-
-use crate::table_functions::TableArgs;
-
-pub fn string_value(value: &Scalar) -> Result<String> {
-    match value {
-        Scalar::String(val) => Ok(val.clone()),
-        _ => Err(ErrorCode::BadArguments("invalid string.")),
-    }
-}
-
-pub fn bool_value(value: &Scalar) -> Result<bool> {
-    match value {
-        Scalar::Boolean(val) => Ok(*val),
-        _ => Err(ErrorCode::BadArguments("invalid boolean.")),
-    }
-}
-
-pub fn string_literal(val: &str) -> Scalar {
-    Scalar::String(val.to_string())
-}
-
-pub fn bool_literal(val: bool) -> Scalar {
-    Scalar::Boolean(val)
-}
-
-pub fn u64_literal(val: u64) -> Scalar {
-    Scalar::Number(NumberScalar::UInt64(val))
-}
-
-pub fn cmp_with_null(v1: &Scalar, v2: &Scalar) -> Ordering {
-    match (v1.is_null(), v2.is_null()) {
-        (true, true) => Ordering::Equal,
-        (true, false) => Ordering::Greater,
-        (false, true) => Ordering::Less,
-        (false, false) => v1.cmp(v2),
-    }
-}
-
-pub fn parse_sequence_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
-    let args = table_args.expect_all_positioned(func_name, Some(1))?;
-    let sequence = string_value(&args[0])?;
-    Ok(sequence)
-}
-
-pub fn parse_db_tb_args(table_args: &TableArgs, func_name: &str) -> Result<(String, String)> {
-    let args = table_args.expect_all_positioned(func_name, Some(2))?;
-    let db = string_value(&args[0])?;
-    let tbl = string_value(&args[1])?;
-    Ok((db, tbl))
-}
-
-pub fn parse_db_tb_opt_args(
-    table_args: &TableArgs,
-    func_name: &str,
-) -> Result<(String, String, Option<String>)> {
-    let args = table_args.expect_all_positioned(func_name, None)?;
-    match args.len() {
-        3 => {
-            let arg1 = string_value(&args[0])?;
-            let arg2 = string_value(&args[1])?;
-            let arg3 = string_value(&args[2])?;
-            Ok((arg1, arg2, Some(arg3)))
-        }
-        2 => {
-            let arg1 = string_value(&args[0])?;
-            let arg2 = string_value(&args[1])?;
-            Ok((arg1, arg2, None))
-        }
-        _ => Err(ErrorCode::BadArguments(format!(
-            "expecting <database>, <table_name> and <snapshot_id> (as string literals), but got {:?}",
-            args
-        ))),
-    }
-}
-
-pub fn parse_opt_opt_args(
-    table_args: &TableArgs,
-    func_name: &str,
-) -> Result<(Option<String>, Option<String>)> {
-    let args = table_args.expect_all_positioned(func_name, None)?;
-    match args.len() {
-        2 => {
-            let arg1 = string_value(&args[0])?;
-            let arg2 = string_value(&args[1])?;
-            Ok((Some(arg1), Some(arg2)))
-        }
-        1 => {
-            let arg1 = string_value(&args[0])?;
-            Ok((Some(arg1), None))
-        }
-        0 => Ok((None, None)),
-        _ => Err(ErrorCode::BadArguments(format!(
-            "expecting <opt_arg1> and <opt_arg2> (as string literals), but got {:?}",
-            args
-        ))),
-    }
-}
-
-pub fn parse_db_tb_col_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
-    let args = table_args.expect_all_positioned(func_name, Some(1))?;
-    let db = string_value(&args[0])?;
-    Ok(db)
-}
diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml
index 18fdf888d92db..403a8f7558a9d 100644
--- a/src/query/storages/iceberg/Cargo.toml
+++ b/src/query/storages/iceberg/Cargo.toml
@@ -19,6 +19,7 @@ databend-common-meta-app = { workspace = true }
 databend-common-meta-store = { workspace = true }
 databend-common-meta-types = { workspace = true }
 databend-common-pipeline-core = { workspace = true }
+databend-common-pipeline-sources = { workspace = true }
 databend-common-storage = { workspace = true }
 databend-common-storages-parquet = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
diff --git a/src/query/storages/iceberg/src/lib.rs b/src/query/storages/iceberg/src/lib.rs
index c423cd8d0925e..39fac38ae5ad2 100644
--- a/src/query/storages/iceberg/src/lib.rs
+++ b/src/query/storages/iceberg/src/lib.rs
@@ -21,6 +21,7 @@
 
 mod catalog;
 mod database;
+mod iceberg_inspect;
 mod partition;
 mod predicate;
 mod statistics;
@@ -30,4 +31,5 @@ mod table_source;
 pub use catalog::IcebergCatalog;
 pub use catalog::IcebergCreator;
 pub use catalog::ICEBERG_CATALOG;
+pub use iceberg_inspect::IcebergInspectTable;
 pub use table::IcebergTable;
diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs
index 91b5a2f71859a..4b950cfc03696 100644
--- a/src/query/storages/iceberg/src/table.rs
+++ b/src/query/storages/iceberg/src/table.rs
@@ -205,6 +205,15 @@ impl IcebergTable {
         Ok((table, statistics))
     }
 
+    pub fn try_from_table(tbl: &dyn Table) -> Result<&Self> {
+        tbl.as_any().downcast_ref::<IcebergTable>().ok_or_else(|| {
+            ErrorCode::Internal(format!(
+                "expects table of engine iceberg, but got {}",
+                tbl.engine()
+            ))
+        })
+    }
+
     /// create a new table on the table directory
     #[async_backtrace::framed]
     pub async fn try_create_from_iceberg_catalog(
diff --git a/tests/sqllogictests/suites/tpch_iceberg/utils.test b/tests/sqllogictests/suites/tpch_iceberg/utils.test
index a37436751e4b7..0530cdbfe5e0c 100644
--- a/tests/sqllogictests/suites/tpch_iceberg/utils.test
+++ b/tests/sqllogictests/suites/tpch_iceberg/utils.test
@@ -54,6 +54,7 @@ partsupp
 region
 supplier
 
+
 ## test database
 statement ok
 create database abc;
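
The next patch implements the two functions on top of iceberg-rust's metadata-inspection API. For orientation, a minimal sketch of that API as used below — `inspect()`, `snapshots()`/`manifests()` and `scan()` appear in the diff; collecting the resulting Arrow batch stream with `try_collect` is an illustrative assumption:

    use arrow_array::RecordBatch;
    use futures::TryStreamExt;
    use iceberg::table::Table;

    // Scan the `snapshots` metadata table into Arrow record batches.
    async fn snapshot_batches(table: &Table) -> iceberg::Result<Vec<RecordBatch>> {
        let stream = table.inspect().snapshots().scan().await?;
        stream.try_collect().await
    }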
From ff4459f93e4211ddcc66048dc62a776dc950421c Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Wed, 19 Mar 2025 17:00:47 +0800
Subject: [PATCH 2/4] feat(query): add iceberg table functions

---
 .../storages/iceberg/src/iceberg_inspect.rs   | 346 ++++++++++++++++++
 .../suites/tpch_iceberg/table_function.test   |  28 ++
 2 files changed, 374 insertions(+)
 create mode 100644 src/query/storages/iceberg/src/iceberg_inspect.rs
 create mode 100644 tests/sqllogictests/suites/tpch_iceberg/table_function.test

diff --git a/src/query/storages/iceberg/src/iceberg_inspect.rs b/src/query/storages/iceberg/src/iceberg_inspect.rs
new file mode 100644
index 0000000000000..45bf81d5f24f5
--- /dev/null
+++ b/src/query/storages/iceberg/src/iceberg_inspect.rs
@@ -0,0 +1,346 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use chrono::DateTime;
+use databend_common_catalog::plan::DataSourcePlan;
+use databend_common_catalog::plan::PartStatistics;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PushDownInfo;
+use databend_common_catalog::table::Table;
+use databend_common_catalog::table_args::parse_db_tb_args;
+use databend_common_catalog::table_args::TableArgs;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_catalog::table_function::TableFunction;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::NumberDataType;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
+use databend_common_expression::TableDataType;
+use databend_common_expression::TableField;
+use databend_common_expression::TableSchema;
+use databend_common_expression::TableSchemaRef;
+use databend_common_expression::TableSchemaRefExt;
+use databend_common_meta_app::schema::TableIdent;
+use databend_common_meta_app::schema::TableInfo;
+use databend_common_meta_app::schema::TableMeta;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+use futures::StreamExt;
+
+use crate::IcebergTable;
+
+pub struct IcebergInspectTable {
+    table_info: TableInfo,
+    args_parsed: IcebergInspectArgsParsed,
+    table_args: TableArgs,
+    inspect_type: InspectType,
+}
+
+impl IcebergInspectTable {
+    pub fn create(
+        _database_name: &str,
+        table_func_name: &str,
+        table_id: u64,
+        table_args: TableArgs,
+    ) -> databend_common_exception::Result<Arc<dyn TableFunction>> {
+        let args_parsed = IcebergInspectArgsParsed::parse(&table_args, table_func_name)?;
+        let inspect_type = InspectType::from_name(table_func_name);
+        let table_info = TableInfo {
+            ident: TableIdent::new(table_id, 0),
+            desc: format!("'{}'.'{}'", args_parsed.database, args_parsed.table),
+            name: String::from(table_func_name),
+            meta: TableMeta {
+                schema: inspect_type.schema(),
+                engine: String::from(table_func_name),
+                created_on: DateTime::from_timestamp(0, 0).unwrap(),
+                updated_on: DateTime::from_timestamp(0, 0).unwrap(),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        Ok(Arc::new(IcebergInspectTable {
+            table_info,
+            args_parsed,
+            table_args,
+            inspect_type,
+        }))
+    }
+
+    pub fn create_source(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        output: Arc<OutputPort>,
+        _push_downs: Option<PushDownInfo>,
+    ) -> Result<ProcessorPtr> {
+        IcebergInspectSource::create(
+            ctx,
+            output,
+            self.args_parsed.database.clone(),
+            self.args_parsed.table.clone(),
+            self.inspect_type,
+        )
+    }
+}
+
+#[async_trait::async_trait]
+impl Table for IcebergInspectTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+
+    #[async_backtrace::framed]
+    async fn read_partitions(
+        &self,
+        _ctx: Arc<dyn TableContext>,
+        _push_downs: Option<PushDownInfo>,
+        _dry_run: bool,
+    ) -> databend_common_exception::Result<(PartStatistics, Partitions)> {
+        Ok((PartStatistics::new_exact(1, 1, 1, 1), Partitions::default()))
+    }
+
+    fn table_args(&self) -> Option<TableArgs> {
+        Some(self.table_args.clone())
+    }
+
+    fn read_data(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        _plan: &DataSourcePlan,
+        pipeline: &mut Pipeline,
+        _put_cache: bool,
+    ) -> databend_common_exception::Result<()> {
+        pipeline.add_source(|output| self.create_source(ctx.clone(), output, None), 1)?;
+        Ok(())
+    }
+}
+
+struct IcebergInspectSource {
+    is_finished: bool,
+    database: String,
+    table: String,
+    iceberg_table: Option<IcebergTable>,
+    inspect_type: InspectType,
+    ctx: Arc<dyn TableContext>,
+}
+
+impl IcebergInspectSource {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        output: Arc<OutputPort>,
+        database: String,
+        table: String,
+        inspect_type: InspectType,
+    ) -> databend_common_exception::Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx.clone(), output, IcebergInspectSource {
+            ctx,
+            database,
+            table,
+            iceberg_table: None,
+            is_finished: false,
+            inspect_type,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for IcebergInspectSource {
+    const NAME: &'static str = "iceberg_inspect";
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> databend_common_exception::Result<Option<DataBlock>> {
+        if self.is_finished {
+            return Ok(None);
+        }
+
+        if self.iceberg_table.is_none() {
+            let table = self
+                .ctx
+                .get_table(
+                    &self.ctx.get_current_catalog(),
+                    self.database.as_str(),
+                    self.table.as_str(),
+                )
+                .await?;
+            let table = IcebergTable::try_from_table(table.as_ref())?;
+            self.iceberg_table = Some(table.clone());
+        }
+
+        let table = self.iceberg_table.as_ref().unwrap();
+        let is = table.table.inspect();
+        let mut blocks = vec![];
+
+        let sc = match self.inspect_type {
+            InspectType::Snapshot => {
+                let sns = is.snapshots();
+                sns.scan().await
+            }
+            InspectType::Manifest => {
+                let mns = is.manifests();
+                mns.scan().await
+            }
+        };
+
+        let mut stream = sc.map_err(|err| {
+            ErrorCode::Internal(format!("iceberg table inspect scan build: {err:?}"))
+        })?;
+        let schema = self.inspect_type.schema();
+        let schema = DataSchema::from(schema);
+        while let Some(Ok(d)) = stream.next().await {
+            let (block, _) = DataBlock::from_record_batch(&schema, &d)?;
+            blocks.push(block);
+        }
+
+        self.is_finished = true;
+        if !blocks.is_empty() {
+            Ok(Some(DataBlock::concat(&blocks)?))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl TableFunction for IcebergInspectTable {
+    fn function_name(&self) -> &str {
+        self.name()
+    }
+
+    fn as_table<'a>(self: Arc<Self>) -> Arc<dyn Table + 'a>
+    where Self: 'a {
+        self
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct IcebergInspectArgsParsed {
+    pub(crate) database: String,
+    pub(crate) table: String,
+}
+
+impl IcebergInspectArgsParsed {
+    fn parse(table_args: &TableArgs, name: &str) -> databend_common_exception::Result<Self> {
+        let (database, table) = parse_db_tb_args(table_args, name)?;
+        Ok(IcebergInspectArgsParsed { database, table })
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+enum InspectType {
+    Snapshot,
+    Manifest,
+}
+
+impl InspectType {
+    pub fn schema(&self) -> TableSchemaRef {
+        match self {
+            InspectType::Snapshot => TableSchemaRefExt::create(vec![
+                TableField::new("committed_at", TableDataType::Timestamp),
+                TableField::new("snapshot_id", TableDataType::Number(NumberDataType::Int64)),
+                TableField::new("parent_id", TableDataType::Number(NumberDataType::Int64)),
+                TableField::new("operation", TableDataType::String),
+                TableField::new("manifest_list", TableDataType::String),
+                TableField::new(
+                    "summary",
+                    TableDataType::Map(Box::new(TableDataType::Tuple {
+                        fields_name: vec!["keys".to_string(), "values".to_string()],
+                        fields_type: vec![
+                            TableDataType::String,
+                            TableDataType::String.wrap_nullable(),
+                        ],
+                    })),
+                ),
+            ]),
+            InspectType::Manifest => {
+                let fields = vec![
+                    TableField::new("content", TableDataType::Number(NumberDataType::Int32)),
+                    TableField::new("path", TableDataType::String),
+                    TableField::new("length", TableDataType::Number(NumberDataType::Int64)),
+                    TableField::new(
+                        "partition_spec_id",
+                        TableDataType::Number(NumberDataType::Int32),
+                    ),
+                    TableField::new(
+                        "added_snapshot_id",
+                        TableDataType::Number(NumberDataType::Int64).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "added_data_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "existing_data_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "deleted_data_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "added_delete_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "existing_delete_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "deleted_delete_files_count",
+                        TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
+                    ),
+                    TableField::new(
+                        "partition_summaries",
+                        TableDataType::Array(Box::new(
+                            TableDataType::Tuple {
+                                fields_name: vec![
+                                    "contains_null".to_string(),
+                                    "contains_nan".to_string(),
+                                    "lower_bound".to_string(),
+                                    "upper_bound".to_string(),
+                                ],
+                                fields_type: vec![
+                                    TableDataType::Boolean.wrap_nullable(),
+                                    TableDataType::Boolean.wrap_nullable(),
+                                    TableDataType::String,
+                                    TableDataType::String,
+                                ],
+                            }
+                            .wrap_nullable(),
+                        )),
+                    ),
+                ];
+
+                Arc::new(TableSchema::new(fields))
+            }
+        }
+    }
+
+    pub fn from_name(name: &str) -> InspectType {
+        match name {
+            "iceberg_snapshot" => InspectType::Snapshot,
+            "iceberg_manifest" => InspectType::Manifest,
+            _ => unimplemented!(),
+        }
+    }
+}
diff --git a/tests/sqllogictests/suites/tpch_iceberg/table_function.test b/tests/sqllogictests/suites/tpch_iceberg/table_function.test
new file mode 100644
index 0000000000000..d730aea9c439f
--- /dev/null
+++ b/tests/sqllogictests/suites/tpch_iceberg/table_function.test
@@ -0,0 +1,28 @@
+statement ok
+DROP CATALOG IF EXISTS ctl;
+
+statement ok
+CREATE CATALOG ctl
+TYPE=ICEBERG
+CONNECTION=(
+    TYPE='rest'
+    ADDRESS='http://127.0.0.1:8181'
+    WAREHOUSE='s3://iceberg-tpch'
+    "s3.region"='us-east-1'
+    "s3.endpoint"='http://127.0.0.1:9000'
+);
+
+statement ok
+use catalog ctl;
+
+statement ok
+select * from iceberg_snapshot('tpch', 'lineitem');
+
+statement ok
+select * from iceberg_manifest('tpch', 'lineitem');
+
+statement ok
+select sum(added_data_files_count) from iceberg_manifest('tpch', 'lineitem');
+
+statement ok
+select sum(summary['total-records']::Int64), count() from iceberg_snapshot('tpch', 'lineitem') where operation = 'append';

From 758e416a178b7daef71fd5471a720940029cc76f Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Wed, 19 Mar 2025 22:54:14 +0800
Subject: [PATCH 3/4] feat(query): add iceberg table functions

---
 src/query/storages/iceberg/src/iceberg_inspect.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/query/storages/iceberg/src/iceberg_inspect.rs b/src/query/storages/iceberg/src/iceberg_inspect.rs
index 45bf81d5f24f5..1529970bbff10 100644
--- a/src/query/storages/iceberg/src/iceberg_inspect.rs
+++ b/src/query/storages/iceberg/src/iceberg_inspect.rs
@@ -76,7 +76,6 @@ impl IcebergInspectTable {
             },
             ..Default::default()
         };
-
         Ok(Arc::new(IcebergInspectTable {
             table_info,
             args_parsed,

From 07d6c19bc2e371e2f13e7fdf0ca8db94523c486c Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Thu, 20 Mar 2025 09:02:11 +0800
Subject: [PATCH 4/4] feat(query): add iceberg table functions

---
 src/query/catalog/src/table_args.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/query/catalog/src/table_args.rs b/src/query/catalog/src/table_args.rs
index dccf3b5f8a814..3f9cfeccf6885 100644
--- a/src/query/catalog/src/table_args.rs
+++ b/src/query/catalog/src/table_args.rs
@@ -105,14 +105,18 @@
 pub fn string_value(value: &Scalar) -> Result<String> {
     match value {
         Scalar::String(val) => Ok(val.clone()),
-        _ => Err(ErrorCode::BadArguments("invalid string.")),
+        _ => Err(ErrorCode::BadArguments(format!(
+            "invalid value {value}, expected to be a string literal."
+        ))),
     }
 }
 
 pub fn bool_value(value: &Scalar) -> Result<bool> {
     match value {
         Scalar::Boolean(val) => Ok(*val),
-        _ => Err(ErrorCode::BadArguments("invalid boolean.")),
+        _ => Err(ErrorCode::BadArguments(format!(
+            "invalid value {value}, expected to be a boolean literal."
+        ))),
     }
 }
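
With PATCH 4 applied, a non-string argument reports the offending value instead of a bare "invalid string.". A hypothetical check (`ErrorCode::message` is from databend-common-exception):

    use databend_common_catalog::table_args::string_value;
    use databend_common_expression::types::NumberScalar;
    use databend_common_expression::Scalar;

    fn main() {
        // e.g. "invalid value 42, expected to be a string literal."
        let err = string_value(&Scalar::Number(NumberScalar::UInt64(42))).unwrap_err();
        assert!(err.message().contains("string literal"));
    }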