Skip to content

Commit fa757ee

Browse files
tdaszsxwing
authored andcommitted
[SPARK-20883][SPARK-20376][SS] Refactored StateStore APIs and added conf to choose implementation
## What changes were proposed in this pull request? A bunch of changes to the StateStore APIs and implementation. Current state store API has a bunch of problems that causes too many transient objects causing memory pressure. - `StateStore.get(): Option` forces creation of Some/None objects for every get. Changed this to return the row or null. - `StateStore.iterator(): (UnsafeRow, UnsafeRow)` forces creation of new tuple for each record returned. Changed this to return a UnsafeRowTuple which can be reused across records. - `StateStore.updates()` requires the implementation to keep track of updates, while this is used minimally (only by Append mode in streaming aggregations). Removed updates() and updated StateStoreSaveExec accordingly. - `StateStore.filter(condition)` and `StateStore.remove(condition)` has been merge into a single API `getRange(start, end)` which allows a state store to do optimized range queries (i.e. avoid full scans). Stateful operators have been updated accordingly. - Removed a lot of unnecessary row copies Each operator copied rows before calling StateStore.put() even if the implementation does not require it to be copied. It is left up to the implementation on whether to copy the row or not. Additionally, - Added a name to the StateStoreId so that each operator+partition can use multiple state stores (different names) - Added a configuration that allows the user to specify which implementation to use. - Added new metrics to understand the time taken to update keys, remove keys and commit all changes to the state store. These metrics will be visible on the plan diagram in the SQL tab of the UI. - Refactored unit tests such that they can be reused to test any implementation of StateStore. ## How was this patch tested? Old and new unit tests Author: Tathagata Das <[email protected]> Closes #18107 from tdas/SPARK-20376.
1 parent 4bb6a53 commit fa757ee

File tree

12 files changed

+695
-588
lines changed

12 files changed

