 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::SchemaRef;
 use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
-use std::sync::Arc;
 use std::time::Instant;
 use structopt::StructOpt;
 use test_utils::AccessLogGenerator;
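With the scan setup factored out, the benchmark now needs only the two helpers from `parquet_test_utils`. A minimal sketch of how they are meant to fit together, based solely on the calls visible in this diff; the iterator bound on `try_new`, the public fields of `ParquetScanOptions`, and the `request_method` column are assumptions rather than confirmed API:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::logical_expr::{lit, Expr};
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;

// Hypothetical driver; `batches` stands in for any RecordBatch source such as
// the benchmark's AccessLogGenerator, and `request_method` is a made-up column.
async fn scan_with_pushdown(
    path: PathBuf,
    batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<usize> {
    // Write the batches to a Parquet file; None keeps the default page and
    // row-group sizes.
    let test_file = TestParquetFile::try_new(path, batches, None, None)?;

    // Build an ExecutionPlan that scans the file with the filter pushed down.
    let scan_options = ParquetScanOptions {
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };
    let filter: Expr = col("request_method").eq(lit("GET"));
    let plan = test_file.create_scan(filter, scan_options).await?;

    // Execute the plan and count the rows that survive the filter.
    let ctx = SessionContext::new();
    let results = collect(plan, ctx.task_ctx()).await?;
    Ok(results.iter().map(|batch| batch.num_rows()).sum())
}
```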
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
 
-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
     Ok(())
 }
 
-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
 async fn run_benchmarks(
     ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
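The three reader knobs that used to be declared locally now come from `parquet_test_utils::ParquetScanOptions`. A sketch of the relocated type, assuming it keeps the fields of the struct removed above; the doc comments tie each flag to the config key that the old `exec_scan` body (removed further below) set by hand:

```rust
// Sketch of the relocated options type, assuming it keeps the fields removed here.
#[derive(Debug, Clone)]
pub struct ParquetScanOptions {
    /// Evaluate pushed-down predicates while decoding row groups
    /// (OPT_PARQUET_PUSHDOWN_FILTERS).
    pub pushdown_filters: bool,
    /// Allow the reader to reorder predicates by estimated evaluation cost
    /// (OPT_PARQUET_REORDER_FILTERS).
    pub reorder_filters: bool,
    /// Prune pages using the page index before decoding
    /// (OPT_PARQUET_ENABLE_PAGE_INDEX).
    pub enable_page_index: bool,
}
```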
@@ -174,9 +140,7 @@ async fn run_benchmarks(
                 let start = Instant::now();
                 let rows = exec_scan(
                     ctx,
-                    schema.clone(),
-                    object_store_url.clone(),
-                    object_meta.clone(),
+                    test_file,
                     filter_expr.clone(),
                     scan_options.clone(),
                     debug,
@@ -197,52 +161,12 @@ async fn run_benchmarks(
 
 async fn exec_scan(
     ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     filter: Expr,
     scan_options: ParquetScanOptions,
     debug: bool,
 ) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;
 
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
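The plan construction deleted here has moved behind `TestParquetFile::create_scan`. A condensed sketch of what that method plausibly does, assuming it absorbed the removed code more or less verbatim; the `schema`, `object_store_url`, and `object_meta` fields on `TestParquetFile` are assumptions, while the body mirrors the lines removed above:

```rust
use arrow::datatypes::SchemaRef;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
    OPT_PARQUET_REORDER_FILTERS,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;
use object_store::ObjectMeta;
use std::sync::Arc;

// Assumed shape of the helper; the field names are guesses.
pub struct TestParquetFile {
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}

// Reader options as sketched after the earlier hunk.
#[derive(Debug, Clone)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}

impl TestParquetFile {
    /// Build a FilterExec wrapping a ParquetExec over this single file,
    /// with the three reader options applied.
    pub async fn create_scan(
        &self,
        filter: Expr,
        options: ParquetScanOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Translate the three flags into the parquet reader config keys.
        let mut config_options = ConfigOptions::new();
        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, options.pushdown_filters);
        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, options.reorder_filters);
        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, options.enable_page_index);

        // Scan exactly one file, with no projection, limit, or partition columns.
        let scan_config = FileScanConfig {
            object_store_url: self.object_store_url.clone(),
            file_schema: self.schema.clone(),
            file_groups: vec![vec![PartitionedFile {
                object_meta: self.object_meta.clone(),
                partition_values: vec![],
                range: None,
                extensions: None,
            }]],
            statistics: Default::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            config_options: config_options.into_shareable(),
        };

        // The filter is both pushed into ParquetExec and re-applied by FilterExec,
        // so results stay correct even when pushdown_filters is off.
        let df_schema = self.schema.clone().to_dfschema()?;
        let physical_filter_expr = create_physical_expr(
            &filter,
            &df_schema,
            self.schema.as_ref(),
            &ExecutionProps::default(),
        )?;
        let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
        Ok(Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?))
    }
}
```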
@@ -258,53 +182,15 @@ fn gen_data(
     scale_factor: f32,
     page_size: Option<usize>,
     row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-
     let num_batches = 100_f32 * scale_factor;
 
-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
 }
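Likewise, the writer setup removed from `gen_data` presumably now lives in `TestParquetFile::try_new`. Continuing the sketch from the previous hunk (same assumed struct fields), condensed from the removed code; taking the schema from the first batch instead of from the generator is an assumption forced by the narrower signature:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::listing::ListingTableUrl;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::path::PathBuf;

impl TestParquetFile {
    /// Write `batches` to `path` and record the metadata needed to scan it later.
    pub fn try_new(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
        page_size: Option<usize>,
        row_group_size: Option<usize>,
    ) -> Result<Self> {
        let file = File::create(&path).unwrap();

        // Apply the optional page-size and row-group-size overrides.
        let mut props_builder = WriterProperties::builder();
        if let Some(s) = page_size {
            props_builder = props_builder
                .set_data_pagesize_limit(s)
                .set_write_batch_size(s);
        }
        if let Some(s) = row_group_size {
            props_builder = props_builder.set_max_row_group_size(s);
        }

        // Write every generated batch, tracking the row count for the log line.
        // The schema comes from the first batch (panics if the input is empty).
        let mut batches = batches.into_iter().peekable();
        let schema = batches.peek().unwrap().schema();
        let mut writer =
            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
        let mut num_rows = 0;
        for batch in batches {
            writer.write(&batch).unwrap();
            writer.flush()?;
            num_rows += batch.num_rows();
        }
        writer.close().unwrap();
        println!("Generated test dataset with {} rows", num_rows);

        // Record where the file lives so a later scan can find it in the
        // local object store.
        let size = std::fs::metadata(&path)?.len() as usize;
        let canonical_path = path.canonicalize()?;
        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();
        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
        };

        Ok(Self {
            schema,
            object_store_url,
            object_meta,
        })
    }
}
```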