
Commit 1388fdd

Reza Safi authored and Marcelo Vanzin committed

[SPARK-20926][SQL] Removing exposures to guava library caused by directly accessing SessionCatalog's tableRelationCache

There could be test failures because DataSourceStrategy, HiveMetastoreCatalog and HiveSchemaInferenceSuite were exposed to the guava library by directly accessing SessionCatalog's tableRelationCache. These failures occur when guava shading is in place.

## What changes were proposed in this pull request?

This change removes those guava exposures by introducing new proxy methods in SessionCatalog and changing DataSourceStrategy, HiveMetastoreCatalog and HiveSchemaInferenceSuite so that they use those proxy methods.

## How was this patch tested?

Unit tests passed after applying these changes.

Author: Reza Safi <[email protected]>

Closes #18148 from rezasafi/branch-2.2.
1 parent acd4481 · commit 1388fdd
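In practice, the "exposure" means a caller's compiled code references Guava's com.google.common.cache.Cache type, which breaks once guava is shaded and that class is relocated. A minimal before/after sketch of a call site in Scala; the spark session, table name, and resolveRelation loader are illustrative stand-ins, not part of this commit:

import java.util.concurrent.Callable

import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val catalog = spark.sessionState.catalog
val key = QualifiedTableName("default", "events") // hypothetical table

// Before: the caller's code names a Guava type, so its bytecode breaks
// when shading relocates com.google.common.cache.Cache.
//   val cache: com.google.common.cache.Cache[QualifiedTableName, LogicalPlan] =
//     catalog.tableRelationCache

// After: only Spark types and java.util.concurrent.Callable appear.
val plan: LogicalPlan = catalog.getCachedPlan(key, new Callable[LogicalPlan] {
  override def call(): LogicalPlan = resolveRelation(key) // hypothetical loader
})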

File tree

4 files changed: +38 −15 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 27 additions & 4 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
 import java.util.Locale
+import java.util.concurrent.Callable
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -125,14 +126,36 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
   }
 
-  /**
-   * A cache of qualified table names to table relation plans.
-   */
-  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
+  /** This method provides a way to get a cached plan. */
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+    tableRelationCache.get(t, c)
+  }
+
+  /** This method provides a way to get a cached plan if the key exists. */
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
+    tableRelationCache.getIfPresent(key)
+  }
+
+  /** This method provides a way to cache a plan. */
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
+    tableRelationCache.put(t, l)
+  }
+
+  /** This method provides a way to invalidate a cached plan. */
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
+    tableRelationCache.invalidate(key)
+  }
+
+  /** This method provides a way to invalidate all the cached plans. */
+  def invalidateAllCachedTables(): Unit = {
+    tableRelationCache.invalidateAll()
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
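
Taken together, the five new methods cover every cache operation the callers below need (get-or-load, point lookup, put, single invalidation, full invalidation), which is what allows tableRelationCache itself to become private. A hypothetical round-trip through the non-loading operations; catalog and somePlan are stand-ins:

val key = QualifiedTableName("default", "events")

catalog.cacheTable(key, somePlan)               // put
assert(catalog.getCachedTable(key) eq somePlan) // getIfPresent under the hood
catalog.invalidateCachedTable(key)              // evict this entry
assert(catalog.getCachedTable(key) == null)     // Guava returns null on a miss
catalog.invalidateAllCachedTables()             // clear the whole cache

Note that getCachedTable keeps Guava's null-on-miss contract rather than wrapping the result in an Option; HiveMetastoreCatalog below matches on case null accordingly.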

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 2 deletions

@@ -215,9 +215,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val catalogProxy = sparkSession.sessionState.catalog
 
-    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
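
getCachedPlan behaves as a compute-if-absent: under Guava's Cache.get(key, callable) semantics the Callable runs only on a miss, concurrent callers for the same key block until it finishes, and the result is stored for later lookups. A reduced sketch of the pattern; expensiveResolve stands in for the DataSource construction done in the real Callable:

val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan] {
  override def call(): LogicalPlan = {
    // Runs at most once per missing key; the returned plan is cached under
    // qualifiedTableName for subsequent getCachedPlan/getCachedTable calls.
    expensiveResolve(qualifiedTableName) // hypothetical
  }
})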

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 8 additions & 8 deletions

@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
   private def sessionState = sparkSession.sessionState
-  private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+  private def catalogProxy = sparkSession.sessionState.catalog
   import HiveMetastoreCatalog._
 
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val key = QualifiedTableName(
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
       table.table.toLowerCase)
-    tableRelationCache.getIfPresent(key)
+    catalogProxy.getCachedTable(key)
   }
 
   private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedFileFormat: Class[_ <: FileFormat],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    tableRelationCache.getIfPresent(tableIdentifier) match {
+    catalogProxy.getCachedTable(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -92,21 +92,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right away.
-              tableRelationCache.invalidate(tableIdentifier)
+              catalogProxy.invalidateCachedTable(tableIdentifier)
               None
             }
           case _ =>
             logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
               s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
               "This cached entry will be invalidated.")
-            tableRelationCache.invalidate(tableIdentifier)
+            catalogProxy.invalidateCachedTable(tableIdentifier)
             None
         }
       case other =>
         logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
           s"However, we are getting a $other from the metastore cache. " +
           "This cached entry will be invalidated.")
-        tableRelationCache.invalidate(tableIdentifier)
+        catalogProxy.invalidateCachedTable(tableIdentifier)
         None
     }
   }
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         fileFormat = fileFormat,
         options = options)(sparkSession = sparkSession)
       val created = LogicalRelation(fsRelation, updatedTable)
-      tableRelationCache.put(tableIdentifier, created)
+      catalogProxy.cacheTable(tableIdentifier, created)
       created
     }
 
@@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           className = fileType).resolveRelation(),
         table = updatedTable)
 
-      tableRelationCache.put(tableIdentifier, created)
+      catalogProxy.cacheTable(tableIdentifier, created)
       created
     }
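
One subtlety this file relies on: getCachedTable returns null on a miss (mirroring Guava's getIfPresent) rather than an Option, so getCached must handle the null case before pattern-matching the relation. A reduced sketch of that shape; isCompatible stands in for the file-format and schema checks elided above:

catalogProxy.getCachedTable(tableIdentifier) match {
  case null => None // cache miss
  case logical @ LogicalRelation(_: HadoopFsRelation, _, _) if isCompatible(logical) =>
    Some(logical) // usable cached relation
  case _ =>
    catalogProxy.invalidateCachedTable(tableIdentifier) // stale or wrong format: evict
    None
}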

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -46,7 +46,7 @@ class HiveSchemaInferenceSuite
 
   override def afterEach(): Unit = {
     super.afterEach()
-    spark.sessionState.catalog.tableRelationCache.invalidateAll()
+    spark.sessionState.catalog.invalidateAllCachedTables()
     FileStatusCache.resetForTesting()
   }
