Skip to content

Commit 042804a

Browse files
WweiLHyukjinKwon
authored andcommitted
[SPARK-48567][SS] StreamingQuery.lastProgress should return the actual StreamingQueryProgress
### What changes were proposed in this pull request? This PR is created after discussion in this closed one: #46886 I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix. In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object: https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101 This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`. This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`). To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional. ### Why are the changes needed? API parity ### Does this PR introduce _any_ user-facing change? Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`). ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #46921 from WweiL/SPARK-48567-lastProgress. Authored-by: Wei Liu <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent d8a24b7 commit 042804a

File tree

5 files changed

+227
-99
lines changed

5 files changed

+227
-99
lines changed

python/pyspark/sql/connect/streaming/query.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
QueryProgressEvent,
3434
QueryIdleEvent,
3535
QueryTerminatedEvent,
36+
StreamingQueryProgress,
3637
)
3738
from pyspark.sql.streaming.query import (
3839
StreamingQuery as PySparkStreamingQuery,
@@ -110,21 +111,21 @@ def status(self) -> Dict[str, Any]:
110111
status.__doc__ = PySparkStreamingQuery.status.__doc__
111112

112113
@property
113-
def recentProgress(self) -> List[Dict[str, Any]]:
114+
def recentProgress(self) -> List[StreamingQueryProgress]:
114115
cmd = pb2.StreamingQueryCommand()
115116
cmd.recent_progress = True
116117
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
117-
return [json.loads(p) for p in progress]
118+
return [StreamingQueryProgress.fromJson(json.loads(p)) for p in progress]
118119

119120
recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
120121

121122
@property
122-
def lastProgress(self) -> Optional[Dict[str, Any]]:
123+
def lastProgress(self) -> Optional[StreamingQueryProgress]:
123124
cmd = pb2.StreamingQueryCommand()
124125
cmd.last_progress = True
125126
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
126127
if len(progress) > 0:
127-
return json.loads(progress[-1])
128+
return StreamingQueryProgress.fromJson(json.loads(progress[-1]))
128129
else:
129130
return None
130131

0 commit comments

Comments
 (0)