
Commit 4a67d0d

Brent Gardner and alamb authored
Automatically register tables if ObjectStore root is configured (#4095)
* squash
* Debug test in CI :'(
* Hashing inconsistency
* Address Andy's concerns
* Docs
* Docs
* fmt
* treat empty string like None :(
* clippy
* PR feedback
* Update datafusion/core/src/config.rs
  Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/core/src/config.rs
  Co-authored-by: Andrew Lamb <[email protected]>

Co-authored-by: Andrew Lamb <[email protected]>
1 parent fc669d5 commit 4a67d0d
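
For context, the feature wires two new session settings to a TableProviderFactory registered on the RuntimeEnv. Below is a minimal sketch of enabling it, closely following the test added in this commit; the "test" factory name and the file:///data/tables location are illustrative stand-ins, and TestTableFactory is the commit's test helper from datafusion::test_util.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::test_util::TestTableFactory;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Map a catalog "type" name to the factory that will build its tables.
    let mut factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
    factories.insert("test".to_string(), Arc::new(TestTableFactory {}));
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::new().with_table_factories(factories),
    )?);

    // Point the default schema at a root folder; each subfolder becomes a table.
    let cfg = SessionConfig::new()
        .set_str("datafusion.catalog.location", "file:///data/tables")
        .set_str("datafusion.catalog.type", "test");
    let state = SessionState::with_config_rt(cfg, runtime);
    let _ctx = SessionContext::with_state(state);
    Ok(())
}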

File tree

6 files changed: +291 -14 lines changed
datafusion/core/src/catalog/listing_schema.rs

Lines changed: 163 additions & 0 deletions

@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically
+use crate::catalog::schema::SchemaProvider;
+use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::TableProvider;
+use datafusion_common::DataFusionError;
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
+///
+/// A subfolder relationship is assumed, i.e. given:
+/// authority = s3://host.example.com:3000
+/// path = /data/tpch
+/// factory = `DeltaTableFactory`
+///
+/// A table called "customer" will be registered for the folder:
+/// s3://host.example.com:3000/data/tpch/customer
+///
+/// assuming it contains valid deltalake data, i.e:
+/// s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet
+/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
+pub struct ListingSchemaProvider {
+    authority: String,
+    path: object_store::path::Path,
+    factory: Arc<dyn TableProviderFactory>,
+    store: Arc<dyn ObjectStore>,
+    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
+}
+
+impl ListingSchemaProvider {
+    /// Create a new `ListingSchemaProvider`
+    ///
+    /// Arguments:
+    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
+    /// `path`: The root path that contains subfolders which represent tables
+    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
+    /// `store`: The `ObjectStore` containing the table data
+    pub fn new(
+        authority: String,
+        path: object_store::path::Path,
+        factory: Arc<dyn TableProviderFactory>,
+        store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            authority,
+            path,
+            factory,
+            store,
+            tables: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    /// Reload table information from ObjectStore
+    pub async fn refresh(&self) -> datafusion_common::Result<()> {
+        let entries: Vec<_> = self
+            .store
+            .list(Some(&self.path))
+            .await?
+            .try_collect()
+            .await?;
+        let base = Path::new(self.path.as_ref());
+        let mut tables = HashSet::new();
+        for file in entries.iter() {
+            let mut parent = Path::new(file.location.as_ref());
+            while let Some(p) = parent.parent() {
+                if p == base {
+                    tables.insert(parent);
+                }
+                parent = p;
+            }
+        }
+        for table in tables.iter() {
+            let file_name = table
+                .file_name()
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Cannot parse file name!".to_string())
+                })?
+                .to_str()
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Cannot parse file name!".to_string())
+                })?;
+            let table_name = table.to_str().ok_or_else(|| {
+                DataFusionError::Internal("Cannot parse file name!".to_string())
+            })?;
+            if !self.table_exist(file_name) {
+                let table_name = format!("{}/{}", self.authority, table_name);
+                let provider = self.factory.create(table_name.as_str()).await?;
+                let _ = self.register_table(file_name.to_string(), provider.clone())?;
+            }
+        }
+        Ok(())
+    }
+}
+
+impl SchemaProvider for ListingSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .keys()
+            .map(|it| it.to_string())
+            .collect()
+    }
+
+    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .get(name)
+            .cloned()
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .insert(name, table.clone());
+        Ok(Some(table))
+    }
+
+    fn deregister_table(
+        &self,
+        name: &str,
+    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
+        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .contains_key(name)
+    }
+}
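
The discovery step in refresh above walks each listed object's ancestors and keeps the ancestor that sits directly under the configured root, so only first-level subfolders become tables. A self-contained sketch of that parent walk, using hypothetical object keys under an example root:

use std::collections::HashSet;
use std::path::Path;

fn main() {
    // Hypothetical listing under the root "data/tpch".
    let keys = [
        "data/tpch/customer/part-00000.snappy.parquet",
        "data/tpch/customer/_delta_log/00000.json",
        "data/tpch/orders/part-00000.snappy.parquet",
    ];
    let base = Path::new("data/tpch");
    let mut tables = HashSet::new();
    for key in keys {
        // Climb from each file towards the root; the ancestor whose
        // parent is the root is a candidate table folder.
        let mut parent = Path::new(key);
        while let Some(p) = parent.parent() {
            if p == base {
                tables.insert(parent);
            }
            parent = p;
        }
    }
    // Yields exactly two table folders: "customer" and "orders".
    assert_eq!(tables.len(), 2);
}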

datafusion/core/src/catalog/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -21,6 +21,7 @@
 #![allow(clippy::module_inception)]
 pub mod catalog;
 pub mod information_schema;
+pub mod listing_schema;
 pub mod schema;

 pub use datafusion_sql::{ResolvedTableReference, TableReference};

datafusion/core/src/config.rs

Lines changed: 25 additions & 8 deletions

@@ -68,6 +68,12 @@ pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
 /// Configuration option "datafusion.optimizer.max_passes"
 pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes";

+/// Location scanned to load tables for `default` schema
+pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
+
+/// Type of `TableProvider` to use when loading `default` schema
+pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identifier this configuration option
@@ -144,13 +150,13 @@ impl ConfigDefinition {
     pub fn new_string(
         key: impl Into<String>,
         description: impl Into<String>,
-        default_value: String,
+        default_value: Option<String>,
     ) -> Self {
         Self::new(
             key,
             description,
             DataType::Utf8,
-            ScalarValue::Utf8(Some(default_value)),
+            ScalarValue::Utf8(default_value),
         )
     }
 }
@@ -217,7 +223,7 @@ impl BuiltInConfigs {
             "The session time zone which some function require \
             e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
             then extract the hour.",
-            "UTC".into()
+            Some("UTC".into()),
         ),
         ConfigDefinition::new_bool(
             OPT_PARQUET_PUSHDOWN_FILTERS,
@@ -245,11 +251,22 @@
             rule. When set to false, any rules that produce errors will cause the query to fail.",
             true
         ),
-        ConfigDefinition::new_u64(
-            OPT_OPTIMIZER_MAX_PASSES,
-            "Number of times that the optimizer will attempt to optimize the plan",
-            3
-        )]
+        ConfigDefinition::new_u64(
+            OPT_OPTIMIZER_MAX_PASSES,
+            "Number of times that the optimizer will attempt to optimize the plan",
+            3
+        ),
+        ConfigDefinition::new_string(
+            OPT_CATALOG_LOCATION,
+            "Location scanned to load tables for `default` schema, defaults to None",
+            None,
+        ),
+        ConfigDefinition::new_string(
+            OPT_CATALOG_TYPE,
+            "Type of `TableProvider` to use when loading `default` schema. Defaults to None",
+            None,
+        ),
+        ]
     }
 }
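
Because new_string now takes Option<String>, the two catalog options can default to "unset" rather than to an empty string (the squashed commit "treat empty string like None" hints at why that distinction matters). A small sketch of what a consumer sees for an unconfigured option, assuming the public ConfigOptions API in this crate:

use datafusion::config::{ConfigOptions, OPT_CATALOG_LOCATION};

fn main() {
    // Built-in defaults include the new option, but its value is a
    // null Utf8 scalar until someone configures it.
    let options = ConfigOptions::new();
    let location = options.get(OPT_CATALOG_LOCATION);
    assert!(matches!(location, Some(v) if v.is_null()));
}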

datafusion/core/src/execution/context.rs

Lines changed: 97 additions & 2 deletions

@@ -97,7 +97,10 @@ use datafusion_sql::{
     planner::{ContextProvider, SqlToRel},
 };
 use parquet::file::properties::WriterProperties;
+use url::Url;

+use crate::catalog::listing_schema::ListingSchemaProvider;
+use crate::datasource::object_store::ObjectStoreUrl;
 use uuid::Uuid;

 use super::options::{
@@ -914,6 +917,11 @@ impl SessionContext {
         state.catalog_list.register_catalog(name, catalog)
     }

+    /// Retrieves the list of available catalog names.
+    pub fn catalog_names(&self) -> Vec<String> {
+        self.state.read().catalog_list.catalog_names()
+    }
+
     /// Retrieves a [`CatalogProvider`] instance by name
     pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
         self.state.read().catalog_list.catalog(name)
@@ -1258,6 +1266,11 @@ impl SessionConfig {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }

+    /// Set a generic `str` configuration option
+    pub fn set_str(self, key: &str, value: &str) -> Self {
+        self.set(key, ScalarValue::Utf8(Some(value.to_string())))
+    }
+
     /// Customize batch size
     pub fn with_batch_size(self, n: usize) -> Self {
         // batch size must be greater than zero
@@ -1508,6 +1521,8 @@ impl SessionState {
         )
         .expect("memory catalog provider can register schema");

+        Self::register_default_schema(&config, &runtime, &default_catalog);
+
         let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
             Arc::new(CatalogWithInformationSchema::new(
                 Arc::downgrade(&catalog_list),
@@ -1566,6 +1581,48 @@
         }
     }

+    fn register_default_schema(
+        config: &SessionConfig,
+        runtime: &Arc<RuntimeEnv>,
+        default_catalog: &MemoryCatalogProvider,
+    ) {
+        let url = config
+            .config_options
+            .read()
+            .get("datafusion.catalog.location");
+        let format = config.config_options.read().get("datafusion.catalog.type");
+        let (url, format) = match (url, format) {
+            (Some(url), Some(format)) => (url, format),
+            _ => return,
+        };
+        if url.is_null() || format.is_null() {
+            return;
+        }
+        let url = url.to_string();
+        let format = format.to_string();
+        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
+        let authority = match url.host_str() {
+            Some(host) => format!("{}://{}", url.scheme(), host),
+            None => format!("{}://", url.scheme()),
+        };
+        let path = &url.as_str()[authority.len() as usize..];
+        let path = object_store::path::Path::parse(path).expect("Can't parse path");
+        let store = ObjectStoreUrl::parse(authority.as_str())
+            .expect("Invalid default catalog url");
+        let store = match runtime.object_store(store) {
+            Ok(store) => store,
+            _ => return,
+        };
+        let factory = match runtime.table_factories.get(format.as_str()) {
+            Some(factory) => factory,
+            _ => return,
+        };
+        let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
+        let _ = default_catalog
+            .register_schema("default", Arc::new(schema))
+            .expect("Failed to register default schema");
+    }
+
     fn resolve_table_ref<'a>(
         &'a self,
         table_ref: impl Into<TableReference<'a>>,
@@ -1947,10 +2004,12 @@ impl FunctionRegistry for TaskContext {
 mod tests {
     use super::*;
     use crate::assert_batches_eq;
+    use crate::datasource::datasource::TableProviderFactory;
     use crate::execution::context::QueryPlanner;
+    use crate::execution::runtime_env::RuntimeConfig;
     use crate::physical_plan::expressions::AvgAccumulator;
     use crate::test;
-    use crate::test_util::parquet_test_data;
+    use crate::test_util::{parquet_test_data, TestTableFactory};
     use crate::variable::VarType;
     use arrow::array::ArrayRef;
     use arrow::datatypes::*;
@@ -1959,9 +2018,10 @@ mod tests {
     use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};
     use datafusion_physical_expr::functions::make_scalar_function;
     use std::fs::File;
+    use std::path::PathBuf;
     use std::sync::Weak;
     use std::thread::{self, JoinHandle};
-    use std::{io::prelude::*, sync::Mutex};
+    use std::{env, io::prelude::*, sync::Mutex};
     use tempfile::TempDir;

     #[tokio::test]
@@ -2199,6 +2259,41 @@
         Ok(())
     }

+    #[tokio::test]
+    async fn with_listing_schema_provider() -> Result<()> {
+        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        let path = path.join("tests/tpch-csv");
+        let url = format!("file://{}", path.display());
+
+        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+            HashMap::new();
+        table_factories.insert("test".to_string(), Arc::new(TestTableFactory {}));
+        let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
+        let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
+        let cfg = SessionConfig::new()
+            .set_str("datafusion.catalog.location", url.as_str())
+            .set_str("datafusion.catalog.type", "test");
+        let session_state = SessionState::with_config_rt(cfg, runtime);
+        let ctx = SessionContext::with_state(session_state);
+
+        let mut table_count = 0;
+        for cat_name in ctx.catalog_names().iter() {
+            let cat = ctx.catalog(cat_name).unwrap();
+            for s_name in cat.schema_names().iter() {
+                let schema = cat.schema(s_name).unwrap();
+                if let Some(listing) =
+                    schema.as_any().downcast_ref::<ListingSchemaProvider>()
+                {
+                    listing.refresh().await.unwrap();
+                    table_count = schema.table_names().len();
+                }
+            }
+        }
+
+        assert_eq!(table_count, 8);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());