Skip to content

Commit 85150dd

Browse files
colinmarc authored and
tobixdev committed
fix: only fall back to listing prefixes on 404 errors (apache#18263)
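Previously, this logic swallowed every error returned by the head request, including failures such as DNS resolution errors, and fell back to listing the URL as a prefix. If the URL actually pointed to a single file rather than a prefix, that file was silently dropped, because listing it as a prefix returns no further files. The fallback now happens only on NotFound (404) errors; all other errors are propagated to the caller. Fixes apache#18242.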
1 parent 0405bda commit 85150dd

File tree

1 file changed

+131
-15
lines changed
  • datafusion/datasource/src

1 file changed

+131
-15
lines changed

datafusion/datasource/src/url.rs

Lines changed: 131 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ impl ListingTableUrl {
252252
.boxed(),
253253
// If the head command fails, it is likely that object doesn't exist.
254254
// Retry as though it were a prefix (aka a collection)
255-
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
255+
Err(object_store::Error::NotFound { .. }) => {
256+
list_with_cache(ctx, store, &self.prefix).await?
257+
}
258+
Err(e) => return Err(e.into()),
256259
}
257260
};
258261

@@ -405,6 +408,8 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
405408
#[cfg(test)]
406409
mod tests {
407410
use super::*;
411+
use async_trait::async_trait;
412+
use bytes::Bytes;
408413
use datafusion_common::config::TableOptions;
409414
use datafusion_common::DFSchema;
410415
use datafusion_execution::config::SessionConfig;
@@ -415,9 +420,13 @@ mod tests {
415420
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
416421
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
417422
use datafusion_physical_plan::ExecutionPlan;
418-
use object_store::PutPayload;
423+
use object_store::{
424+
GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
425+
PutPayload,
426+
};
419427
use std::any::Any;
420428
use std::collections::HashMap;
429+
use std::ops::Range;
421430
use tempfile::tempdir;
422431

423432
#[test]
@@ -633,48 +642,68 @@ mod tests {
633642
}
634643

635644
#[tokio::test]
636-
async fn test_list_files() {
637-
let store = object_store::memory::InMemory::new();
645+
async fn test_list_files() -> Result<()> {
646+
let store = MockObjectStore {
647+
in_mem: object_store::memory::InMemory::new(),
648+
forbidden_paths: vec!["forbidden/e.parquet".into()],
649+
};
650+
638651
// Create some files:
639652
create_file(&store, "a.parquet").await;
640653
create_file(&store, "/t/b.parquet").await;
641654
create_file(&store, "/t/c.csv").await;
642655
create_file(&store, "/t/d.csv").await;
643656

657+
// This file returns a permission error.
658+
create_file(&store, "/forbidden/e.parquet").await;
659+
644660
assert_eq!(
645-
list_all_files("/", &store, "parquet").await,
661+
list_all_files("/", &store, "parquet").await?,
646662
vec!["a.parquet"],
647663
);
648664

649665
// test with and without trailing slash
650666
assert_eq!(
651-
list_all_files("/t/", &store, "parquet").await,
667+
list_all_files("/t/", &store, "parquet").await?,
652668
vec!["t/b.parquet"],
653669
);
654670
assert_eq!(
655-
list_all_files("/t", &store, "parquet").await,
671+
list_all_files("/t", &store, "parquet").await?,
656672
vec!["t/b.parquet"],
657673
);
658674

659675
// test with and without trailing slash
660676
assert_eq!(
661-
list_all_files("/t", &store, "csv").await,
677+
list_all_files("/t", &store, "csv").await?,
662678
vec!["t/c.csv", "t/d.csv"],
663679
);
664680
assert_eq!(
665-
list_all_files("/t/", &store, "csv").await,
681+
list_all_files("/t/", &store, "csv").await?,
666682
vec!["t/c.csv", "t/d.csv"],
667683
);
668684

669685
// Test a non existing prefix
670686
assert_eq!(
671-
list_all_files("/NonExisting", &store, "csv").await,
687+
list_all_files("/NonExisting", &store, "csv").await?,
672688
vec![] as Vec<String>
673689
);
674690
assert_eq!(
675-
list_all_files("/NonExisting/", &store, "csv").await,
691+
list_all_files("/NonExisting/", &store, "csv").await?,
676692
vec![] as Vec<String>
677693
);
694+
695+
// Listing forbidden/e.parquet generates a permission error.
696+
let Err(DataFusionError::ObjectStore(err)) =
697+
list_all_files("/forbidden/e.parquet", &store, "parquet").await
698+
else {
699+
panic!("Expected ObjectStore error");
700+
};
701+
702+
let object_store::Error::PermissionDenied { .. } = &*err else {
703+
panic!("Expected PermissionDenied error");
704+
};
705+
706+
Ok(())
678707
}
679708

680709
/// Creates a file with "hello world" content at the specified path
@@ -692,10 +721,8 @@ mod tests {
692721
url: &str,
693722
store: &dyn ObjectStore,
694723
file_extension: &str,
695-
) -> Vec<String> {
696-
try_list_all_files(url, store, file_extension)
697-
.await
698-
.unwrap()
724+
) -> Result<Vec<String>> {
725+
try_list_all_files(url, store, file_extension).await
699726
}
700727

701728
/// Runs "list_all_files" and returns their paths
@@ -717,6 +744,95 @@ mod tests {
717744
Ok(files)
718745
}
719746

747+
#[derive(Debug)]
748+
struct MockObjectStore {
749+
in_mem: object_store::memory::InMemory,
750+
forbidden_paths: Vec<Path>,
751+
}
752+
753+
impl std::fmt::Display for MockObjectStore {
754+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
755+
self.in_mem.fmt(f)
756+
}
757+
}
758+
759+
#[async_trait]
760+
impl ObjectStore for MockObjectStore {
761+
async fn put_opts(
762+
&self,
763+
location: &Path,
764+
payload: PutPayload,
765+
opts: object_store::PutOptions,
766+
) -> object_store::Result<object_store::PutResult> {
767+
self.in_mem.put_opts(location, payload, opts).await
768+
}
769+
770+
async fn put_multipart_opts(
771+
&self,
772+
location: &Path,
773+
opts: PutMultipartOptions,
774+
) -> object_store::Result<Box<dyn MultipartUpload>> {
775+
self.in_mem.put_multipart_opts(location, opts).await
776+
}
777+
778+
async fn get_opts(
779+
&self,
780+
location: &Path,
781+
options: GetOptions,
782+
) -> object_store::Result<GetResult> {
783+
self.in_mem.get_opts(location, options).await
784+
}
785+
786+
async fn get_ranges(
787+
&self,
788+
location: &Path,
789+
ranges: &[Range<u64>],
790+
) -> object_store::Result<Vec<Bytes>> {
791+
self.in_mem.get_ranges(location, ranges).await
792+
}
793+
794+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
795+
if self.forbidden_paths.contains(location) {
796+
Err(object_store::Error::PermissionDenied {
797+
path: location.to_string(),
798+
source: "forbidden".into(),
799+
})
800+
} else {
801+
self.in_mem.head(location).await
802+
}
803+
}
804+
805+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
806+
self.in_mem.delete(location).await
807+
}
808+
809+
fn list(
810+
&self,
811+
prefix: Option<&Path>,
812+
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
813+
self.in_mem.list(prefix)
814+
}
815+
816+
async fn list_with_delimiter(
817+
&self,
818+
prefix: Option<&Path>,
819+
) -> object_store::Result<ListResult> {
820+
self.in_mem.list_with_delimiter(prefix).await
821+
}
822+
823+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
824+
self.in_mem.copy(from, to).await
825+
}
826+
827+
async fn copy_if_not_exists(
828+
&self,
829+
from: &Path,
830+
to: &Path,
831+
) -> object_store::Result<()> {
832+
self.in_mem.copy_if_not_exists(from, to).await
833+
}
834+
}
835+
720836
struct MockSession {
721837
config: SessionConfig,
722838
runtime_env: Arc<RuntimeEnv>,

0 commit comments

Comments
 (0)