+695
-588
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,15 @@ object SQLConf {
552552
.booleanConf
553553
.createWithDefault(true)
554554

555+
val STATE_STORE_PROVIDER_CLASS =
556+
buildConf("spark.sql.streaming.stateStore.providerClass")
557+
.internal()
558+
.doc(
559+
"The class used to manage state data in stateful streaming queries. This class must " +
560+
"be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
561+
.stringConf
562+
.createOptional
563+
555564
val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
556565
buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
557566
.internal()
@@ -828,6 +837,8 @@ class SQLConf extends Serializable with Logging {
828837

829838
def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
830839

840+
def stateStoreProviderClass: Option[String] = getConf(STATE_STORE_PROVIDER_CLASS)
841+
831842
def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
832843

833844
def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,11 @@ case class FlatMapGroupsWithStateExec(
109109
child.execute().mapPartitionsWithStateStore[InternalRow](
110110
getStateId.checkpointLocation,
111111
getStateId.operatorId,
112+
storeName = "default",
112113
getStateId.batchId,
113114
groupingAttributes.toStructType,
114115
stateAttributes.toStructType,
116+
indexOrdinal = None,
115117
sqlContext.sessionState,
116118
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
117119
val updater = new StateStoreUpdater(store)
@@ -191,12 +193,12 @@ case class FlatMapGroupsWithStateExec(
191193
throw new IllegalStateException(
192194
s"Cannot filter timed out keys for $timeoutConf")
193195
}
194-
val timingOutKeys = store.filter { case (_, stateRow) =>
195-
val timeoutTimestamp = getTimeoutTimestamp(stateRow)
196+
val timingOutKeys = store.getRange(None, None).filter { rowPair =>
197+
val timeoutTimestamp = getTimeoutTimestamp(rowPair.value)
196198
timeoutTimestamp != NO_TIMESTAMP && timeoutTimestamp < timeoutThreshold
197199
}
198-
timingOutKeys.flatMap { case (keyRow, stateRow) =>
199-
callFunctionAndUpdateState(keyRow, Iterator.empty, Some(stateRow), hasTimedOut = true)
200+
timingOutKeys.flatMap { rowPair =>
201+
callFunctionAndUpdateState(rowPair.key, Iterator.empty, rowPair.value, hasTimedOut = true)
200202
}
201203
} else Iterator.empty
202204
}
@@ -205,18 +207,23 @@ case class FlatMapGroupsWithStateExec(
205207
* Call the user function on a key's data, update the state store, and return the return data
206208
* iterator. Note that the store updating is lazy, that is, the store will be updated only
207209
* after the returned iterator is fully consumed.
210+
*
211+
* @param keyRow Row representing the key, cannot be null
212+
* @param valueRowIter Iterator of values as rows, cannot be null, but can be empty
213+
* @param prevStateRow Row representing the previous state, can be null
214+
* @param hasTimedOut Whether this function is being called for a key timeout
208215
*/
209216
private def callFunctionAndUpdateState(
210217
keyRow: UnsafeRow,
211218
valueRowIter: Iterator[InternalRow],
212-
prevStateRowOption: Option[UnsafeRow],
219+
prevStateRow: UnsafeRow,
213220
hasTimedOut: Boolean): Iterator[InternalRow] = {
214221

215222
val keyObj = getKeyObj(keyRow) // convert key to objects
216223
val valueObjIter = valueRowIter.map(getValueObj.apply) // convert value rows to objects
217-
val stateObjOption = getStateObj(prevStateRowOption)
224+
val stateObj = getStateObj(prevStateRow)
218225
val keyedState = GroupStateImpl.createForStreaming(
219-
stateObjOption,
226+
Option(stateObj),
220227
batchTimestampMs.getOrElse(NO_TIMESTAMP),
221228
eventTimeWatermark.getOrElse(NO_TIMESTAMP),
222229
timeoutConf,
@@ -249,14 +256,11 @@ case class FlatMapGroupsWithStateExec(
249256
numUpdatedStateRows += 1
250257

251258
} else {
252-
val previousTimeoutTimestamp = prevStateRowOption match {
253-
case Some(row) => getTimeoutTimestamp(row)
254-
case None => NO_TIMESTAMP
255-
}
259+
val previousTimeoutTimestamp = getTimeoutTimestamp(prevStateRow)
256260
val stateRowToWrite = if (keyedState.hasUpdated) {
257261
getStateRow(keyedState.get)
258262
} else {
259-
prevStateRowOption.orNull
263+
prevStateRow
260264
}
261265

262266
val hasTimeoutChanged = currentTimeoutTimestamp != previousTimeoutTimestamp
@@ -269,7 +273,7 @@ case class FlatMapGroupsWithStateExec(
269273
throw new IllegalStateException("Attempting to write empty state")
270274
}
271275
setTimeoutTimestamp(stateRowToWrite, currentTimeoutTimestamp)
272-
store.put(keyRow.copy(), stateRowToWrite.copy())
276+
store.put(keyRow, stateRowToWrite)
273277
numUpdatedStateRows += 1
274278
}
275279
}
@@ -280,18 +284,21 @@ case class FlatMapGroupsWithStateExec(
280284
}
281285

282286
/** Returns the state as Java object if defined */
283-
def getStateObj(stateRowOption: Option[UnsafeRow]): Option[Any] = {
284-
stateRowOption.map(getStateObjFromRow)
287+
def getStateObj(stateRow: UnsafeRow): Any = {
288+
if (stateRow != null) getStateObjFromRow(stateRow) else null
285289
}
286290

287291
/** Returns the row for an updated state */
288292
def getStateRow(obj: Any): UnsafeRow = {
293+
assert(obj != null)
289294
getStateRowFromObj(obj)
290295
}
291296

292297
/** Returns the timeout timestamp of a state row is set */
293298
def getTimeoutTimestamp(stateRow: UnsafeRow): Long = {
294-
if (isTimeoutEnabled) stateRow.getLong(timeoutTimestampIndex) else NO_TIMESTAMP
299+
if (isTimeoutEnabled && stateRow != null) {
300+
stateRow.getLong(timeoutTimestampIndex)
301+
} else NO_TIMESTAMP
295302
}
296303

297304
/** Set the timestamp in a state row */

0 commit comments

Comments
 (0)