Skip to content

Commit d6200d1

Browse files
committed
Added progress to termination event
1 parent 32ff04e commit d6200d1

File tree

3 files changed

+30
-18
lines changed

3 files changed

+30
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ class StreamExecution(
186186
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
187187
}
188188

189-
postEvent(new QueryStartedEvent(id)) // Assumption: Does not throw exception.
189+
postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
190190

191191
// Unblock starting thread
192192
startLatch.countDown()
@@ -259,7 +259,7 @@ class StreamExecution(
259259
// Notify others
260260
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
261261
postEvent(
262-
new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
262+
new QueryTerminatedEvent(lastProgress, exception.map(_.cause).map(Utils.exceptionString)))
263263
terminationLatch.countDown()
264264
}
265265
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ object StreamingQueryListener {
8686
* @since 2.1.0
8787
*/
8888
@Experimental
89-
class QueryStartedEvent private[sql](val id: UUID) extends Event
89+
class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
9090

9191
/**
9292
* :: Experimental ::
@@ -100,11 +100,13 @@ object StreamingQueryListener {
100100
* :: Experimental ::
101101
* Event representing the termination of a query.
102102
*
103-
* @param id The unique id of the query that terminated.
104-
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
103+
* @param lastProgress The last progress the query made before it was terminated.
104+
* @param exception The exception message of the query if the query was terminated
105105
* with an exception. Otherwise, it will be `None`.
106106
* @since 2.1.0
107107
*/
108108
@Experimental
109-
class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
109+
class QueryTerminatedEvent private[sql](
110+
val lastProgress: StreamingQueryProgress,
111+
val exception: Option[String]) extends Event
110112
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import java.util.UUID
2222
import scala.collection.mutable
2323

2424
import org.scalactic.TolerantNumerics
25+
import org.scalatest.concurrent.Eventually._
26+
import org.scalatest.concurrent.PatienceConfiguration.Timeout
2527
import org.scalatest.BeforeAndAfter
2628
import org.scalatest.PrivateMethodTester._
2729

2830
import org.apache.spark.SparkException
2931
import org.apache.spark.scheduler._
3032
import org.apache.spark.sql.execution.streaming._
31-
import org.apache.spark.util.{JsonProtocol, ManualClock}
33+
import org.apache.spark.util.JsonProtocol
3234

3335
class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
3436

@@ -50,21 +52,26 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
5052
val df = inputData.toDS().as[Long].map { 10 / _ }
5153
val listener = new EventCollector
5254
try {
55+
// No events until started
5356
spark.streams.addListener(listener)
5457
assert(listener.startEvent === null)
5558
assert(listener.progressEvents.isEmpty)
5659
assert(listener.terminationEvent === null)
5760

5861
testStream(df, OutputMode.Append)(
62+
63+
// Start event generated when query started
5964
StartStream(ProcessingTime(100), triggerClock = clock),
60-
AssertOnQuery(query => {
65+
AssertOnQuery { query =>
6166
assert(listener.startEvent !== null)
6267
assert(listener.startEvent.id === query.id)
68+
assert(listener.startEvent.name === query.name)
6369
assert(listener.progressEvents.isEmpty)
6470
assert(listener.terminationEvent === null)
6571
true
66-
}),
72+
},
6773

74+
// Progress event generated when data processed
6875
AddData(inputData, 1, 2),
6976
AdvanceManualClock(100),
7077
CheckAnswer(10, 5),
@@ -75,23 +82,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
7582
true
7683
},
7784

85+
// Termination event generated when stopped cleanly
7886
StopStream,
7987
AssertOnQuery { query =>
80-
assert(listener.terminationEvent !== null)
81-
assert(listener.terminationEvent.id === query.id)
82-
assert(listener.terminationEvent.exception === None)
88+
eventually(Timeout(streamingTimeout)) {
89+
assert(listener.terminationEvent !== null)
90+
assert(listener.terminationEvent.lastProgress === query.lastProgress)
91+
assert(listener.terminationEvent.exception === None)
92+
}
8393
listener.checkAsyncErrors()
8494
listener.reset()
8595
true
8696
},
8797

98+
// Termination event generated with exception message when stopped with error
8899
StartStream(ProcessingTime(100), triggerClock = clock),
89100
AddData(inputData, 0),
90101
AdvanceManualClock(100),
91102
ExpectFailure[SparkException],
92103
AssertOnQuery { query =>
93104
assert(listener.terminationEvent !== null)
94-
assert(listener.terminationEvent.id === query.id)
105+
assert(listener.terminationEvent.lastProgress === query.lastProgress)
95106
assert(listener.terminationEvent.exception.nonEmpty)
96107
listener.checkAsyncErrors()
97108
true
@@ -100,7 +111,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
100111
} finally {
101112
spark.streams.removeListener(listener)
102113
}
103-
104114
}
105115

106116
test("adding and removing listener") {
@@ -170,7 +180,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
170180
}
171181

172182
test("QueryStartedEvent serialization") {
173-
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID())
183+
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
174184
val json = JsonProtocol.sparkEventToJson(queryStarted)
175185
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
176186
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
@@ -182,17 +192,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
182192
val json = JsonProtocol.sparkEventToJson(event)
183193
val newEvent = JsonProtocol.sparkEventFromJson(json)
184194
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
185-
assert(event.progress.jsonValue === newEvent.progress.jsonValue)
195+
assert(event.progress.json === newEvent.progress.json)
186196
}
187197

188198
test("QueryTerminatedEvent serialization") {
189199
val exception = new RuntimeException("exception")
190200
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
191-
UUID.randomUUID(), Some(exception.getMessage))
201+
StreamingQueryProgressSuite.testProgress, Some(exception.getMessage))
192202
val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
193203
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
194204
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
195-
assert(queryQueryTerminated.id === newQueryTerminated.id)
205+
assert(queryQueryTerminated.lastProgress.json === newQueryTerminated.lastProgress.json)
196206
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
197207
}
198208

0 commit comments

Comments
 (0)