From 755e5b423063d0a44c027d000e8395c135fd7a60 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 3 Jun 2020 01:49:08 +0800 Subject: [PATCH 01/10] initial commit --- ...reamingAggregationCompatibilitySuite.scala | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala new file mode 100644 index 000000000000..9ecbbad25ef7 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.File + +import org.apache.commons.io.FileUtils + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions._ +import org.apache.spark.util.Utils + +class StreamingAggregationCompatibilitySuite extends StreamTest { + import testImplicits._ + + test("common functions") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDF().toDF("value") + .selectExpr( + "value", + "value % 5 AS id", + "CAST(value AS STRING) as str", + "CAST(value AS FLOAT) as f", + "CAST(value AS DOUBLE) as d", + "CAST(value AS DECIMAL) as dec", + "value % 3 AS mod", + "named_struct('key', CAST(value AS STRING), 'value', value) AS s") + .groupBy($"id") + .agg( + avg($"value").as("avg_v"), + avg($"f").as("avg_f"), + avg($"d").as("avg_d"), + avg($"dec").as("avg_dec"), + count($"value").as("cnt"), + first($"value").as("first_v"), + first($"s").as("first_s"), + last($"value").as("last_v"), + last($"s").as("last_s"), + min(struct("value", "str")).as("min_struct"), + max($"value").as("max_v"), + sum($"value").as("sum_v"), + sum($"f").as("sum_f"), + sum($"d").as("sum_d"), + sum($"dec").as("sum_dec"), + collect_list($"value").as("col_list"), + collect_set($"mod").as("col_set")) + .select("id", "avg_v", "avg_f", "avg_d", "avg_dec", "cnt", "first_v", "first_s.value", + "last_v", "last_s.value", "min_struct.value", "max_v", "sum_v", "sum_f", "sum_d", + "sum_dec", "col_list", "col_set") + + val resourceUri = this.getClass.getResource("/structured-streaming/" + + "checkpoint-version-2.4.5-for-compatibility-test-common-functions").toURI + val checkpointDir = Utils.createTempDir().getCanonicalFile + FileUtils.copyDirectory(new File(resourceUri), checkpointDir) + + inputData.addData(0 to 9: _*) + + testStream(aggregated, 
Complete)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), +// AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), +// CheckAnswer( +// Row(0, 2.5, 2.5F, 2.5, 2.5000, 2, 0, 0, 5, 5, 0, 5, 5, 5.0, 5.0, 5, Seq(0, 5), +// Seq(0, 2)), +// Row(1, 3.5, 3.5F, 3.5, 3.5000, 2, 1, 1, 6, 6, 1, 6, 7, 7.0, 7.0, 7, Seq(1, 6), +// Seq(0, 1)), +// Row(2, 4.5, 4.5F, 4.5, 4.5000, 2, 2, 2, 7, 7, 2, 7, 9, 9.0, 9.0, 9, Seq(2, 7), +// Seq(1, 2)), +// Row(3, 5.5, 5.5F, 5.5, 5.5000, 2, 3, 3, 8, 8, 3, 8, 11, 11.0, 11.0, 11, Seq(3, 8), +// Seq(0, 2)), +// Row(4, 6.5, 6.5F, 6.5, 6.5000, 2, 4, 4, 9, 9, 4, 9, 13, 13.0, 13.0, 13, Seq(4, 9), +// Seq(0, 1))), + AddData(inputData, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19), + CheckAnswer( + Row(0, 7.5, 7.5, 7.5, 7.5000, 4, 0, 0, 15, 15, 0, 15, 30, 30.0, 30.0, 30, + Seq(0, 5, 10, 15), Seq(0, 1, 2)), + Row(1, 8.5, 8.5, 8.5, 8.5000, 4, 1, 1, 16, 16, 1, 16, 34, 34.0, 34.0, 34, + Seq(1, 6, 11, 16), Seq(0, 1, 2)), + Row(2, 9.5, 9.5, 9.5, 9.5000, 4, 2, 2, 17, 17, 2, 17, 38, 38.0, 38.0, 38, + Seq(2, 7, 12, 17), Seq(0, 1, 2)), + Row(3, 10.5, 10.5, 10.5, 10.5000, 4, 3, 3, 18, 18, 3, 18, 42, 42.0, 42.0, 42, + Seq(3, 8, 13, 18), Seq(0, 1, 2)), + Row(4, 11.5, 11.5, 11.5, 11.5000, 4, 4, 4, 19, 19, 4, 19, 46, 46.0, 46.0, 46, + Seq(4, 9, 14, 19), Seq(0, 1, 2))) + ) + } +} From 2c36693b259af685eca474aefc54a4e9d1d698fe Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 3 Jun 2020 17:48:00 +0800 Subject: [PATCH 02/10] tmp --- .../scala/org/apache/spark/util/Utils.scala | 2 +- ...reamingAggregationCompatibilitySuite.scala | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9636fe88c77c..d37f4293969c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -322,7 +322,7 @@ private[spark] object Utils extends Logging { root: String = System.getProperty("java.io.tmpdir"), namePrefix: String = "spark"): File = { val dir = createDirectory(root, namePrefix) - ShutdownHookManager.registerShutdownDeleteDir(dir) + // ShutdownHookManager.registerShutdownDeleteDir(dir) dir } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala index 9ecbbad25ef7..0b33d0377508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala @@ -102,4 +102,25 @@ class StreamingAggregationCompatibilitySuite extends StreamTest { Seq(4, 9, 14, 19), Seq(0, 1, 2))) ) } + + test("SPARK-28067 change the sum decimal unsafe row format") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDF().toDF("value") + .selectExpr( + "value", + "value % 2 AS id", + "CAST(value AS DECIMAL) as dec") + .groupBy($"id") + .agg(sum($"dec").as("sum_dec")) + .select("id", "sum_dec") + + val checkpointDir = Utils.createTempDir().getCanonicalFile + testStream(aggregated, Complete)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + CheckAnswer(Row(0, 2.5, 2.5F, 2.5, 2.5000, 2, 0, 0, 5, 5, 0, 5, 5, 5.0, 5.0, 5, Seq(0, 5))) + ) + } } From 07f30f89a37104d33c52d7822b22769f174ecf3a Mon Sep 17 00:00:00 2001 From: 
Yuanjian Li Date: Wed, 3 Jun 2020 02:28:33 +0800 Subject: [PATCH 03/10] initial commit --- .../apache/spark/sql/internal/SQLConf.scala | 9 ++++ .../StreamingAggregationStateManager.scala | 46 ++++++++++++++++++- ...treamingAggregationStateManagerSuite.scala | 22 ++++++++- 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6bbeb2de7538..08488da09748 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1568,6 +1568,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_STATE_FORMAT_CHECK_ENABLED = + buildConf("spark.sql.streaming.stateFormatCheck.enabled") + .doc("Whether to detect a streaming query may try to use an invalid UnsafeRow in the " + + "state store.") + .version("3.1.0") + .internal() + .booleanConf + .createWithDefault(true) + val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION = buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala index 9bfb9561b42a..f963b5c95f5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.streaming.state +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType /** @@ -59,6 +61,9 @@ sealed trait StreamingAggregationStateManager extends Serializable { /** Return an iterator containing all the values in target state store. */ def values(store: StateStore): Iterator[UnsafeRow] + + /** Check the UnsafeRow format with the expected schema */ + def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit } object StreamingAggregationStateManager extends Logging { @@ -77,6 +82,14 @@ object StreamingAggregationStateManager extends Logging { } } +/** + * An exception thrown when an invalid UnsafeRow is detected. + */ +class InvalidUnsafeRowException + extends SparkException("The UnsafeRow format is invalid. This may happen when using the old " + + "version or broken checkpoint file. 
To resolve this problem, you can try to restart the " + + "application or use the legacy way to process streaming state.", null) + abstract class StreamingAggregationStateManagerBaseImpl( protected val keyExpressions: Seq[Attribute], protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { @@ -84,6 +97,9 @@ abstract class StreamingAggregationStateManagerBaseImpl( @transient protected lazy val keyProjector = GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + // Consider about the cost, only check the UnsafeRow format for the first row + private var checkFormat = true + override def getKey(row: UnsafeRow): UnsafeRow = keyProjector(row) override def commit(store: StateStore): Long = store.commit() @@ -94,6 +110,28 @@ abstract class StreamingAggregationStateManagerBaseImpl( // discard and don't convert values to avoid computation store.getRange(None, None).map(_.key) } + + override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = { + if (checkFormat && SQLConf.get.getConf( + SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) { + if (schema.fields.length != row.numFields) { + throw new InvalidUnsafeRowException + } + schema.fields.zipWithIndex + .filterNot(field => UnsafeRow.isFixedLength(field._1.dataType)).foreach { + case (_, index) => + val offsetAndSize = row.getLong(index) + val offset = (offsetAndSize >> 32).toInt + val size = offsetAndSize.toInt + if (size < 0 || + offset < UnsafeRow.calculateBitSetWidthInBytes(row.numFields) + 8 * row.numFields || + offset + size > row.getSizeInBytes) { + throw new InvalidUnsafeRowException + } + } + checkFormat = false + } + } } /** @@ -114,7 +152,9 @@ class StreamingAggregationStateManagerImplV1( override def getStateValueSchema: StructType = inputRowAttributes.toStructType override def get(store: StateStore, key: UnsafeRow): UnsafeRow = { - store.get(key) + val res = store.get(key) + unsafeRowFormatValidation(res, inputRowAttributes.toStructType) + res } override def put(store: StateStore, row: UnsafeRow): Unit = { @@ -173,7 +213,9 @@ class StreamingAggregationStateManagerImplV2( return savedState } - restoreOriginalRow(key, savedState) + val res = restoreOriginalRow(key, savedState) + unsafeRowFormatValidation(res, inputRowAttributes.toStructType) + res } override def put(store: StateStore, row: UnsafeRow): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala index daacdfd58c7b..2881e2e6f6c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} class StreamingAggregationStateManagerSuite extends StreamTest { // ============================ fields and method for test data ============================ @@ 
-123,4 +123,24 @@ class StreamingAggregationStateManagerSuite extends StreamTest { // state manager should return row which is same as input row regardless of format version assert(inputRow === stateManager.get(memoryStateStore, keyRow)) } + + test("UnsafeRow format invalidation") { + // Pass the checking + val stateManager0 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, + testOutputAttributes, 2) + stateManager0.unsafeRowFormatValidation(testRow, testOutputSchema) + // Fail for fields number not match + val stateManager1 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, + testOutputAttributes, 2) + assertThrows[InvalidUnsafeRowException](stateManager1.unsafeRowFormatValidation( + testRow, StructType(testKeys.map(createIntegerField)))) + // Fail for invalid schema + val stateManager2 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, + testOutputAttributes, 2) + val invalidSchema = StructType(testKeys.map(createIntegerField) ++ + Seq(StructField("struct", StructType(Seq(StructField("value1", StringType, true))), true), + StructField("value2", IntegerType, false))) + assertThrows[InvalidUnsafeRowException](stateManager2.unsafeRowFormatValidation( + testRow, invalidSchema)) + } } From 0c12045ecc9566d1e19c0c9f72acec999e4e8dea Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 4 Jun 2020 11:28:05 +0800 Subject: [PATCH 04/10] add new test --- .../scala/org/apache/spark/util/Utils.scala | 2 +- ...reamingAggregationCompatibilitySuite.scala | 21 +++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d37f4293969c..9636fe88c77c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -322,7 +322,7 @@ private[spark] object Utils extends Logging { root: String = System.getProperty("java.io.tmpdir"), namePrefix: String = "spark"): File = { val dir = createDirectory(root, namePrefix) - // ShutdownHookManager.registerShutdownDeleteDir(dir) + ShutdownHookManager.registerShutdownDeleteDir(dir) dir } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala index 0b33d0377508..fd0869eca70a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala @@ -21,6 +21,7 @@ import java.io.File import org.apache.commons.io.FileUtils +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.execution.streaming.MemoryStream @@ -113,14 +114,26 @@ class StreamingAggregationCompatibilitySuite extends StreamTest { "value % 2 AS id", "CAST(value AS DECIMAL) as dec") .groupBy($"id") - .agg(sum($"dec").as("sum_dec")) - .select("id", "sum_dec") + .agg(sum($"dec").as("sum_dec"), collect_list($"value").as("col_list")) + .select("id", "sum_dec", "col_list") + val resourceUri = this.getClass.getResource("/structured-streaming/" + + "checkpoint-version-2.4.5-for-compatibility-test-sum-decimal").toURI val checkpointDir = Utils.createTempDir().getCanonicalFile + + FileUtils.copyDirectory(new File(resourceUri), 
checkpointDir) + + inputData.addData(0 to 9: _*) + testStream(aggregated, Complete)( StartStream(checkpointLocation = checkpointDir.getAbsolutePath), - AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), - CheckAnswer(Row(0, 2.5, 2.5F, 2.5, 2.5000, 2, 0, 0, 5, 5, 0, 5, 5, 5.0, 5.0, 5, Seq(0, 5))) +// AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), +// CheckAnswer(Row(0, 20, Seq(0, 2, 4, 6, 8)), Row(1, 25, Seq(1, 3, 5, 7, 9))) + AddData(inputData, 10 to 19: _*), + ExpectFailure[SparkException](e => { + // Check the exception message to make sure the state store format changing. + assert(e.getCause.getMessage.contains("The UnsafeRow format is invalid")) + }) ) } } From 357f17c22449fb5cf32af5b06c450346163a52fa Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 4 Jun 2020 14:05:11 +0800 Subject: [PATCH 05/10] new tests for statistical functions --- ...reamingAggregationCompatibilitySuite.scala | 143 +++++++++++++----- 1 file changed, 109 insertions(+), 34 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala index fd0869eca70a..e131721ae509 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala @@ -28,12 +28,27 @@ import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.util.Utils +/** + * An integrated test for streaming aggregation compatibility. + * For each PR breaks this test, we need to pay attention to the underlying unsafe row format + * changing of aggregate functions. All the checkpoint dirs were generated based on Spark version + * 2.4.5. If we accept the changes, it means the checkpoint for Structured Streaming will become + * non-reusable. Please add a new test for the issue, just like the test suite "SPARK-28067 change + * the sum decimal unsafe row format". 
+ */ class StreamingAggregationCompatibilitySuite extends StreamTest { import testImplicits._ + private def prepareCheckpointDir(testName: String): File = { + val resourceUri = this.getClass.getResource("/structured-streaming/" + + s"checkpoint-version-2.4.5-for-compatibility-test-${testName}").toURI + val checkpointDir = Utils.createTempDir().getCanonicalFile + FileUtils.copyDirectory(new File(resourceUri), checkpointDir) + checkpointDir + } + test("common functions") { val inputData = MemoryStream[Int] - val aggregated = inputData.toDF().toDF("value") .selectExpr( @@ -61,52 +76,114 @@ class StreamingAggregationCompatibilitySuite extends StreamTest { sum($"value").as("sum_v"), sum($"f").as("sum_f"), sum($"d").as("sum_d"), - sum($"dec").as("sum_dec"), + // The test for sum decimal broke by SPARK-28067, use separated test for it + // sum($"dec").as("sum_dec"), collect_list($"value").as("col_list"), collect_set($"mod").as("col_set")) .select("id", "avg_v", "avg_f", "avg_d", "avg_dec", "cnt", "first_v", "first_s.value", "last_v", "last_s.value", "min_struct.value", "max_v", "sum_v", "sum_f", "sum_d", - "sum_dec", "col_list", "col_set") - - val resourceUri = this.getClass.getResource("/structured-streaming/" + - "checkpoint-version-2.4.5-for-compatibility-test-common-functions").toURI - val checkpointDir = Utils.createTempDir().getCanonicalFile - FileUtils.copyDirectory(new File(resourceUri), checkpointDir) + "col_list", "col_set") + val checkpointDir = prepareCheckpointDir("common-functions") inputData.addData(0 to 9: _*) testStream(aggregated, Complete)( StartStream(checkpointLocation = checkpointDir.getAbsolutePath), -// AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), -// CheckAnswer( -// Row(0, 2.5, 2.5F, 2.5, 2.5000, 2, 0, 0, 5, 5, 0, 5, 5, 5.0, 5.0, 5, Seq(0, 5), -// Seq(0, 2)), -// Row(1, 3.5, 3.5F, 3.5, 3.5000, 2, 1, 1, 6, 6, 1, 6, 7, 7.0, 7.0, 7, Seq(1, 6), -// Seq(0, 1)), -// Row(2, 4.5, 4.5F, 4.5, 4.5000, 2, 2, 2, 7, 7, 2, 7, 9, 9.0, 9.0, 9, Seq(2, 7), -// Seq(1, 2)), -// Row(3, 5.5, 5.5F, 5.5, 5.5000, 2, 3, 3, 8, 8, 3, 8, 11, 11.0, 11.0, 11, Seq(3, 8), -// Seq(0, 2)), -// Row(4, 6.5, 6.5F, 6.5, 6.5000, 2, 4, 4, 9, 9, 4, 9, 13, 13.0, 13.0, 13, Seq(4, 9), -// Seq(0, 1))), + /* + Note: The checkpoint was generated using the following input in Spark version 2.4.5 + AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + CheckAnswer( + Row(0, 2.5, 2.5F, 2.5, 2.5000, 2, 0, 0, 5, 5, 0, 5, 5, 5.0, 5.0, Seq(0, 5), + Seq(0, 2)), + Row(1, 3.5, 3.5F, 3.5, 3.5000, 2, 1, 1, 6, 6, 1, 6, 7, 7.0, 7.0, Seq(1, 6), + Seq(0, 1)), + Row(2, 4.5, 4.5F, 4.5, 4.5000, 2, 2, 2, 7, 7, 2, 7, 9, 9.0, 9.0, Seq(2, 7), + Seq(1, 2)), + Row(3, 5.5, 5.5F, 5.5, 5.5000, 2, 3, 3, 8, 8, 3, 8, 11, 11.0, 11.0, Seq(3, 8), + Seq(0, 2)), + Row(4, 6.5, 6.5F, 6.5, 6.5000, 2, 4, 4, 9, 9, 4, 9, 13, 13.0, 13.0, Seq(4, 9), + Seq(0, 1))) + */ AddData(inputData, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19), CheckAnswer( - Row(0, 7.5, 7.5, 7.5, 7.5000, 4, 0, 0, 15, 15, 0, 15, 30, 30.0, 30.0, 30, + Row(0, 7.5, 7.5, 7.5, 7.5000, 4, 0, 0, 15, 15, 0, 15, 30, 30.0, 30.0, Seq(0, 5, 10, 15), Seq(0, 1, 2)), - Row(1, 8.5, 8.5, 8.5, 8.5000, 4, 1, 1, 16, 16, 1, 16, 34, 34.0, 34.0, 34, + Row(1, 8.5, 8.5, 8.5, 8.5000, 4, 1, 1, 16, 16, 1, 16, 34, 34.0, 34.0, Seq(1, 6, 11, 16), Seq(0, 1, 2)), - Row(2, 9.5, 9.5, 9.5, 9.5000, 4, 2, 2, 17, 17, 2, 17, 38, 38.0, 38.0, 38, + Row(2, 9.5, 9.5, 9.5, 9.5000, 4, 2, 2, 17, 17, 2, 17, 38, 38.0, 38.0, Seq(2, 7, 12, 17), Seq(0, 1, 2)), - Row(3, 10.5, 10.5, 10.5, 10.5000, 4, 3, 3, 18, 18, 3, 18, 42, 42.0, 42.0, 42, + 
Row(3, 10.5, 10.5, 10.5, 10.5000, 4, 3, 3, 18, 18, 3, 18, 42, 42.0, 42.0, Seq(3, 8, 13, 18), Seq(0, 1, 2)), - Row(4, 11.5, 11.5, 11.5, 11.5000, 4, 4, 4, 19, 19, 4, 19, 46, 46.0, 46.0, 46, + Row(4, 11.5, 11.5, 11.5, 11.5000, 4, 4, 4, 19, 19, 4, 19, 46, 46.0, 46.0, Seq(4, 9, 14, 19), Seq(0, 1, 2))) ) } + test("statistical functions") { + val inputData = MemoryStream[Long] + val aggregated = + inputData.toDF().toDF("value") + .selectExpr( + "value", + "value % 5 AS id", + "CAST(value AS STRING) as str", + "CAST(value AS FLOAT) as f", + "CAST(value AS DOUBLE) as d", + "CAST(value AS DECIMAL) as dec", + "value % 3 AS mod") + .groupBy($"id") + .agg( + kurtosis($"d").as("kts"), + skewness($"d").as("skew"), + approx_count_distinct($"mod").as("approx_cnt"), + approx_count_distinct($"f").as("approx_cnt_f"), + approx_count_distinct($"d").as("approx_cnt_d"), + approx_count_distinct($"dec").as("approx_cnt_dec"), + approx_count_distinct($"str").as("approx_cnt_str"), + stddev_pop($"d").as("stddev_pop"), + stddev_samp($"d").as("stddev_samp"), + var_pop($"d").as("var_pop"), + var_samp($"d").as("var_samp"), + covar_pop($"value", $"mod").as("covar_pop"), + covar_samp($"value", $"mod").as("covar_samp"), + corr($"value", $"mod").as("corr")) + .select("id", "kts", "skew", "approx_cnt", "approx_cnt_f", "approx_cnt_d", + "approx_cnt_dec", "approx_cnt_str", "stddev_pop", "stddev_samp", "var_pop", "var_samp", + "covar_pop", "covar_samp", "corr") + + val checkpointDir = prepareCheckpointDir("statistical-functions") + inputData.addData(0L to 9L: _*) + + testStream(aggregated, Complete)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + /* + Note: The checkpoint was generated using the following input in Spark version 2.4.5 + AddData(inputData, 0L to 9L: _*), + CheckAnswer( + Row(0, -2.0, 0.0, 2, 2, 2, 2, 2, 2.5, 3.5355339059327378, 6.25, 12.5, 2.5, 5.0, 1.0), + Row(1, -2.0, 0.0, 2, 2, 2, 2, 2, 2.5, 3.5355339059327378, 6.25, 12.5, -1.25, -2.5, -1.0), + Row(2, -2.0, 0.0, 2, 2, 2, 2, 2, 2.5, 3.5355339059327378, 6.25, 12.5, -1.25, -2.5, -1.0), + Row(3, -2.0, 0.0, 2, 2, 2, 2, 2, 2.5, 3.5355339059327378, 6.25, 12.5, 2.5, 5.0, 1.0), + Row(4, -2.0, 0.0, 2, 2, 2, 2, 2, 2.5, 3.5355339059327378, 6.25, 12.5, -1.25, -2.5, -1.0)) + */ + + AddData(inputData, 10L to 19L: _*), + CheckAnswer( + Row(0, -1.36, 0.0, 3, 4, 4, 4, 4, 5.5901699437494745, 6.454972243679028, 31.25, + 41.666666666666664, -0.625, -0.8333333333333334, -0.13483997249264842), + Row(1, -1.36, 0.0, 3, 4, 4, 4, 4, 5.5901699437494745, 6.454972243679028, 31.25, + 41.666666666666664, 1.25, 1.6666666666666667, 0.31622776601683794), + Row(2, -1.36, 0.0, 3, 4, 4, 4, 4, 5.5901699437494745, 6.454972243679028, 31.25, + 41.666666666666664, -0.625, -0.8333333333333334, -0.13483997249264842), + Row(3, -1.36, 0.0, 3, 4, 4, 4, 4, 5.5901699437494745, 6.454972243679028, 31.25, + 41.666666666666664, -0.625, -0.8333333333333334, -0.13483997249264842), + Row(4, -1.36, 0.0, 3, 4, 4, 4, 4, 5.5901699437494745, 6.454972243679028, 31.25, + 41.666666666666664, 1.25, 1.6666666666666667, 0.31622776601683794)) + ) + } + test("SPARK-28067 change the sum decimal unsafe row format") { val inputData = MemoryStream[Int] - val aggregated = inputData.toDF().toDF("value") .selectExpr( @@ -117,18 +194,16 @@ class StreamingAggregationCompatibilitySuite extends StreamTest { .agg(sum($"dec").as("sum_dec"), collect_list($"value").as("col_list")) .select("id", "sum_dec", "col_list") - val resourceUri = this.getClass.getResource("/structured-streaming/" + - 
"checkpoint-version-2.4.5-for-compatibility-test-sum-decimal").toURI - val checkpointDir = Utils.createTempDir().getCanonicalFile - - FileUtils.copyDirectory(new File(resourceUri), checkpointDir) - + val checkpointDir = prepareCheckpointDir("sum-decimal") inputData.addData(0 to 9: _*) testStream(aggregated, Complete)( StartStream(checkpointLocation = checkpointDir.getAbsolutePath), -// AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), -// CheckAnswer(Row(0, 20, Seq(0, 2, 4, 6, 8)), Row(1, 25, Seq(1, 3, 5, 7, 9))) + /* + Note: The checkpoint was generated using the following input in Spark version 2.4.5 + AddData(inputData, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + CheckAnswer(Row(0, 20, Seq(0, 2, 4, 6, 8)), Row(1, 25, Seq(1, 3, 5, 7, 9))) + */ AddData(inputData, 10 to 19: _*), ExpectFailure[SparkException](e => { // Check the exception message to make sure the state store format changing. From 59db3f35e924012769c51abad61746febb844c49 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 4 Jun 2020 15:28:13 +0800 Subject: [PATCH 06/10] Add checkpoint files --- .../commits/0 | 2 ++ .../metadata | 1 + .../offsets/0 | 3 +++ .../state/0/0/1.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 351 bytes .../state/0/2/1.delta | Bin 0 -> 218 bytes .../state/0/3/1.delta | Bin 0 -> 227 bytes .../state/0/4/1.delta | Bin 0 -> 224 bytes .../commits/0 | 2 ++ .../metadata | 1 + .../offsets/0 | 3 +++ .../state/0/0/1.delta | Bin 0 -> 320 bytes .../state/0/1/1.delta | Bin 0 -> 46 bytes .../state/0/2/1.delta | Bin 0 -> 209 bytes .../state/0/3/1.delta | Bin 0 -> 198 bytes .../state/0/4/1.delta | Bin 0 -> 205 bytes .../commits/0 | 2 ++ .../metadata | 1 + .../offsets/0 | 3 +++ .../state/0/0/1.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 117 bytes .../state/0/2/1.delta | Bin 0 -> 46 bytes .../state/0/3/1.delta | Bin 0 -> 118 bytes .../state/0/4/1.delta | Bin 0 -> 46 bytes .../StreamingAggregationCompatibilitySuite.scala | 4 ++-- 25 files changed, 20 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/commits/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/3/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/4/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/commits/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/metadata create mode 100644 
sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/3/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/4/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/commits/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/3/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/4/1.delta diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/commits/0 new file mode 100644 index 000000000000..9c1e3021c3ea --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/metadata new file mode 100644 index 000000000000..26a0d5d707d4 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/metadata @@ -0,0 +1 @@ +{"id":"5f1362da-99ce-4ba2-97e1-d992c295e563"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/offsets/0 new file mode 100644 index 000000000000..43cde63b8f68 --- /dev/null +++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1591241624051,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/0/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/1/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..e7de33ff90ca7ced9d998eae695c92a5bd25ccde GIT binary patch literal 351 zcmY+Au}T9$5Qe|G-HVracza3Ch&Cyb<4}u$22MzE*r<@g!a_tLpq+}XWja${=^NPl z4uYkRVVO73Sx~`&UHHG5`S)k17yE~I57)QDvB1E<+so_P72puI<*~y-w+d1cN3T^f z0ttE2eqYbKBMsMSI4^sjiX3?)1$IObFz8kh^Fjv;HIL|?-eZnQH(I{i9<2}tu;Z8Q zIoWuVR)8&tE11BE#7j_e=I^ASX_Ouz0jX(t7T2tE1c66zqR|`?xv9+nVqgC#T~A)A$`MCe{rBnkrLwpTosHjfu755Q521b19eQ1()1&z^As#KGr+9ymE!;nfu9ze zDU-Ibt-ma7PlfpTE=JRgT$WqpLIFc*@F>l8B*s*#XZ&6eXVZx<&6!|WM`^r=u;d(+ cV4}=B4nk@dpeckTWt=38k7)XPqAdQeFRWi1`Tzg` literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/3/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..82386b020a045d3272790e9453561825943db24b GIT binary patch literal 227 zcmX|+u}T9$5Qe{9_wE*=+=e?jku46vun>d z)_IDQd4o6$A%8J5|3ClCFx}JjZ29)Gxcmja6|6s2#l6%HMWrke)O${@nfRzrC>`f! 
zih0IZ9$Rt#Ir{mW+(ApT5y>&ID&UGMGR=jLJ$`%);tg`aQs2FDbVGG!LF2|5g@$38 qH(xXppJX8^iY0Iyp@v&!wippdn1|FY1DT{x*~(7P?Y}t{e!xG0N*iwg literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-common-functions/state/0/4/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..ac70b46fc576dcce44561c48dfdcf908d9646bee GIT binary patch literal 224 zcmX|+v1$TA5Qe|qTQ3_7IHI`=Xb}z~3la+vw9Aqzfhz>9Y%MjUO?h2x13o~WB6Uh1 zCC^}EMiB=F=KuMAW~N+>p4aco)x!_4HSo0`Br}}_L(6(1n2(gy(ko&YEDa%d6P!oP z6Q9c7Ywv{SRYodDng6LJ^t=hk^&>S)QN%jmMGVRY-Q#Ib-$-7_?>01tnzYbAhTVp2 l+`(A`kXv3><0jH4Q=BGOXH7Y8#IB1pUS8i2b>K{IF91Z{g literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/commits/0 new file mode 100644 index 000000000000..9c1e3021c3ea --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/metadata new file mode 100644 index 000000000000..196f2e0e5c9f --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/metadata @@ -0,0 +1 @@ +{"id":"8fc17276-e48b-4e8a-a9c9-31f0045ed860"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/offsets/0 new file mode 100644 index 000000000000..622db1a5f4f5 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1591253127196,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/0/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..31a67ed57d7d139cfc461989aa94d05176b49de4 GIT binary patch literal 320 zcmX|7F-yZx7`*rLQs1+UUr5qe1bIO$s2~bT*9378DN+y|1R*E|Q9*|e>EHYinn}s9Nc$Y^LTsb>iX=m9x`C;0pB0Jxf1{`7}izlgE-1w zqTve(e^tWbSiV1WVM)*mgs~@%-i%nNhieFpY-`?yOC!t|UZkxpmQ#!WXz|PL=!Fe5 zYa7B*rO;$>+)yHIs$q9&6p!HtDSSwXp`-|lc1>Xq_9-j%EXq{fU1&k3;2ch-rUkxA 
zU1oA-=~)cSZRp#5Gz@v5IuAA=-woq<(v?@l!^yxP-pRuf3}g#1Ffs@;b1*nCFmNWYG4L}nNC-LTFbIor zFbHdMFfcPQsB{DiFaTBXYcQ|?rT_EF8z?Y}$sf>S5Cd|HSeuwYqQde_YK;GdS(q63 zW0-P87&|~l8S(ouSOaMj`34UL6Ch&~e--N-AkE3f&jF;$`5ib|`6U=!ls?%1W)K!h gU=U{DU|>1G!LPs|%PPSc;P4*^e4uU@1v(cF0E|x|bpQYW literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/3/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..e0a21939d2645a3ffe69f5a08a6217b0f8a21a5a GIT binary patch literal 198 zcmeZ?GI7euPtI1I!@$5$!^yy)ang=C7|0f2U}O+x;$U!KVBkz(W8i0EkPvdvVGtJL zU=Y^iU|?oqQ0WL3U|{B8;MZVa0ZKoWXZgusD}Nx6!4}99msikc5C<~8FpDq)sZRdI z3}Nz&Y>b^i&i@Mb2P~}n*nrHN{1cds_+1%{fb1CNEeyO49Q+atE=oV_A25ip6f$vV aFz_od$g)UqCOokJ4+K6?2a5vT2nPV3ek2zF literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-statistical-functions/state/0/4/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..91e1e2dbf560be75c5c4b493560cf5e51049058c GIT binary patch literal 205 zcmeZ?GI7euPtI0d%)r1~qiufQP7BEgyP!2UlF_(0t)3Un$Q006^PC6oXF literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/commits/0 new file mode 100644 index 000000000000..9c1e3021c3ea --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/metadata new file mode 100644 index 000000000000..1f8077da27c2 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/metadata @@ -0,0 +1 @@ +{"id":"ac9f2680-3a39-4afd-824b-7beefdf7d7a7"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/offsets/0 new file mode 100644 index 000000000000..b0b5ea1df545 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1591234028611,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +0 \ No newline at end of file diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/0/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/1/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..99110e438ff2f1042d6d3dd47d581943946f6c70 GIT binary patch literal 117 zcmeZ?GI7euPtI2LW?*2b0ph#6@AAe0X#oaC29X3|hA&KvK%N8;TL7^J5Q|7Ma5FJ5 o$S^Q?GDx!mWtbQ~Z~_GwSb&%fh&g~51pWho57c~7h!BJX08XS4RR910 literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/2/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal/state/0/3/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..cec2e6be7c9f750d302244f5613728329ca456c9 GIT binary patch literal 118 zcmeZ?GI7euPtI2LVPIgW0b*w*U*=#SEx^FYAk4_Y;0R Date: Tue, 16 Jun 2020 22:55:41 +0800 Subject: [PATCH 07/10] Add test for deduplicate --- .../commits/0 | 2 ++ .../metadata | 1 + .../offsets/0 | 3 ++ .../state/0/0/1.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 127 bytes .../state/0/2/1.delta | Bin 0 -> 46 bytes .../state/0/3/1.delta | Bin 0 -> 126 bytes .../state/0/4/1.delta | Bin 0 -> 94 bytes ...reamingAggregationCompatibilitySuite.scala | 34 ++++++++++++++++++ 9 files changed, 40 insertions(+) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/commits/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/3/1.delta create mode 100644 
sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/4/1.delta diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/commits/0 new file mode 100644 index 000000000000..9c1e3021c3ea --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/metadata new file mode 100644 index 000000000000..0b78699f07ba --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/metadata @@ -0,0 +1 @@ +{"id":"26fe8d3d-d101-44b0-b9c1-a2f9f09cea69"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/offsets/0 new file mode 100644 index 000000000000..3c0c901ee0c3 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1592306585407,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/0/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/1/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..71ab60304105ab5efe66ce6125cca48d8247294d GIT binary patch literal 127 zcmeZ?GI7euPtH~iWnf@f0>t69@!Y{c+Jb?RK{%a*L6{eaIXD<5GXez-fS6f;LD-ms z!2!q=kYV6vVqgtm5YCWf5at6aoM8TRDS)v;_krgK!E5gD@8mvvM#5GXez-co-O%7}$9j z1O#LlxS1GO0~mx;B^iXdfoj+|7#K_#7+e?_fD9%E0Yg>>CME`+0EPgD|3KgawMZ0b H3>*LeWO)%| literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.5-for-compatibility-test-deduplicate/state/0/4/1.delta new file mode 100644 index 
0000000000000000000000000000000000000000..9a014b2029cecf66612c684e44140678ded59bae GIT binary patch literal 94 zcmeZ?GI7euPtH~~VPIeg0pgd6G7P~$+Jb?RK{${GXez-fS5^uLD-0c a!2!q=kYQkFVhCXP4+K6?gG7PiZ~y?m$_}jn literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala index ed65c0d3ff77..301459c924ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala @@ -182,6 +182,40 @@ class StreamingAggregationCompatibilitySuite extends StreamTest { ) } + test("deduplicate with all columns") { + val inputData = MemoryStream[Long] + val result = inputData.toDF().toDF("value") + .selectExpr( + "value", + "value + 10 AS key", + "CAST(value AS STRING) as topic", + "value + 100 AS partition", + "value + 5 AS offset") + .dropDuplicates() + .select("key", "value", "topic", "partition", "offset") + + val checkpointDir = prepareCheckpointDir("deduplicate") + inputData.addData(0L, 1L, 2L, 3L, 4L) + + testStream(result)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + /* + Note: The checkpoint was generated using the following input in Spark version 2.4.5 + AddData(inputData, 0L, 1L, 2L, 3L, 4L), + CheckAnswer( + Row(10, 0, "0", 100, 5), + Row(11, 1, "1", 101, 6), + Row(12, 2, "2", 102, 7), + Row(13, 3, "3", 103, 8), + Row(14, 4, "4", 104, 9)) + */ + AddData(inputData, 3L, 4L, 5L, 6L), + CheckLastBatch( + Row(15, 5, "5", 105, 10), + Row(16, 6, "6", 106, 11)) + ) + } + test("SPARK-28067 changed the sum decimal unsafe row format") { val inputData = MemoryStream[Int] val aggregated = From b654ca9c485acb1570db19360bd29d535468b994 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 16 Jun 2020 23:01:06 +0800 Subject: [PATCH 08/10] rename --- ...treamingStateStoreFormatCompatibilitySuite.scala} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/streaming/{StreamingAggregationCompatibilitySuite.scala => StreamingStateStoreFormatCompatibilitySuite.scala} (95%) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala similarity index 95% rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala index 301459c924ea..4abae576b67a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala @@ -29,14 +29,14 @@ import org.apache.spark.sql.functions._ import org.apache.spark.util.Utils /** - * An integrated test for streaming aggregation compatibility. + * An integrated test for streaming state store format compatibility. * For each PR breaks this test, we need to pay attention to the underlying unsafe row format - * changing of aggregate functions. All the checkpoint dirs were generated based on Spark version - * 2.4.5. 
If we accept the changes, it means the checkpoint for Structured Streaming will become - * non-reusable. Please add a new test for the issue, just like the test suite "SPARK-28067 changed - * the sum decimal unsafe row format". + * changing. All the checkpoint dirs were generated based on Spark version 2.4.5. If we accept the + * changes, it means the checkpoint for Structured Streaming will become non-reusable. Please add + * a new test for the issue, just like the test suite "SPARK-28067 changed the sum decimal unsafe + * row format". */ -class StreamingAggregationCompatibilitySuite extends StreamTest { +class StreamingStateStoreFormatCompatibilitySuite extends StreamTest { import testImplicits._ private def prepareCheckpointDir(testName: String): File = { From 90d71d46326e290d221fe8f2c935add787b2289b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 19 Jun 2020 16:01:50 +0800 Subject: [PATCH 09/10] Revert "initial commit" This reverts commit 64a6a98e8c61579828a61defec4f8446f3e3455d. --- .../apache/spark/sql/internal/SQLConf.scala | 9 ---- .../StreamingAggregationStateManager.scala | 46 +------------------ ...treamingAggregationStateManagerSuite.scala | 22 +-------- 3 files changed, 3 insertions(+), 74 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 08488da09748..6bbeb2de7538 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1568,15 +1568,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val STREAMING_STATE_FORMAT_CHECK_ENABLED = - buildConf("spark.sql.streaming.stateFormatCheck.enabled") - .doc("Whether to detect a streaming query may try to use an invalid UnsafeRow in the " + - "state store.") - .version("3.1.0") - .internal() - .booleanConf - .createWithDefault(true) - val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION = buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala index f963b5c95f5e..9bfb9561b42a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.execution.streaming.state -import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType /** @@ -61,9 +59,6 @@ sealed trait StreamingAggregationStateManager extends Serializable { /** Return an iterator containing all the values in target state store. 
*/ def values(store: StateStore): Iterator[UnsafeRow] - - /** Check the UnsafeRow format with the expected schema */ - def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit } object StreamingAggregationStateManager extends Logging { @@ -82,14 +77,6 @@ object StreamingAggregationStateManager extends Logging { } } -/** - * An exception thrown when an invalid UnsafeRow is detected. - */ -class InvalidUnsafeRowException - extends SparkException("The UnsafeRow format is invalid. This may happen when using the old " + - "version or broken checkpoint file. To resolve this problem, you can try to restart the " + - "application or use the legacy way to process streaming state.", null) - abstract class StreamingAggregationStateManagerBaseImpl( protected val keyExpressions: Seq[Attribute], protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { @@ -97,9 +84,6 @@ abstract class StreamingAggregationStateManagerBaseImpl( @transient protected lazy val keyProjector = GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) - // Consider about the cost, only check the UnsafeRow format for the first row - private var checkFormat = true - override def getKey(row: UnsafeRow): UnsafeRow = keyProjector(row) override def commit(store: StateStore): Long = store.commit() @@ -110,28 +94,6 @@ abstract class StreamingAggregationStateManagerBaseImpl( // discard and don't convert values to avoid computation store.getRange(None, None).map(_.key) } - - override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = { - if (checkFormat && SQLConf.get.getConf( - SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) { - if (schema.fields.length != row.numFields) { - throw new InvalidUnsafeRowException - } - schema.fields.zipWithIndex - .filterNot(field => UnsafeRow.isFixedLength(field._1.dataType)).foreach { - case (_, index) => - val offsetAndSize = row.getLong(index) - val offset = (offsetAndSize >> 32).toInt - val size = offsetAndSize.toInt - if (size < 0 || - offset < UnsafeRow.calculateBitSetWidthInBytes(row.numFields) + 8 * row.numFields || - offset + size > row.getSizeInBytes) { - throw new InvalidUnsafeRowException - } - } - checkFormat = false - } - } } /** @@ -152,9 +114,7 @@ class StreamingAggregationStateManagerImplV1( override def getStateValueSchema: StructType = inputRowAttributes.toStructType override def get(store: StateStore, key: UnsafeRow): UnsafeRow = { - val res = store.get(key) - unsafeRowFormatValidation(res, inputRowAttributes.toStructType) - res + store.get(key) } override def put(store: StateStore, row: UnsafeRow): Unit = { @@ -213,9 +173,7 @@ class StreamingAggregationStateManagerImplV2( return savedState } - val res = restoreOriginalRow(key, savedState) - unsafeRowFormatValidation(res, inputRowAttributes.toStructType) - res + restoreOriginalRow(key, savedState) } override def put(store: StateStore, row: UnsafeRow): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala index 2881e2e6f6c1..daacdfd58c7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala @@ -20,7 +20,7 @@ package 
org.apache.spark.sql.execution.streaming.state import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class StreamingAggregationStateManagerSuite extends StreamTest { // ============================ fields and method for test data ============================ @@ -123,24 +123,4 @@ class StreamingAggregationStateManagerSuite extends StreamTest { // state manager should return row which is same as input row regardless of format version assert(inputRow === stateManager.get(memoryStateStore, keyRow)) } - - test("UnsafeRow format invalidation") { - // Pass the checking - val stateManager0 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, - testOutputAttributes, 2) - stateManager0.unsafeRowFormatValidation(testRow, testOutputSchema) - // Fail for fields number not match - val stateManager1 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, - testOutputAttributes, 2) - assertThrows[InvalidUnsafeRowException](stateManager1.unsafeRowFormatValidation( - testRow, StructType(testKeys.map(createIntegerField)))) - // Fail for invalid schema - val stateManager2 = StreamingAggregationStateManager.createStateManager(testKeyAttributes, - testOutputAttributes, 2) - val invalidSchema = StructType(testKeys.map(createIntegerField) ++ - Seq(StructField("struct", StructType(Seq(StructField("value1", StringType, true))), true), - StructField("value2", IntegerType, false))) - assertThrows[InvalidUnsafeRowException](stateManager2.unsafeRowFormatValidation( - testRow, invalidSchema)) - } } From 9ad2c068226ffc03b139b1c0a99226ac8c36b8b7 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 19 Jun 2020 16:31:12 +0800 Subject: [PATCH 10/10] rebase and UT fix --- .../StreamingStateStoreFormatCompatibilitySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala index 4abae576b67a..33f6b02acb6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala @@ -241,7 +241,8 @@ class StreamingStateStoreFormatCompatibilitySuite extends StreamTest { AddData(inputData, 10 to 19: _*), ExpectFailure[SparkException](e => { // Check the exception message to make sure the state store format changing. - assert(e.getCause.getMessage.contains("The UnsafeRow format is invalid")) + assert(e.getCause.getCause.getMessage.contains( + "The streaming query failed by state format invalidation.")) }) ) }
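
The check introduced in [PATCH 03/10] leans on the UnsafeRow layout: every variable-length field stores a packed 64-bit word whose upper half is the offset of the field's bytes and whose lower half is their size; a well-formed offset must point past the null bit set plus the 8-byte fixed-length slots, and offset + size must stay inside the row. Below is a minimal standalone sketch of that same check, runnable in a spark-shell; the helper name is illustrative and, unlike the patch, it returns a Boolean instead of throwing InvalidUnsafeRowException.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowFormatCheck {
  // Mirrors unsafeRowFormatValidation: for each variable-length field,
  // unpack the (offset, size) word and keep it within the row's bounds.
  def isValid(row: UnsafeRow, schema: StructType): Boolean = {
    if (schema.fields.length != row.numFields) return false
    // Variable-length data must start after the null bit set and the
    // 8-byte-per-field fixed-length region.
    val fixedRegionEnd =
      UnsafeRow.calculateBitSetWidthInBytes(row.numFields) + 8 * row.numFields
    schema.fields.zipWithIndex
      .filterNot { case (field, _) => UnsafeRow.isFixedLength(field.dataType) }
      .forall { case (_, index) =>
        val offsetAndSize = row.getLong(index)
        val offset = (offsetAndSize >> 32).toInt
        val size = offsetAndSize.toInt
        size >= 0 && offset >= fixedRegionEnd && offset + size <= row.getSizeInBytes
      }
  }
}

// A row written as (int, string) passes against its own schema but fails
// against a schema expecting a different number of fields.
val schema = StructType(Seq(
  StructField("id", IntegerType), StructField("name", StringType)))
val row = UnsafeProjection.create(schema)(
  InternalRow(1, UTF8String.fromString("a")))
assert(UnsafeRowFormatCheck.isValid(row, schema))
assert(!UnsafeRowFormatCheck.isValid(
  row, schema.add(StructField("extra", StringType))))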
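
While [PATCH 03/10] is applied (it is reverted again by [PATCH 09/10] during the rebase), the validation is guarded by the internal flag that patch adds, so a user hitting a false positive could switch it off like any other SQL conf. A sketch, assuming an active SparkSession named spark:

// Defaults to true; when false, unsafeRowFormatValidation becomes a no-op.
spark.conf.set("spark.sql.streaming.stateFormatCheck.enabled", "false")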
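
The checkpoint directories committed in [PATCH 06/10] and [PATCH 07/10] record "spark.sql.shuffle.partitions":"5" in their offsets/0 metadata, so they were produced by a query running with five shuffle partitions on a 2.4.5 build. A plausible generator for the sum-decimal checkpoint, to be compiled against Spark 2.4.5 (MemoryStream is test-internal API; the query name and output path are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .master("local[2]")
  // Must match the "spark.sql.shuffle.partitions":"5" recorded in offsets/0.
  .config("spark.sql.shuffle.partitions", "5")
  .getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// Same shape as the SPARK-28067 test before the format change.
val inputData = MemoryStream[Int]
val aggregated = inputData.toDF().toDF("value")
  .selectExpr("value", "value % 2 AS id", "CAST(value AS DECIMAL) as dec")
  .groupBy($"id")
  .agg(sum($"dec").as("sum_dec"))

val query = aggregated.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("checkpoint_generator") // illustrative
  .option("checkpointLocation",
    "/tmp/checkpoint-version-2.4.5-for-compatibility-test-sum-decimal")
  .start()

inputData.addData(0 to 9: _*)
query.processAllAvailable()
query.stop()
// The resulting directory is then copied into
// sql/core/src/test/resources/structured-streaming/.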
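
The expected rows in the statistical-functions test of [PATCH 05/10] can be sanity-checked by hand. For the id = 0 group after the second batch the doubles are 0, 5, 10, 15, so the population moments are m2 = 31.25 and m4 = 1601.5625, giving kurtosis m4 / m2^2 - 3 = -1.36 as asserted. A quick verification sketch in plain Scala (no Spark required):

// id = 0 after both batches: values 0, 5, 10, 15 cast to double.
val xs = Seq(0.0, 5.0, 10.0, 15.0)
val mean = xs.sum / xs.size                                  // 7.5
val m2 = xs.map(x => math.pow(x - mean, 2)).sum / xs.size    // 31.25 (var_pop)
val m4 = xs.map(x => math.pow(x - mean, 4)).sum / xs.size    // 1601.5625
assert(math.abs(m4 / (m2 * m2) - 3 + 1.36) < 1e-12)          // kurtosis
assert(math.abs(math.sqrt(m2) - 5.5901699437494745) < 1e-12) // stddev_pop
assert(math.abs(xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1)
  - 41.666666666666664) < 1e-12)                             // var_samp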