
Commit e12ec46

tdas authored and zsxwing committed
[SPARK-15131][SQL] Shut down StateStore management thread when SparkContext has been shut down
## What changes were proposed in this pull request?

Make sure that whenever the StateStoreCoordinator cannot be contacted, we assume that the SparkContext and RpcEnv on the driver have been shut down, and therefore stop the StateStore management thread and unload all loaded stores.

## How was this patch tested?

Updated unit tests.

Author: Tathagata Das <[email protected]>

Closes #12905 from tdas/SPARK-15131.

(cherry picked from commit bde27b8)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 689b0fc commit e12ec46
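The fix hinges on one observation: on an executor, `SparkEnv.get` returns null once the Spark environment has been torn down, so a periodic background task can use that as a liveness check instead of catching RPC failures forever. Below is a minimal standalone sketch of that pattern, not the actual code changed by this commit; `Provider`, `loaded`, `MaintenanceSketch`, and the default interval are illustrative placeholders, while `SparkEnv.get` is the real Spark accessor the change relies on.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import scala.collection.mutable

import org.apache.spark.SparkEnv

// Hypothetical stand-in for a loaded state store provider; not a Spark API.
trait Provider { def doMaintenance(): Unit }

object MaintenanceSketch {
  private val loaded = mutable.HashMap.empty[String, Provider]
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  @volatile private var task: ScheduledFuture[_] = null

  /** Start periodic maintenance; each tick first checks that the Spark env still exists. */
  def start(intervalSecs: Long = 60): Unit = {
    val body = new Runnable {
      override def run(): Unit = {
        if (SparkEnv.get == null) {
          // The driver and its RpcEnv can no longer be reached: tear down the
          // management thread and unload everything instead of retrying forever.
          stop()
        } else {
          loaded.synchronized { loaded.toSeq }.foreach { case (_, p) => p.doMaintenance() }
        }
      }
    }
    task = scheduler.scheduleAtFixedRate(body, intervalSecs, intervalSecs, TimeUnit.SECONDS)
  }

  def isMaintenanceRunning: Boolean = task != null

  /** Unload all providers and shut the management thread down. */
  def stop(): Unit = {
    loaded.synchronized { loaded.clear() }
    if (task != null) { task.cancel(false); task = null }
    scheduler.shutdown()
  }
}
```

In the actual change, the same null check on `SparkEnv.get` also replaces the blanket try/catch guards in `reportActiveStoreInstance` and `verifyIfStoreInstanceActive`, as shown in the diff below.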

3 files changed: 48 additions, 23 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 25 additions & 21 deletions
@@ -113,7 +113,7 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
  * the store is the active instance. Accordingly, it either keeps it loaded and performs
  * maintenance, or unloads the store.
  */
-private[state] object StateStore extends Logging {
+private[sql] object StateStore extends Logging {
 
   val MAINTENANCE_INTERVAL_CONFIG = "spark.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
@@ -155,6 +155,10 @@ private[state] object StateStore extends Logging {
     loadedProviders.contains(storeId)
   }
 
+  def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
+    maintenanceTask != null
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
     loadedProviders.clear()
@@ -187,44 +191,44 @@ private[state] object StateStore extends Logging {
    */
   private def doMaintenance(): Unit = {
     logDebug("Doing maintenance")
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
-      try {
-        if (verifyIfStoreInstanceActive(id)) {
-          provider.doMaintenance()
-        } else {
-          unload(id)
-          logInfo(s"Unloaded $provider")
+    if (SparkEnv.get == null) {
+      stop()
+    } else {
+      loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
+        try {
+          if (verifyIfStoreInstanceActive(id)) {
+            provider.doMaintenance()
+          } else {
+            unload(id)
+            logInfo(s"Unloaded $provider")
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error managing $provider, stopping management thread")
+            stop()
         }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider")
       }
     }
   }
 
   private def reportActiveStoreInstance(storeId: StateStoreId): Unit = {
-    try {
+    if (SparkEnv.get != null) {
       val host = SparkEnv.get.blockManager.blockManagerId.host
       val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
       coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
       logDebug(s"Reported that the loaded instance $storeId is active")
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error reporting active instance of $storeId")
     }
   }
 
   private def verifyIfStoreInstanceActive(storeId: StateStoreId): Boolean = {
-    try {
+    if (SparkEnv.get != null) {
       val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
       val verified =
         coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
-      logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
+      logDebug(s"Verified whether the loaded instance $storeId is active: $verified")
       verified
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error verifying active instance of $storeId")
-        false
+    } else {
+      false
     }
   }
 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 14 additions & 1 deletion
@@ -47,8 +47,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
   private val keySchema = StructType(Seq(StructField("key", StringType, true)))
   private val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))
 
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
   after {
     StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
   }
 
   test("get, put, remove, commit, and all data iterator") {
@@ -352,7 +358,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     }
   }
 
-  ignore("maintenance") {
+  test("maintenance") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
@@ -366,20 +372,26 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
 
+
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
+          require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
+
           for (i <- 1 to 20) {
             val store = StateStore.get(
               storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
             put(store, "a", i)
             store.commit()
           }
+
           eventually(timeout(10 seconds)) {
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
           }
 
           // Background maintenance should clean up and generate snapshots
+          assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
+
           eventually(timeout(10 seconds)) {
             // Earliest delta file should get cleaned up
             assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
@@ -418,6 +430,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
         assert(!StateStore.isLoaded(storeId))
+        assert(!StateStore.isMaintenanceRunning)
       }
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 9 additions & 1 deletion
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.streaming
 
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.catalyst.analysis.Update
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -29,7 +32,12 @@ object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
+class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll {
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    StateStore.stop()
+  }
 
   import testImplicits._
 