diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index c87b307c5fb8..0f31eb7caf41 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -252,7 +252,10 @@ impl ListingTableUrl { .boxed(), // If the head command fails, it is likely that object doesn't exist. // Retry as though it were a prefix (aka a collection) - Err(_) => list_with_cache(ctx, store, &self.prefix).await?, + Err(object_store::Error::NotFound { .. }) => { + list_with_cache(ctx, store, &self.prefix).await? + } + Err(e) => return Err(e.into()), } }; @@ -405,6 +408,8 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> { #[cfg(test)] mod tests { use super::*; + use async_trait::async_trait; + use bytes::Bytes; use datafusion_common::config::TableOptions; use datafusion_common::DFSchema; use datafusion_execution::config::SessionConfig; @@ -414,9 +419,13 @@ mod tests { use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; - use object_store::PutPayload; + use object_store::{ + GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions, + PutPayload, + }; use std::any::Any; use std::collections::HashMap; + use std::ops::Range; use tempfile::tempdir; #[test] @@ -632,48 +641,68 @@ mod tests { } #[tokio::test] - async fn test_list_files() { - let store = object_store::memory::InMemory::new(); + async fn test_list_files() -> Result<()> { + let store = MockObjectStore { + in_mem: object_store::memory::InMemory::new(), + forbidden_paths: vec!["forbidden/e.parquet".into()], + }; + // Create some files: create_file(&store, "a.parquet").await; create_file(&store, "/t/b.parquet").await; create_file(&store, "/t/c.csv").await; create_file(&store, "/t/d.csv").await; + // This file returns a permission error. + create_file(&store, "/forbidden/e.parquet").await; + assert_eq!( - list_all_files("/", &store, "parquet").await, + list_all_files("/", &store, "parquet").await?, vec!["a.parquet"], ); // test with and without trailing slash assert_eq!( - list_all_files("/t/", &store, "parquet").await, + list_all_files("/t/", &store, "parquet").await?, vec!["t/b.parquet"], ); assert_eq!( - list_all_files("/t", &store, "parquet").await, + list_all_files("/t", &store, "parquet").await?, vec!["t/b.parquet"], ); // test with and without trailing slash assert_eq!( - list_all_files("/t", &store, "csv").await, + list_all_files("/t", &store, "csv").await?, vec!["t/c.csv", "t/d.csv"], ); assert_eq!( - list_all_files("/t/", &store, "csv").await, + list_all_files("/t/", &store, "csv").await?, vec!["t/c.csv", "t/d.csv"], ); // Test a non existing prefix assert_eq!( - list_all_files("/NonExisting", &store, "csv").await, + list_all_files("/NonExisting", &store, "csv").await?, vec![] as Vec ); assert_eq!( - list_all_files("/NonExisting/", &store, "csv").await, + list_all_files("/NonExisting/", &store, "csv").await?, vec![] as Vec ); + + // Including forbidden.parquet generates an error. + let Err(DataFusionError::ObjectStore(err)) = + list_all_files("/forbidden/e.parquet", &store, "parquet").await + else { + panic!("Expected ObjectStore error"); + }; + + let object_store::Error::PermissionDenied { .. } = &*err else { + panic!("Expected PermissionDenied error"); + }; + + Ok(()) } /// Creates a file with "hello world" content at the specified path @@ -691,10 +720,8 @@ mod tests { url: &str, store: &dyn ObjectStore, file_extension: &str, - ) -> Vec { - try_list_all_files(url, store, file_extension) - .await - .unwrap() + ) -> Result> { + try_list_all_files(url, store, file_extension).await } /// Runs "list_all_files" and returns their paths @@ -716,6 +743,95 @@ mod tests { Ok(files) } + #[derive(Debug)] + struct MockObjectStore { + in_mem: object_store::memory::InMemory, + forbidden_paths: Vec, + } + + impl std::fmt::Display for MockObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.in_mem.fmt(f) + } + } + + #[async_trait] + impl ObjectStore for MockObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: object_store::PutOptions, + ) -> object_store::Result { + self.in_mem.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> object_store::Result> { + self.in_mem.put_multipart_opts(location, opts).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + self.in_mem.get_opts(location, options).await + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> object_store::Result> { + self.in_mem.get_ranges(location, ranges).await + } + + async fn head(&self, location: &Path) -> object_store::Result { + if self.forbidden_paths.contains(location) { + Err(object_store::Error::PermissionDenied { + path: location.to_string(), + source: "forbidden".into(), + }) + } else { + self.in_mem.head(location).await + } + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.in_mem.delete(location).await + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, object_store::Result> { + self.in_mem.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + self.in_mem.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.in_mem.copy(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> object_store::Result<()> { + self.in_mem.copy_if_not_exists(from, to).await + } + } + struct MockSession { config: SessionConfig, runtime_env: Arc,