From 08fd20007ce2dc751208167e7f7c44cbeb57e8a2 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Fri, 29 Sep 2023 17:37:19 +0200
Subject: [PATCH 1/3] SPARK-45386: Fix correctness issue with StorageLevel.NONE

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 174f47a3b82b..6b78bc62e25a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3833,7 +3833,9 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(newLevel: StorageLevel): this.type = {
-    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    if (newLevel != StorageLevel.NONE) {
+      sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    }
     this
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 324695349787..0683fc4694e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2597,6 +2598,11 @@ class DatasetSuite extends QueryTest
         parameters = Map("cls" -> classOf[Array[Int]].getName))
     }
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+    val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+    assert(ds.count() == 2)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest

From 4434ebca48f167a980dbb0b687726bffdaa54e6b Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Sun, 1 Oct 2023 09:06:45 +0200
Subject: [PATCH 2/3] Move to CacheManager

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  | 4 +---
 .../scala/org/apache/spark/sql/execution/CacheManager.scala | 3 ++-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b78bc62e25a..174f47a3b82b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3833,9 +3833,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(newLevel: StorageLevel): this.type = {
-    if (newLevel != StorageLevel.NONE) {
-      sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
-    }
+    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
     this
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e00..4e16bf904bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       planToCache: LogicalPlan,
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    if (lookupCachedData(planToCache).nonEmpty) {
+    if (storageLevel == StorageLevel.NONE) {
+    } else if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)

From e228dfdedd517da3311ff2a92b9026c6d78c428b Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Mon, 2 Oct 2023 09:58:46 +0200
Subject: [PATCH 3/3] Add comment

---
 .../main/scala/org/apache/spark/sql/execution/CacheManager.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4e16bf904bc9..e906c74f8a5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -114,6 +114,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
     if (storageLevel == StorageLevel.NONE) {
+      // Do nothing for StorageLevel.NONE since it will not actually cache any data.
     } else if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
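
For reviewers, below is a minimal standalone sketch of the scenario these patches address, assuming a local Spark build where SPARK-45386 is present. The object name Spark45386Repro, the local[*] master, and the println are illustrative only and are not part of the patches; with the series applied, persist(StorageLevel.NONE) is ignored by the CacheManager and the count is 2 as the new DatasetSuite test asserts.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Sketch: reproduce the behavior covered by the new DatasetSuite test.
// persist(StorageLevel.NONE) stores nothing, so registering the plan in the
// CacheManager is what the patch series avoids.
object Spark45386Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-45386 repro")
      .getOrCreate()
    import spark.implicits._

    // Persist with StorageLevel.NONE and count the rows.
    val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)

    // With the fix applied this prints 2; on affected versions the
    // registered-but-empty cache entry could yield an incorrect result.
    println(ds.count())

    spark.stop()
  }
}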