
Commit 5e5ad65

yhuai authored and rxin committed
[SPARK-6024][SQL] When a data source table has too many columns, its schema cannot be stored in metastore.
JIRA: https://issues.apache.org/jira/browse/SPARK-6024

Author: Yin Huai <[email protected]>

Closes #4795 from yhuai/wideSchema and squashes the following commits:

4882e6f [Yin Huai] Address comments.
73e71b4 [Yin Huai] Address comments.
143927a [Yin Huai] Simplify code.
cc1d472 [Yin Huai] Make the schema wider.
12bacae [Yin Huai] If the JSON string of a schema is too large, split it before storing it in metastore.
e9b4f70 [Yin Huai] Failed test.
1 parent 4ad5153 commit 5e5ad65
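
In outline, the fix stores an over-long schema JSON string as several numbered table properties instead of a single property. A minimal sketch of the write path in plain Scala (not the committed Spark code): SchemaSplitSketch and the mutable Map standing in for Hive table properties are illustrative, and the 4000-character threshold mirrors the default of the new config introduced below.

    import scala.collection.mutable

    object SchemaSplitSketch {
      // Default of spark.sql.sources.schemaStringLengthThreshold; metastore backends
      // commonly cap a table property value at 4000 characters.
      val threshold = 4000

      // Splits a schema's JSON string into chunks no longer than the threshold and
      // records the chunk count so a reader knows how many parts to reassemble.
      def storeSchemaJson(schemaJson: String, props: mutable.Map[String, String]): Unit = {
        val parts = schemaJson.grouped(threshold).toSeq
        props("spark.sql.sources.schema.numParts") = parts.size.toString
        parts.zipWithIndex.foreach { case (part, index) =>
          props(s"spark.sql.sources.schema.part.$index") = part
        }
      }
    }

Reading back is the mirror image: fetch parts 0 through numParts - 1, concatenate them, and feed the result to DataType.fromJson, which is what the HiveMetastoreCatalog change below does.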

3 files changed (+54, −6 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -51,6 +51,11 @@ private[spark] object SQLConf {
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+  // This is used to control when we will split a schema's JSON string into multiple pieces
+  // in order to fit the JSON string in the metastore's table property (by default, the value
+  // has a length restriction of 4000 characters). We will split the JSON string of a schema
+  // if its length exceeds the threshold.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  // Do not use a value larger than 4000 as the default value of this property.
+  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
+  private[spark] def schemaStringLengthThreshold: Int =
+    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+
   private[spark] def dataFrameEagerAnalysis: Boolean =
     getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
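
For reference, the splitting relies on Scala's StringOps.grouped, which yields fixed-size chunks with a shorter final chunk, so concatenating the chunks always restores the original string. A small REPL-style illustration in plain Scala, using a made-up 9,000-character string as a stand-in for a long schema JSON:

    val threshold = 4000
    val fakeSchemaJson = "x" * 9000                  // stand-in for a long schema JSON string
    val parts = fakeSchemaJson.grouped(threshold).toList

    println(parts.map(_.length))                     // List(4000, 4000, 1000)
    println(parts.mkString == fakeSchemaJson)        // true: concatenation restores the original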

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 23 additions & 6 deletions
@@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = table.getProperty("spark.sql.sources.schema")
       val userSpecifiedSchema =
-        if (schemaString == null) {
-          None
-        } else {
-          Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+        Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
+          val parts = (0 until numParts.toInt).map { index =>
+            val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
+            if (part == null) {
+              throw new AnalysisException(
+                s"Could not read schema from the metastore because it is corrupted " +
+                  s"(missing part ${index} of the schema).")
+            }
+
+            part
+          }
+          // Stick all parts back to a single schema string in the JSON representation
+          // and convert it back to a StructType.
+          DataType.fromJson(parts.mkString).asInstanceOf[StructType]
         }
+
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
       val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     tbl.setProperty("spark.sql.sources.provider", provider)
     if (userSpecifiedSchema.isDefined) {
-      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+      val threshold = hive.conf.schemaStringLengthThreshold
+      val schemaJsonString = userSpecifiedSchema.get.json
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
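
The read path above boils down to: look up the part count, fetch each numbered part, fail if any part is missing, and concatenate. A self-contained sketch of that logic, abstracted over a property lookup that returns null for absent keys (as the Hive table's getProperty does in the hunk above); SchemaReadSketch is an illustrative name and require stands in for the AnalysisException the committed code throws:

    object SchemaReadSketch {
      // Reassembles a schema JSON string that was stored as numbered parts.
      // getProperty is any String => String lookup that returns null when a key is absent.
      def readSchemaJson(getProperty: String => String): Option[String] =
        Option(getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
          (0 until numParts.toInt).map { index =>
            val part = getProperty(s"spark.sql.sources.schema.part.$index")
            // The committed code throws AnalysisException here; require keeps the sketch dependency-free.
            require(part != null, s"Corrupted metastore schema: missing part $index")
            part
          }.mkString
        }
    }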

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
     }
   }
+
+  test("SPARK-6024 wide schema support") {
+    // We will need 80 splits for this schema if the threshold is 4000.
+    val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
+    assert(
+      schema.json.size > conf.schemaStringLengthThreshold,
+      "To correctly test the fix of SPARK-6024, the value of " +
+        s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
+    // Manually create a metastore data source table.
+    catalog.createDataSourceTable(
+      tableName = "wide_schema",
+      userSpecifiedSchema = Some(schema),
+      provider = "json",
+      options = Map("path" -> "just a dummy path"),
+      isExternal = false)
+
+    invalidateTable("wide_schema")
+
+    val actualSchema = table("wide_schema").schema
+    assert(schema === actualSchema)
+  }
 }
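
The test's comment about needing roughly 80 splits can be sanity-checked by measuring the JSON size of the same 5,000-column schema. A quick standalone check, assuming Spark SQL's public StructType API is on the classpath; WideSchemaSizeCheck is an illustrative name and the exact part count depends on the serialized length:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object WideSchemaSizeCheck {
      def main(args: Array[String]): Unit = {
        // Same shape as the schema built in the test above.
        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, nullable = true)))
        val jsonLength = schema.json.length
        val threshold = 4000  // assumed default of spark.sql.sources.schemaStringLengthThreshold
        val numParts = math.ceil(jsonLength.toDouble / threshold).toInt
        println(s"schema JSON length: $jsonLength, parts at threshold $threshold: $numParts")
      }
    }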
