@@ -22,7 +22,7 @@ use std::fmt::{self, Debug, Display};
2222use  crate :: { Result ,  ScalarValue } ; 
2323
2424use  crate :: error:: _plan_err; 
25- use  arrow:: datatypes:: { DataType ,  Schema ,   SchemaRef } ; 
25+ use  arrow:: datatypes:: { DataType ,  Schema } ; 
2626
2727/// Represents a value with a degree of certainty. `Precision` is used to 
2828/// propagate information about the precision of statistical values. 
@@ -391,13 +391,15 @@ impl Statistics {
391391     /// parameter to compute global statistics in a multi-partition setting. 
392392     pub  fn  with_fetch ( 
393393        mut  self , 
394-         schema :  SchemaRef , 
395394        fetch :  Option < usize > , 
396395        skip :  usize , 
397396        n_partitions :  usize , 
398397    )  -> Result < Self >  { 
399398        let  fetch_val = fetch. unwrap_or ( usize:: MAX ) ; 
400399
400+         // Get the ratio of rows after / rows before on a per-partition basis 
401+         let  num_rows_before = self . num_rows ; 
402+ 
401403        self . num_rows  = match  self  { 
402404            Statistics  { 
403405                num_rows :  Precision :: Exact ( nr) , 
@@ -431,8 +433,7 @@ impl Statistics {
431433                    // At this point we know that we were given a `fetch` value 
432434                    // as the `None` case would go into the branch above. Since 
433435                    // the input has more rows than `fetch + skip`, the number 
434-                     // of rows will be the `fetch`, but we won't be able to 
435-                     // predict the other statistics. 
436+                     // of rows will be the `fetch`, other statistics will have to be downgraded to inexact. 
436437                    check_num_rows ( 
437438                        fetch_val. checked_mul ( n_partitions) , 
438439                        // We know that we have an estimate for the number of rows: 
@@ -445,8 +446,32 @@ impl Statistics {
445446                ..
446447            }  => check_num_rows ( fetch. and_then ( |v| v. checked_mul ( n_partitions) ) ,  false ) , 
447448        } ; 
448-         self . column_statistics  = Statistics :: unknown_column ( & schema) ; 
449-         self . total_byte_size  = Precision :: Absent ; 
449+         let  ratio:  f64  = match  ( num_rows_before,  self . num_rows )  { 
450+             ( 
451+                 Precision :: Exact ( nr_before)  | Precision :: Inexact ( nr_before) , 
452+                 Precision :: Exact ( nr_after)  | Precision :: Inexact ( nr_after) , 
453+             )  => { 
454+                 if  nr_before == 0  { 
455+                     0.0 
456+                 }  else  { 
457+                     nr_after as  f64  / nr_before as  f64 
458+                 } 
459+             } 
460+             _ => 0.0 , 
461+         } ; 
462+         self . column_statistics  = self 
463+             . column_statistics 
464+             . into_iter ( ) 
465+             . map ( ColumnStatistics :: to_inexact) 
466+             . collect ( ) ; 
467+         // Adjust the total_byte_size for the ratio of rows before and after, also marking it as inexact 
468+         self . total_byte_size  = match  & self . total_byte_size  { 
469+             Precision :: Exact ( n)  | Precision :: Inexact ( n)  => { 
470+                 let  adjusted = ( * n as  f64  *  ratio)  as  usize ; 
471+                 Precision :: Inexact ( adjusted) 
472+             } 
473+             Precision :: Absent  => Precision :: Absent , 
474+         } ; 
450475        Ok ( self ) 
451476    } 
452477
@@ -1199,4 +1224,265 @@ mod tests {
11991224        // Distinct count should be Absent after merge 
12001225        assert_eq ! ( col_stats. distinct_count,  Precision :: Absent ) ; 
12011226    } 
1227+ 
#[test]
fn test_with_fetch_basic_preservation() {
    // Applying a fetch to exact statistics must keep every column statistic's
    // value (downgraded to `Inexact`) and scale `total_byte_size` by the
    // fraction of rows kept.
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![
            ColumnStatistics {
                null_count: Precision::Exact(10),
                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
                sum_value: Precision::Exact(ScalarValue::Int32(Some(5050))),
                distinct_count: Precision::Exact(50),
            },
            ColumnStatistics {
                null_count: Precision::Exact(20),
                max_value: Precision::Exact(ScalarValue::Int64(Some(200))),
                min_value: Precision::Exact(ScalarValue::Int64(Some(10))),
                sum_value: Precision::Exact(ScalarValue::Int64(Some(10100))),
                distinct_count: Precision::Exact(75),
            },
        ],
    };

    // Fetch 100 of the 1000 rows (10%), no skip, one partition.
    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(100), 0, 1).unwrap();

    assert_eq!(limited.num_rows, Precision::Exact(100));

    // 8000 bytes * (100 / 1000) = 800, reported as an estimate.
    assert_eq!(limited.total_byte_size, Precision::Inexact(800));

    // Both columns survive, with identical values but inexact precision.
    assert_eq!(limited.column_statistics.len(), 2);

    let first = &limited.column_statistics[0];
    assert_eq!(first.null_count, Precision::Inexact(10));
    assert_eq!(
        first.max_value,
        Precision::Inexact(ScalarValue::Int32(Some(100)))
    );
    assert_eq!(
        first.min_value,
        Precision::Inexact(ScalarValue::Int32(Some(0)))
    );
    assert_eq!(
        first.sum_value,
        Precision::Inexact(ScalarValue::Int32(Some(5050)))
    );
    assert_eq!(first.distinct_count, Precision::Inexact(50));

    let second = &limited.column_statistics[1];
    assert_eq!(second.null_count, Precision::Inexact(20));
    assert_eq!(
        second.max_value,
        Precision::Inexact(ScalarValue::Int64(Some(200)))
    );
    assert_eq!(
        second.min_value,
        Precision::Inexact(ScalarValue::Int64(Some(10)))
    );
    assert_eq!(
        second.sum_value,
        Precision::Inexact(ScalarValue::Int64(Some(10100)))
    );
    assert_eq!(second.distinct_count, Precision::Inexact(75));
}
1309+ 
#[test]
fn test_with_fetch_inexact_input() {
    // Statistics that are already inexact must stay inexact after a fetch.
    let column = ColumnStatistics {
        null_count: Precision::Inexact(10),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
        min_value: Precision::Inexact(ScalarValue::Int32(Some(0))),
        sum_value: Precision::Inexact(ScalarValue::Int32(Some(5050))),
        distinct_count: Precision::Inexact(50),
    };
    let input = Statistics {
        num_rows: Precision::Inexact(1000),
        total_byte_size: Precision::Inexact(8000),
        column_statistics: vec![column],
    };

    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(500), 0, 1).unwrap();

    // An inexact input row count yields an inexact output row count.
    assert_eq!(limited.num_rows, Precision::Inexact(500));

    // 8000 bytes * (500 / 1000) = 4000, still an estimate.
    assert_eq!(limited.total_byte_size, Precision::Inexact(4000));

    // Every column statistic keeps its value and its inexact precision.
    let col = &limited.column_statistics[0];
    assert_eq!(col.null_count, Precision::Inexact(10));
    assert_eq!(
        col.max_value,
        Precision::Inexact(ScalarValue::Int32(Some(100)))
    );
    assert_eq!(
        col.min_value,
        Precision::Inexact(ScalarValue::Int32(Some(0)))
    );
    assert_eq!(
        col.sum_value,
        Precision::Inexact(ScalarValue::Int32(Some(5050)))
    );
    assert_eq!(col.distinct_count, Precision::Inexact(50));
}
1340+ 
#[test]
fn test_with_fetch_skip_all_rows() {
    // `skip` equals the input row count, so no rows are left to fetch.
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(50), 100, 1).unwrap();

    assert_eq!(limited.num_rows, Precision::Exact(0));
    // Row ratio is 0 / 100 = 0, so the byte-size estimate collapses to 0.
    assert_eq!(limited.total_byte_size, Precision::Inexact(0));
}
1356+ 
#[test]
fn test_with_fetch_no_limit() {
    // With `fetch = None` and `skip = 0` no limit is applied at all.
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let unlimited = input.with_fetch(None, 0, 1).unwrap();

    // Row count and byte size come back unchanged and still exact.
    assert_eq!(unlimited.num_rows, Precision::Exact(100));
    assert_eq!(unlimited.total_byte_size, Precision::Exact(800));
}
1372+ 
#[test]
fn test_with_fetch_with_skip() {
    // Combine a non-zero `skip` with a `fetch` that fits inside the input.
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![col_stats_i64(10)],
    };

    // Skip 200 rows, then fetch 300 of the remaining 800.
    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(300), 200, 1).unwrap();

    assert_eq!(limited.num_rows, Precision::Exact(300));
    // 8000 bytes * (300 / 1000) = 2400, reported as an estimate.
    assert_eq!(limited.total_byte_size, Precision::Inexact(2400));
}
1392+ 
1393+     #[ test]  
1394+     fn  test_with_fetch_multi_partition ( )  { 
1395+         // Test with multiple partitions 
1396+         let  original_stats = Statistics  { 
1397+             num_rows :  Precision :: Exact ( 1000 ) ,  // per partition 
1398+             total_byte_size :  Precision :: Exact ( 8000 ) , 
1399+             column_statistics :  vec ! [ col_stats_i64( 10 ) ] , 
1400+         } ; 
1401+ 
1402+         // Fetch 100 per partition, 4 partitions = 400 total 
1403+         let  result = original_stats. clone ( ) . with_fetch ( Some ( 100 ) ,  0 ,  4 ) . unwrap ( ) ; 
1404+ 
1405+         assert_eq ! ( result. num_rows,  Precision :: Exact ( 400 ) ) ; 
1406+         // 400/1000 = 0.4, so 8000 * 0.4 = 3200 
1407+         assert_eq ! ( result. total_byte_size,  Precision :: Inexact ( 3200 ) ) ; 
1408+     } 
1409+ 
#[test]
fn test_with_fetch_absent_stats() {
    // Nothing is known about the input: every statistic is `Absent`.
    let input = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Absent,
            max_value: Precision::Absent,
            min_value: Precision::Absent,
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
        }],
    };

    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(100), 0, 1).unwrap();

    // The fetch value itself becomes an inexact row-count estimate.
    assert_eq!(limited.num_rows, Precision::Inexact(100));
    // There is no byte size to scale, so it stays absent.
    assert_eq!(limited.total_byte_size, Precision::Absent);

    // Absent column statistics must not turn into estimates.
    let col = &limited.column_statistics[0];
    assert_eq!(col.null_count, Precision::Absent);
    assert_eq!(col.max_value, Precision::Absent);
    assert_eq!(col.min_value, Precision::Absent);
    assert_eq!(col.sum_value, Precision::Absent);
    assert_eq!(col.distinct_count, Precision::Absent);
}
1433+ 
#[test]
fn test_with_fetch_fetch_exceeds_rows() {
    // `fetch` asks for more rows than remain once `skip` has been applied.
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    // Skipping 50 of 100 rows leaves 50, fewer than the requested 100.
    // `with_fetch` takes `self` by value, so the input is moved, not cloned.
    let limited = input.with_fetch(Some(100), 50, 1).unwrap();

    assert_eq!(limited.num_rows, Precision::Exact(50));
    // 800 bytes * (50 / 100) = 400, reported as an estimate.
    assert_eq!(limited.total_byte_size, Precision::Inexact(400));
}
1450+ 
#[test]
fn test_with_fetch_preserves_all_column_stats() {
    // Check every `ColumnStatistics` field individually: each value must
    // survive `with_fetch`, downgraded from `Exact` to `Inexact`.
    let column = ColumnStatistics {
        null_count: Precision::Exact(42),
        max_value: Precision::Exact(ScalarValue::Int32(Some(999))),
        min_value: Precision::Exact(ScalarValue::Int32(Some(-100))),
        sum_value: Precision::Exact(ScalarValue::Int32(Some(123456))),
        distinct_count: Precision::Exact(789),
    };

    // `column` is not needed afterwards, so it is moved into the vector
    // instead of being cloned.
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let limited = input.with_fetch(Some(250), 0, 1).unwrap();
    let col = &limited.column_statistics[0];

    assert_eq!(col.null_count, Precision::Inexact(42));
    assert_eq!(
        col.max_value,
        Precision::Inexact(ScalarValue::Int32(Some(999)))
    );
    assert_eq!(
        col.min_value,
        Precision::Inexact(ScalarValue::Int32(Some(-100)))
    );
    assert_eq!(
        col.sum_value,
        Precision::Inexact(ScalarValue::Int32(Some(123456)))
    );
    assert_eq!(col.distinct_count, Precision::Inexact(789));
}
12021488} 
0 commit comments