16
16
*
17
17
*/
18
18
19
- use std:: any:: Any ;
20
- use std:: collections:: HashMap ;
21
- use std:: ops:: Bound ;
22
- use std:: os:: unix:: fs:: MetadataExt ;
23
- use std:: sync:: Arc ;
19
+ use std:: { any:: Any , collections:: HashMap , ops:: Bound , sync:: Arc } ;
24
20
25
21
use arrow_array:: RecordBatch ;
26
22
use arrow_schema:: { Schema , SchemaRef , SortOptions } ;
@@ -44,8 +40,8 @@ use datafusion::{
44
40
logical_expr:: {
45
41
utils:: conjunction, BinaryExpr , Operator , TableProviderFilterPushDown , TableType ,
46
42
} ,
47
- physical_expr:: { create_physical_expr, LexOrdering , PhysicalSortExpr } ,
48
- physical_plan:: { self , empty:: EmptyExec , union:: UnionExec , ExecutionPlan , Statistics } ,
43
+ physical_expr:: { create_physical_expr, expressions :: col , LexOrdering , PhysicalSortExpr } ,
44
+ physical_plan:: { empty:: EmptyExec , union:: UnionExec , ExecutionPlan , Statistics } ,
49
45
prelude:: Expr ,
50
46
scalar:: ScalarValue ,
51
47
} ;
@@ -57,20 +53,20 @@ use url::Url;
57
53
58
54
use crate :: {
59
55
catalog:: {
60
- self , column:: TypedStatistics , manifest:: File , manifest:: Manifest , snapshot:: ManifestItem ,
61
- snapshot:: Snapshot , ManifestFile ,
56
+ column:: { Column , TypedStatistics } ,
57
+ manifest:: { File , Manifest } ,
58
+ snapshot:: { ManifestItem , Snapshot } ,
59
+ ManifestFile , Snapshot as CatalogSnapshot ,
62
60
} ,
63
61
event:: DEFAULT_TIMESTAMP_KEY ,
64
62
hottier:: HotTierManager ,
65
63
metrics:: QUERY_CACHE_HIT ,
66
64
option:: Mode ,
67
- parseable:: PARSEABLE ,
68
- parseable:: STREAM_EXISTS ,
65
+ parseable:: { PARSEABLE , STREAM_EXISTS } ,
69
66
storage:: { ObjectStorage , ObjectStoreFormat , STREAM_ROOT_DIRECTORY } ,
70
67
} ;
71
68
72
69
use super :: listing_table_builder:: ListingTableBuilder ;
73
- use crate :: catalog:: Snapshot as CatalogSnapshot ;
74
70
75
71
// schema provider for stream based on global data
76
72
#[ derive( Debug ) ]
@@ -141,9 +137,9 @@ impl StandardTableProvider {
141
137
142
138
let sort_expr = PhysicalSortExpr {
143
139
expr : if let Some ( time_partition) = time_partition {
144
- physical_plan :: expressions :: col ( & time_partition, & self . schema ) ?
140
+ col ( & time_partition, & self . schema ) ?
145
141
} else {
146
- physical_plan :: expressions :: col ( DEFAULT_TIMESTAMP_KEY , & self . schema ) ?
142
+ col ( DEFAULT_TIMESTAMP_KEY , & self . schema ) ?
147
143
} ,
148
144
options : SortOptions {
149
145
descending : true ,
@@ -249,7 +245,7 @@ impl StandardTableProvider {
249
245
let Ok ( file_meta) = file_path. metadata ( ) else {
250
246
continue ;
251
247
} ;
252
- let file = PartitionedFile :: new ( file_path. display ( ) . to_string ( ) , file_meta. size ( ) ) ;
248
+ let file = PartitionedFile :: new ( file_path. display ( ) . to_string ( ) , file_meta. len ( ) ) ;
253
249
partitioned_files[ index % target_partition] . push ( file)
254
250
}
255
251
@@ -324,20 +320,19 @@ impl StandardTableProvider {
324
320
325
321
fn partitioned_files (
326
322
& self ,
327
- manifest_files : Vec < catalog :: manifest :: File > ,
323
+ manifest_files : Vec < File > ,
328
324
) -> ( Vec < Vec < PartitionedFile > > , datafusion:: common:: Statistics ) {
329
325
let target_partition = num_cpus:: get ( ) ;
330
326
let mut partitioned_files = Vec :: from_iter ( ( 0 ..target_partition) . map ( |_| Vec :: new ( ) ) ) ;
331
- let mut column_statistics =
332
- HashMap :: < String , Option < catalog:: column:: TypedStatistics > > :: new ( ) ;
327
+ let mut column_statistics = HashMap :: < String , Option < TypedStatistics > > :: new ( ) ;
333
328
let mut count = 0 ;
334
329
for ( index, file) in manifest_files
335
330
. into_iter ( )
336
331
. enumerate ( )
337
332
. map ( |( x, y) | ( x % target_partition, y) )
338
333
{
339
334
#[ allow( unused_mut) ]
340
- let catalog :: manifest :: File {
335
+ let File {
341
336
mut file_path,
342
337
num_rows,
343
338
columns,
@@ -404,12 +399,12 @@ impl StandardTableProvider {
404
399
}
405
400
406
401
async fn collect_from_snapshot (
407
- snapshot : & catalog :: snapshot :: Snapshot ,
402
+ snapshot : & Snapshot ,
408
403
time_filters : & [ PartialTimeFilter ] ,
409
404
object_store : Arc < dyn ObjectStore > ,
410
405
filters : & [ Expr ] ,
411
406
limit : Option < usize > ,
412
- ) -> Result < Vec < catalog :: manifest :: File > , DataFusionError > {
407
+ ) -> Result < Vec < File > , DataFusionError > {
413
408
let items = snapshot. manifests ( time_filters) ;
414
409
let manifest_files = collect_manifest_files (
415
410
object_store,
@@ -895,7 +890,7 @@ pub fn extract_primary_filter(
895
890
}
896
891
897
892
trait ManifestExt : ManifestFile {
898
- fn find_matching_column ( & self , partial_filter : & Expr ) -> Option < & catalog :: column :: Column > {
893
+ fn find_matching_column ( & self , partial_filter : & Expr ) -> Option < & Column > {
899
894
let name = match partial_filter {
900
895
Expr :: BinaryExpr ( binary_expr) => {
901
896
let Expr :: Column ( col) = binary_expr. left . as_ref ( ) else {
0 commit comments