Skip to content

Commit 6f53180

Browse files
authored
ObjectStore API to read from remote storage systems (#950)
* Object Store API to read from remote storage systems * add tokio fs * resolve comments * fmt * resolve comments * fix test * rerun * file_reader async * add delimiter option in list * fix fmt * file_reader to sync * An optional list_dir api
1 parent 4c0d430 commit 6f53180

File tree

6 files changed

+360
-1
lines changed

6 files changed

+360
-1
lines changed

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion::catalog::catalog::{
3535
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
3636
};
3737
use datafusion::datasource::datasource::Statistics;
38+
use datafusion::datasource::object_store::ObjectStoreRegistry;
3839
use datafusion::datasource::FilePartition;
3940
use datafusion::execution::context::{
4041
ExecutionConfig, ExecutionContextState, ExecutionProps,
@@ -655,6 +656,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
655656
aggregate_functions: Default::default(),
656657
config: ExecutionConfig::new(),
657658
execution_props: ExecutionProps::new(),
659+
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
658660
};
659661

660662
let fun_expr = functions::create_physical_fun(

datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ chrono = "0.4"
5858
async-trait = "0.1.41"
5959
futures = "0.3"
6060
pin-project-lite= "^0.2.0"
61-
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
61+
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
6262
tokio-stream = "0.1"
6363
log = "^0.4"
6464
md-5 = { version = "^0.9.1", optional = true }

datafusion/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod datasource;
2222
pub mod empty;
2323
pub mod json;
2424
pub mod memory;
25+
pub mod object_store;
2526
pub mod parquet;
2627

2728
pub use self::csv::{CsvFile, CsvReadOptions};
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Object store that represents the Local File System.
19+
20+
use std::fs::Metadata;
21+
use std::sync::Arc;
22+
23+
use async_trait::async_trait;
24+
use futures::{stream, AsyncRead, StreamExt};
25+
26+
use crate::datasource::object_store::{
27+
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
28+
};
29+
use crate::error::DataFusionError;
30+
use crate::error::Result;
31+
32+
/// Local File System as Object Store.
///
/// This is a unit struct: it holds no state of its own.
#[derive(Debug)]
pub struct LocalFileSystem;
35+
36+
#[async_trait]
37+
impl ObjectStore for LocalFileSystem {
38+
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
39+
list_all(prefix.to_owned()).await
40+
}
41+
42+
async fn list_dir(
43+
&self,
44+
_prefix: &str,
45+
_delimiter: Option<String>,
46+
) -> Result<ListEntryStream> {
47+
todo!()
48+
}
49+
50+
fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
51+
Ok(Arc::new(LocalFileReader::new(file)?))
52+
}
53+
}
54+
55+
struct LocalFileReader {
56+
file: FileMeta,
57+
}
58+
59+
impl LocalFileReader {
60+
fn new(file: FileMeta) -> Result<Self> {
61+
Ok(Self { file })
62+
}
63+
}
64+
65+
#[async_trait]
66+
impl ObjectReader for LocalFileReader {
67+
async fn chunk_reader(
68+
&self,
69+
_start: u64,
70+
_length: usize,
71+
) -> Result<Arc<dyn AsyncRead>> {
72+
todo!()
73+
}
74+
75+
fn length(&self) -> u64 {
76+
self.file.size
77+
}
78+
}
79+
80+
async fn list_all(prefix: String) -> Result<FileMetaStream> {
81+
fn get_meta(path: String, metadata: Metadata) -> FileMeta {
82+
FileMeta {
83+
path,
84+
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
85+
size: metadata.len(),
86+
}
87+
}
88+
89+
async fn find_files_in_dir(
90+
path: String,
91+
to_visit: &mut Vec<String>,
92+
) -> Result<Vec<FileMeta>> {
93+
let mut dir = tokio::fs::read_dir(path).await?;
94+
let mut files = Vec::new();
95+
96+
while let Some(child) = dir.next_entry().await? {
97+
if let Some(child_path) = child.path().to_str() {
98+
let metadata = child.metadata().await?;
99+
if metadata.is_dir() {
100+
to_visit.push(child_path.to_string());
101+
} else {
102+
files.push(get_meta(child_path.to_owned(), metadata))
103+
}
104+
} else {
105+
return Err(DataFusionError::Plan("Invalid path".to_string()));
106+
}
107+
}
108+
Ok(files)
109+
}
110+
111+
let prefix_meta = tokio::fs::metadata(&prefix).await?;
112+
let prefix = prefix.to_owned();
113+
if prefix_meta.is_file() {
114+
Ok(Box::pin(stream::once(async move {
115+
Ok(get_meta(prefix, prefix_meta))
116+
})))
117+
} else {
118+
let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
119+
match to_visit.pop() {
120+
None => None,
121+
Some(path) => {
122+
let file_stream = match find_files_in_dir(path, &mut to_visit).await {
123+
Ok(files) => stream::iter(files).map(Ok).left_stream(),
124+
Err(e) => stream::once(async { Err(e) }).right_stream(),
125+
};
126+
127+
Some((file_stream, to_visit))
128+
}
129+
}
130+
})
131+
.flatten();
132+
Ok(Box::pin(result))
133+
}
134+
}
135+
136+
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::fs::{create_dir, File};
    use tempfile::tempdir;

    /// Listing a directory tree must return the files of the root and of its
    /// subdirectories.
    #[tokio::test]
    async fn test_recursive_listing() -> Result<()> {
        // Layout created under the temporary root:
        //   a.txt
        //   x/b.txt
        //   y/c.txt
        let root = tempdir()?;
        let dir_x = root.path().join("x");
        let dir_y = root.path().join("y");
        let file_a = root.path().join("a.txt");
        let file_b = dir_x.join("b.txt");
        let file_c = dir_y.join("c.txt");

        for dir in [&dir_x, &dir_y].iter() {
            create_dir(dir)?;
        }
        for file in [&file_a, &file_b, &file_c].iter() {
            File::create(file)?;
        }

        let root_str = root.path().to_str().unwrap().to_string();
        let mut listing = list_all(root_str).await?;
        let mut seen = HashSet::new();
        while let Some(entry) = listing.next().await {
            let meta = entry?;
            // The files were created empty.
            assert_eq!(meta.size, 0);
            seen.insert(meta.path);
        }

        assert_eq!(seen.len(), 3);
        for expected in [&file_a, &file_b, &file_c].iter() {
            assert!(seen.contains(expected.to_str().unwrap()));
        }

        Ok(())
    }
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Object Store abstracts access to an underlying file/object storage.
19+
20+
pub mod local;
21+
22+
use std::collections::HashMap;
23+
use std::fmt::Debug;
24+
use std::pin::Pin;
25+
use std::sync::{Arc, RwLock};
26+
27+
use async_trait::async_trait;
28+
use futures::{AsyncRead, Stream};
29+
30+
use local::LocalFileSystem;
31+
32+
use crate::error::{DataFusionError, Result};
33+
use chrono::Utc;
34+
35+
/// Object Reader for one file in a object store
36+
#[async_trait]
37+
pub trait ObjectReader {
38+
/// Get reader for a part [start, start + length] in the file asynchronously
39+
async fn chunk_reader(&self, start: u64, length: usize)
40+
-> Result<Arc<dyn AsyncRead>>;
41+
42+
/// Get length for the file
43+
fn length(&self) -> u64;
44+
}
45+
46+
/// Represents a file or a prefix that may require further resolution
47+
#[derive(Debug)]
48+
pub enum ListEntry {
49+
/// File metadata
50+
FileMeta(FileMeta),
51+
/// Prefix to be further resolved during partition discovery
52+
Prefix(String),
53+
}
54+
55+
/// File meta we got from object store
56+
#[derive(Debug)]
57+
pub struct FileMeta {
58+
/// Path of the file
59+
pub path: String,
60+
/// Last time the file was modified in UTC
61+
pub last_modified: Option<chrono::DateTime<Utc>>,
62+
/// File size in total
63+
pub size: u64,
64+
}
65+
66+
/// Stream of files listed from an object store
67+
pub type FileMetaStream =
68+
Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
69+
70+
/// Stream of list entries obtained from an object store
71+
pub type ListEntryStream =
72+
Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
73+
74+
/// A ObjectStore abstracts access to an underlying file/object storage.
75+
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
76+
#[async_trait]
77+
pub trait ObjectStore: Sync + Send + Debug {
78+
/// Returns all the files in path `prefix`
79+
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
80+
81+
/// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
82+
/// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
83+
async fn list_dir(
84+
&self,
85+
prefix: &str,
86+
delimiter: Option<String>,
87+
) -> Result<ListEntryStream>;
88+
89+
/// Get object reader for one file
90+
fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>>;
91+
}
92+
93+
static LOCAL_SCHEME: &str = "file";
94+
95+
/// A Registry holds all the object stores at runtime with a scheme for each store.
96+
/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
97+
/// and query data inside these systems.
98+
pub struct ObjectStoreRegistry {
99+
/// A map from scheme to object store that serve list / read operations for the store
100+
pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
101+
}
102+
103+
impl ObjectStoreRegistry {
104+
/// Create the registry that object stores can registered into.
105+
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
106+
pub fn new() -> Self {
107+
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
108+
map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
109+
110+
Self {
111+
object_stores: RwLock::new(map),
112+
}
113+
}
114+
115+
/// Adds a new store to this registry.
116+
/// If a store of the same prefix existed before, it is replaced in the registry and returned.
117+
pub fn register_store(
118+
&self,
119+
scheme: String,
120+
store: Arc<dyn ObjectStore>,
121+
) -> Option<Arc<dyn ObjectStore>> {
122+
let mut stores = self.object_stores.write().unwrap();
123+
stores.insert(scheme, store)
124+
}
125+
126+
/// Get the store registered for scheme
127+
pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
128+
let stores = self.object_stores.read().unwrap();
129+
stores.get(scheme).cloned()
130+
}
131+
132+
/// Get a suitable store for the URI based on it's scheme. For example:
133+
/// URI with scheme file or no schema will return the default LocalFS store,
134+
/// URI with scheme s3 will return the S3 store if it's registered.
135+
pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
136+
if let Some((scheme, _)) = uri.split_once(':') {
137+
let stores = self.object_stores.read().unwrap();
138+
stores
139+
.get(&*scheme.to_lowercase())
140+
.map(Clone::clone)
141+
.ok_or_else(|| {
142+
DataFusionError::Internal(format!(
143+
"No suitable object store found for {}",
144+
scheme
145+
))
146+
})
147+
} else {
148+
Ok(Arc::new(LocalFileSystem))
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)