Commit faedcd9

databricks-david-lewis authored and cloud-fan committed
[SPARK-41970] Introduce SparkPath for typesafety
### What changes were proposed in this pull request?

This PR proposes a strongly typed `SparkPath` that encapsulates a url-encoded string. It has helper methods for creating hadoop paths, uris, and uri-encoded strings. The intent is to identify and fix various bugs in the way that Spark handles these paths. To do this we introduced the SparkPath type to `PartitionedFile` (a widely used class) and then started fixing compile errors; in doing so we fixed various bugs.

### Why are the changes needed?

Given `val str = "s3://bucket/path with space/a"`, there is a difference between `new Path(str)` and `new Path(new URI(str))`, and thus a difference between `new URI(str)` and `new Path(str).toUri`. Both `URI` and `Path` are symmetric in construction and `toString`, but they are not interchangeable. Spark confuses these two path forms (uri-encoded vs. not); this PR attempts to use types to disambiguate them.

### Does this PR introduce _any_ user-facing change?

This PR proposes changing the public API of `PartitionedFile`, and various other methods, in the name of type safety. It needs to be clear to callers of an API what type of path string is expected.

### How was this patch tested?

We rely on existing tests, and update the default temp path creation to include paths with spaces.

Closes #39488 from databricks-david-lewis/SPARK_PATH.

Authored-by: David Lewis <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
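For illustration (not part of the original commit message), here is a minimal Scala sketch of the asymmetry described above, using the example string from the description. The commented values are the expected outputs given hadoop `Path`'s url-encoding behavior:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

object PathEncodingSketch extends App {
  // The un-encoded ("hadoop Path") form of the string, with a literal space.
  val str = "s3://bucket/path with space/a"

  val p = new Path(str)
  println(p.toString)       // s3://bucket/path with space/a      (un-encoded form)
  println(p.toUri.toString) // s3://bucket/path%20with%20space/a  (url-encoded form)

  // Round-tripping the url-encoded form through a URI yields an equal Path...
  val encoded = p.toUri.toString
  println(new Path(new URI(encoded)) == p) // expected: true
  // ...but passing it straight to Path(String) treats "%20" as literal characters,
  // which is exactly the confusion that SparkPath is meant to rule out.
  println(new Path(encoded) == p)          // expected: false
}
```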
1 parent 498b3ec commit faedcd9

42 files changed: +216, -133 lines changed


connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.net.URI
 
 import scala.util.control.NonFatal
 
@@ -96,9 +95,9 @@ private[sql] class AvroFileFormat extends FileFormat
       // Doing input file filtering is improper because we may generate empty tasks that process no
       // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
+      if (parsedOptions.ignoreExtension || file.urlEncodedPath.endsWith(".avro")) {
        val reader = {
-          val in = new FsInput(new Path(new URI(file.filePath)), conf)
+          val in = new FsInput(file.toPath, conf)
          try {
            val datumReader = userProvidedSchema match {
              case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)

connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

Lines changed: 2 additions & 5 deletions
@@ -16,14 +16,11 @@
  */
 package org.apache.spark.sql.v2.avro
 
-import java.net.URI
-
 import scala.util.control.NonFatal
 
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
@@ -62,9 +59,9 @@ case class AvroPartitionReaderFactory(
    val conf = broadcastedConf.value.value
    val userProvidedSchema = options.schema
 
-    if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
+    if (options.ignoreExtension || partitionedFile.urlEncodedPath.endsWith(".avro")) {
      val reader = {
-        val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
+        val in = new FsInput(partitionedFile.toPath, conf)
        try {
          val datumReader = userProvidedSchema match {
            case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala

Lines changed: 2 additions & 4 deletions
@@ -18,13 +18,11 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.net.URI
 
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -62,8 +60,8 @@ class AvroRowReaderSuite
      case BatchScanExec(_, f: AvroScan, _, _, _, _, _) => f
    }
    val filePath = fileScan.get.fileIndex.inputFiles(0)
-    val fileSize = new File(new URI(filePath)).length
-    val in = new FsInput(new Path(new URI(filePath)), new Configuration())
+    val fileSize = new File(filePath.toUri).length
+    val in = new FsInput(filePath.toPath, new Configuration())
    val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
 
    val it = new Iterator[InternalRow] with AvroUtils.RowReader {

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -2357,7 +2357,8 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
      assert(fileScan.get.dataFilters.nonEmpty)
      assert(fileScan.get.planInputPartitions().forall { partition =>
        partition.asInstanceOf[FilePartition].files.forall { file =>
-          file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
+          file.urlEncodedPath.contains("p1=1") &&
+            file.urlEncodedPath.contains("p2=2")
        }
      })
      checkAnswer(df, Row("b", 1, 2))

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ private[spark] class WorkerWatcher(
  private[deploy] var isShutDown = false
 
  // Lets filter events only from the worker's rpc system
-  private val expectedAddress = RpcAddress.fromURIString(workerUrl)
+  private val expectedAddress = RpcAddress.fromUrlString(workerUrl)
  private def isWorker(address: RpcAddress) = expectedAddress == address
 
  private def exitNonZero() =
core/src/main/scala/org/apache/spark/paths/SparkPath.scala

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.paths
+
+import java.net.URI
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+/**
+ * A canonical representation of a file path. This class is intended to provide
+ * type-safety to the way that Spark handles Paths. Paths can be represented as
+ * Strings in multiple ways, which are not always compatible. Spark regularly uses
+ * two ways: 1. hadoop Path.toString and java URI.toString.
+ */
+case class SparkPath private (private val underlying: String) {
+  def urlEncoded: String = underlying
+  def toUri: URI = new URI(underlying)
+  def toPath: Path = new Path(toUri)
+  override def toString: String = underlying
+}
+
+object SparkPath {
+  /**
+   * Creates a SparkPath from a hadoop Path string.
+   * Please be very sure that the provided string is encoded (or not encoded) in the right way.
+   *
+   * Please see the hadoop Path documentation here:
+   * https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html#Path-java.lang.String-
+   */
+  def fromPathString(str: String): SparkPath = fromPath(new Path(str))
+  def fromPath(path: Path): SparkPath = fromUri(path.toUri)
+  def fromFileStatus(fs: FileStatus): SparkPath = fromPath(fs.getPath)
+
+  /**
+   * Creates a SparkPath from a url-encoded string.
+   * Note: It is the responsibility of the caller to ensure that str is a valid url-encoded string.
+   */
+  def fromUrlString(str: String): SparkPath = SparkPath(str)
+  def fromUri(uri: URI): SparkPath = fromUrlString(uri.toString)
+}
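For orientation, a small usage sketch of the new class (not part of the diff). The commented values assume hadoop `Path.toUri` url-encodes the space as `%20`:

```scala
import org.apache.spark.paths.SparkPath

// From an un-encoded hadoop Path string: encoding happens via Path.toUri.
val fromHadoop = SparkPath.fromPathString("s3://bucket/path with space/a")
fromHadoop.urlEncoded          // expected: "s3://bucket/path%20with%20space/a"

// From a string that is already url-encoded: stored as-is.
val fromUrl = SparkPath.fromUrlString("s3://bucket/path%20with%20space/a")

fromHadoop == fromUrl          // expected: true, both wrap the same url-encoded string
fromHadoop.toPath              // a hadoop Path built from the decoded URI
fromHadoop.toUri               // a java.net.URI of the url-encoded string
```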

core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ private[spark] case class RpcAddress(_host: String, port: Int) {
 private[spark] object RpcAddress {
 
  /** Return the [[RpcAddress]] represented by `uri`. */
-  def fromURIString(uri: String): RpcAddress = {
+  def fromUrlString(uri: String): RpcAddress = {
    val uriObj = new java.net.URI(uri)
    RpcAddress(uriObj.getHost, uriObj.getPort)
  }

mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala

Lines changed: 3 additions & 3 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.ml.source.image
 
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.ml.image.ImageSchema
@@ -71,8 +71,8 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
      if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
        Iterator(emptyUnsafeRow)
      } else {
-        val origin = file.filePath
-        val path = new Path(origin)
+        val origin = file.urlEncodedPath
+        val path = file.toPath
        val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
        val stream = fs.open(path)
        val bytes = try {

scalastyle-config.xml

Lines changed: 8 additions & 0 deletions
@@ -437,4 +437,12 @@ This file is divided into 3 sections:
    Use org.apache.spark.util.Utils.createTempDir instead.
    </customMessage>
  </check>
+
+  <check customId="pathfromuri" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">new Path\(new URI\(</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that this string is uri encoded? Please be careful when converting hadoop Paths
+      and URIs to and from String. If possible, please use SparkPath.
+    ]]></customMessage>
+  </check>
 </scalastyle>
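As an illustration of what the new `pathfromuri` check targets (hypothetical code, not from this commit): the first method contains the banned `new Path(new URI(...))` pattern, while the second makes the encoding assumption explicit through `SparkPath`:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path
import org.apache.spark.paths.SparkPath

// Flagged by the regex check: is `str` url-encoded or not? The reader cannot tell.
def flagged(str: String): Path = new Path(new URI(str))

// Preferred: the caller explicitly asserts that `str` is url-encoded.
def preferred(str: String): Path = SparkPath.fromUrlString(str).toPath
```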

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 4 additions & 3 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.api.r.RRDD
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -3924,18 +3925,18 @@
   * @since 2.0.0
   */
  def inputFiles: Array[String] = {
-    val files: Seq[String] = queryExecution.optimizedPlan.collect {
+    val files: Seq[SparkPath] = queryExecution.optimizedPlan.collect {
      case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
        fsBasedRelation.inputFiles
      case fr: FileRelation =>
        fr.inputFiles
      case r: HiveTableRelation =>
-        r.tableMeta.storage.locationUri.map(_.toString).toArray
+        r.tableMeta.storage.locationUri.map(SparkPath.fromUri).toArray
      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
        _, _, _, _) =>
        table.fileIndex.inputFiles
    }.flatten
-    files.toSet.toArray
+    files.iterator.map(_.urlEncoded).toSet.toArray
  }
 
  /**
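A sketch of the observable effect on `Dataset.inputFiles` (hypothetical session and paths): the method still returns `Array[String]`, but each entry now comes from `SparkPath.urlEncoded`, so characters such as spaces appear url-encoded:

```scala
// Assuming a SparkSession named `spark` and a directory whose name contains a space.
val df = spark.read.parquet("/tmp/dir with space/data")

// Expected entries look like: file:/tmp/dir%20with%20space/data/part-00000-....parquet
df.inputFiles.foreach(println)
```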
