
Commit cd0356d

LuciferYang authored and wangyum committed
[SPARK-33673][SQL] Avoid push down partition filters to ParquetScan for DataSourceV2
### What changes were proposed in this pull request?

As described in SPARK-33673, some test suites in `ParquetV2SchemaPruningSuite` fail when `parquet.version` is set to 1.11.1 because, since PARQUET-1765, Parquet returns empty results for filters on a non-existent column. This PR changes `ParquetScanBuilder` to build `pushedParquetFilters` from `readDataSchema()` instead of `schema`, so that partition filters are not pushed down to `ParquetScan` for `DataSourceV2`.

### Why are the changes needed?

To prepare for upgrading to Parquet 1.11.1.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Pass the Jenkins or GitHub Actions pipeline.
- Manual test as follows:

```
mvn -Dtest=none -DwildcardSuites=org.apache.spark.sql.execution.datasources.parquet.ParquetV2SchemaPruningSuite -Dparquet.version=1.11.1 test -pl sql/core -am
```

**Before**

```
Run completed in 3 minutes, 13 seconds.
Total number of tests run: 134
Suites: completed 2, aborted 0
Tests: succeeded 120, failed 14, canceled 0, ignored 0, pending 0
*** 14 TESTS FAILED ***
```

**After**

```
Run completed in 3 minutes, 46 seconds.
Total number of tests run: 134
Suites: completed 2, aborted 0
Tests: succeeded 134, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #30652 from LuciferYang/SPARK-33673.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
1 parent: a84c8d8
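For context on the one-line fix below: in a partitioned table, partition columns are encoded in the directory layout rather than stored in the Parquet data files, so the scan builder's full `schema` is a strict superset of `readDataSchema()`. A minimal sketch of that distinction (the `id`/`value` column names are borrowed from the test change in this commit, not from `ParquetScanBuilder`):

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

// Columns physically stored inside each Parquet data file.
val dataSchema = new StructType().add("value", IntegerType)
// Columns encoded in the directory layout, e.g. .../id=1/part-00000.parquet.
val partitionSchema = new StructType().add("id", IntegerType)
// The full table schema, which `schema` exposes to the scan builder.
val fullSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

// A Parquet filter converter built from fullSchema treats `id` as a file
// column, so a predicate on `id` can be pushed into the Parquet reader even
// though no file contains that column; since PARQUET-1765, Parquet answers
// such a filter with empty results instead of ignoring it.
```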

2 files changed (+2, −2 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -50,7 +50,7 @@ case class ParquetScanBuilder(
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
-      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
+      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
     val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
       pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
     parquetFilters.convertibleFilters(this.filters).toArray
```
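The effect of this one-line change can be sketched against the public `org.apache.spark.sql.sources` filter API. The `convertible` helper below is a hypothetical, simplified stand-in for `ParquetFilters.convertibleFilters`, not Spark source:

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// Simplified rule: a predicate converts to a Parquet filter only if every
// column it references exists in the Parquet file schema.
def convertible(fileColumns: Set[String])(f: Filter): Boolean =
  f.references.forall(fileColumns.contains)

val filters: Seq[Filter] =
  Seq(IsNotNull("value"), GreaterThan("value", 2), EqualTo("id", 1))

// Converter built from `schema` (data + partition columns): the partition
// predicate on `id` also converts and reaches the Parquet reader.
assert(filters.count(convertible(Set("value", "id"))) == 3)

// Converter built from readDataSchema(): only the `value` predicates convert,
// leaving `id` to Spark's partition pruning.
assert(filters.filter(convertible(Set("value"))) ==
  Seq(IsNotNull("value"), GreaterThan("value", 2)))
```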

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -367,7 +367,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       val basePath = dir.getCanonicalPath + "/" + fmt
       val pushFilterMaps = Map (
         "parquet" ->
-          "|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
+          "|PushedFilers: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
         "orc" ->
           "|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
         "csv" ->
```
