[SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source #45977
Conversation
@allisonwang-db @HyukjinKwon @HeartSaVioR PTAL, thanks!
python/pyspark/sql/datasource.py (outdated)
            message_parameters={"feature": "read"},
        )

    def read2(self, start: dict, end: dict) -> Iterator[Tuple]:
can we have one method? you can make end argument optional
They are fundamentally different: the former, read(), reads data and plans the end offset, while the latter reads data between an already planned start and end offset.
can we have a different name in that case instead of read2
Changed to readBetweenOffsets()
cc @HeartSaVioR
The name itself might be OK. We could make both method names self-descriptive (not just read), but if we prefer a shorter name, it is probably fine for one of them to be "read".
I see a bigger issue in the implementation. Let's address that first.
There can't be two methods named read() in the same class; Python doesn't have method overloading, IIRC.
It can't have overloaded ones, but it can dispatch by embedding if-else and leveraging an optional argument, e.g.,
def read(self, start: dict, end: dict = None) -> Union[Tuple[Iterator[Tuple], dict], Iterator[Tuple]]:
    if end is None:
        return ...  # logic for read(start)
    else:
        return ...  # logic for read(start, end)
Then we won't be able to enforce that read with end offset is implemented.
Yeah that would have to be documented. BTW, in Python you can't enforce anything in any event.
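For illustration, a minimal sketch of the two-method shape the thread converges on, assuming dict offsets; the class name and plain NotImplementedError are simplifications, not the PR's exact code:

```python
from typing import Iterator, Tuple


class SimpleStreamReaderSketch:
    """Simplified sketch: two distinct methods instead of one dispatching read()."""

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Read from the start offset and also plan (return) the end offset.
        raise NotImplementedError

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Re-read a range that was already planned, for exactly-once replay.
        # Python cannot force subclasses to override this; it only fails at call time.
        raise NotImplementedError
```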
Co-authored-by: Hyukjin Kwon <[email protected]>
sahnib left a comment
Thanks for making these changes. Still reviewing the testcases. Left some questions/comments.
...main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala (outdated; resolved)
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala (outdated; resolved)
    val vectors = root.getFieldVectors().asScala.map { vector =>
      new ArrowColumnVector(vector)
    }.toArray[ColumnVector]
    val rows = ArrayBuffer[InternalRow]()
We are going to buffer all the rows in memory here? Can we create this iterator lazily to avoid buffering all data from Python source?
We can't do lazy initialization here because we need to send the data from the Python process to the JVM, and the communication is synchronous.
I imagine there may still be a way to avoid materializing all rows at once (e.g. per Arrow batch), but I'm not too concerned about it since we know the simple data source isn't intended to handle a huge amount of data.
If we call putIterator here we should be able to avoid materializing all rows at once on the Scala side, but it doesn't matter that much since we already materialize all rows on the Python side.
LGTM, pending CI.
The documentation build failure will go away if you sync/rebase your branch.
I'll take a look tomorrow. Sorry for the delay.
HeartSaVioR left a comment
I'm still reviewing, but wanted to call out a possible major issue up front in case I can't finish reviewing today.
            message_parameters={"feature": "initialOffset"},
        )

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
I've missed this so far. Since we are close to completion, it'd be awesome if we could try to remove points of confusion from the doc, e.g. whether offsets are inclusive or exclusive. No need to deal with the doc update in this PR; it's probably worth a JIRA ticket.
        self.iterator = iterator


class _SimpleStreamReaderWrapper(DataSourceStreamReader):
If we have to separate private from public classes, what about the classes above? Do they need to be public? The notion of "private" seems very unclear here. I'm OK if this is a trick to address a gap in Python's scoping; I just wanted to know whether this is standard practice or not.
        return self.initial_offset

    def latestOffset(self) -> dict:
        # When the query starts for the first time, use the initial offset as the start offset.
Actually, this is the hard part of implementing a prefetcher for an SS data source. When the query restarts, we assume the prefetcher would be able to start from the known committed offset. Unfortunately that is not true. You've mentioned that this relies on the getBatch trick, but that's only applicable to DSv1, and it's clearly a hack to address a specific data source. It is not a contract the streaming engine guarantees.
We have an interface, AcceptsLatestSeenOffset, for this case (you would need to adopt it when determining the start offset for prefetching), but it gives you the latest seen offset rather than the last committed offset, so Spark could still request an offset range before that offset. It would still work if the simple data source reader can serve every planned-but-not-yet-committed offset range without relying on the prefetcher: the prefetcher can start prefetching from the latest seen offset, and the earlier offset ranges should be covered by the planned batch(es).
That said, readBetweenOffsets() must be able to work without the prefetcher; PREFETCHED_RECORDS_NOT_FOUND does not only happen in error cases.
If you don't have a test case where there is a planned batch in the offset log and the query has to restart from there, you need to add one. Run several batches, stop the query, make the last batch not-yet-committed, and restart the query. The prefetcher should not get a request to read from the initial offset, and the read request for the planned batch should work without relying on the prefetcher.
OK, never mind. You are handling everything individually (not just leveraging the DSv1 trick). Your comment seems a bit confusing; mentioning getBatch is where I got confused.
Still, it's better to have fault-tolerance test(s) if we don't have them.
Yes, we have tests where the query gets restarted multiple times and we verify that replaying the microbatch succeeds.
I realized the trick only works for a V1 source and added the individual handling; let me also update the comment here.
Let's avoid randomness. You can cover both restart scenarios: 1) restarting a query which does not have a leftover batch, and 2) restarting a query which does have a leftover batch (planned but not yet committed). We have several tests which adjust the offset log and commit log to test the behavior.
Thanks for the suggestion; I added logic to delete the last committed entry in the test.
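As a rough illustration of that kind of test setup (a hypothetical helper, assuming the standard checkpoint layout with one commits/<batchId> file per committed batch), one could drop the newest commit-log entry before restarting the query:

```python
import os


def delete_last_committed_entry(checkpoint_dir: str) -> None:
    # Remove the newest commit-log entry so the last planned batch looks
    # planned-but-not-yet-committed when the query restarts.
    commits_dir = os.path.join(checkpoint_dir, "commits")
    batch_ids = sorted(int(name) for name in os.listdir(commits_dir) if name.isdigit())
    if batch_ids:
        os.remove(os.path.join(commits_dir, str(batch_ids[-1])))
```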
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala (resolved)
HeartSaVioR left a comment
I haven't yet looked into the test suites, but I assume people have already taken a look at the tests in depth.
    def read(
        self, input_partition: SimpleInputPartition  # type: ignore[override]
    ) -> Iterator[Tuple]:
        return self.simple_reader.readBetweenOffsets(input_partition.start, input_partition.end)
This sounds like we also have a case where the read method of the wrapper class has to be serialized and executed in the task, i.e. the simple reader also needs to be serialized and executed in the task. Do I understand correctly?
If so, I'd say you need to document this in SimpleDataSourceStreamReader: it isn't driver-only, which means implementations still need to consider serialization.
That said, if we change either getCache or its caller to call readBetweenOffsets and execute the same path (sending the data via Arrow batches), this method would never be called and we wouldn't need to serialize the SimpleDataSourceStreamReader instance.
Never mind, I see this is still needed to handle the "block not found" case. That said, this still applies:
I'd say you need to document in SimpleDataSourceStreamReader that it isn't driver-only, which means implementations still need to consider serialization.
I need to think about the serialization requirements of these methods; they should be documented in the user guide.
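One simple way to exercise that requirement in a test, sketched here as an assumption rather than anything the PR actually does:

```python
import pickle


def assert_reader_picklable(reader) -> None:
    # The simple reader can be shipped to executors in the cache-miss
    # fallback path, so the instance must survive a pickle round trip.
    pickle.loads(pickle.dumps(reader))
```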
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (outdated; resolved)
            if json.dumps(entry.end) == json.dumps(end):
                end_idx = idx
                break
        if end_idx > 0:
Correct me if I'm missing something. According to the interface contract, the offset "end" won't be requested again. Doesn't that mean this should be end_idx > -1 and self.cache = self.cache[end_idx+1:]? Is there any reason we keep the cached entry whose end offset matches end?
I am trying to be conservative here when evicting cache by keeping one extra entry.
OK, it would be nice to add a code comment (probably a one-liner) to explicitly state the intention.
        for idx, entry in enumerate(self.cache):
            # There is no convenient way to compare 2 offsets.
            # Serialize into json string before comparison.
            if json.dumps(entry.start) == json.dumps(start):
Does this mean we have a case where the offset range spans multiple cache entries? Or is it just defensive programming?
It is just being defensive; currently we always call planInputPartitions() after a prefetch in latestOffset().
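To make the caching idea discussed above concrete, here is an illustrative sketch; the class and field names are hypothetical, not the PR's exact code, and it assumes dict offsets compared via their JSON serialization:

```python
import json
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass
class CacheEntrySketch:
    start: dict
    end: dict
    iterator: Iterator[Tuple]


class PrefetchCacheSketch:
    def __init__(self) -> None:
        self.cache: List[CacheEntrySketch] = []

    def getCache(self, start: dict, end: dict) -> Optional[Iterator[Tuple]]:
        # Offsets are plain dicts; serialize to JSON strings to compare them.
        for entry in self.cache:
            same_start = json.dumps(entry.start) == json.dumps(start)
            same_end = json.dumps(entry.end) == json.dumps(end)
            if same_start and same_end:
                return entry.iterator
        return None

    def commit(self, end: dict) -> None:
        end_idx = -1
        for idx, entry in enumerate(self.cache):
            if json.dumps(entry.end) == json.dumps(end):
                end_idx = idx
                break
        if end_idx > 0:
            # Be conservative when evicting: keep the entry that ends at the
            # committed offset as one extra entry.
            self.cache = self.cache[end_idx:]
```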
...main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala (outdated; resolved)
  private val outputIter = if (cachedBlock.isEmpty) {
    // Evaluate the python read UDF if the partition is not cached as block.
    val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
      pickledReadFunc,
Got it - we need to serialize SimpleDataSourceStreamReader to cover a bad case.
Do we have a way to trigger this artificially? Never mind if it's not feasible; it looks non-trivial, but it would be awesome if we could test this path as well.
This will be triggered during replay of the last batch when the query restarts; I added a test for it.
...apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala (outdated; resolved)
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala (resolved)
HeartSaVioR left a comment
+1
The GA only failed with the Docker integration test, which isn't related.
Thanks! Merging to master.
Hi, @chaoqin-li1123 and all.
The newly added test case seems to be flaky. Could you take a look please?
[info] - SimpleDataSourceStreamReader read exactly once *** FAILED *** (8 seconds, 88 milliseconds)
Yes, I noticed that; I will send out a fix PR today.
Thank you so much, @chaoqin-li1123.
This is the fix: #46481 @dongjoon-hyun
What changes were proposed in this pull request?
SimpleDataSourceStreamReader is a simplified version of the DataSourceStreamReader interface.
There are 3 methods that need to be defined:
Read data and return the end offset.
def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]
Read data between the start and end offset; this is required for exactly-once reads.
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]
Return the initial start offset of the streaming query.
def initialOffset(self) -> dict
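A minimal sketch of a user implementation against this interface (a hypothetical counter source; offsets are plain dicts, as elsewhere in this PR):

```python
from typing import Iterator, Tuple

from pyspark.sql.datasource import SimpleDataSourceStreamReader


class CounterSimpleStreamReader(SimpleDataSourceStreamReader):
    """Hypothetical example source that emits consecutive integers."""

    def initialOffset(self) -> dict:
        return {"value": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Read one microbatch worth of rows and plan the end offset.
        begin = start["value"]
        end = {"value": begin + 10}
        return iter([(i,) for i in range(begin, end["value"])]), end

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Deterministically re-read a planned range for exactly-once replay.
        return iter([(i,) for i in range(start["value"], end["value"])])
```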
The implementation wraps the SimpleDataSourceStreamReader instance in a DataSourceStreamReader that prefetches and caches data in latestOffset(). The records prefetched in the Python process are sent to the JVM as Arrow record batches in planInputPartitions(), cached by the block manager, and read by the partition reader on the executor later.
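Conceptually, the prefetch-and-cache flow looks roughly like the sketch below (illustrative only, not the wrapper's exact code; class and field names are hypothetical):

```python
from typing import Iterator, List, Tuple


class SimpleStreamReaderWrapperSketch:
    """Illustrative only: how latestOffset() drives prefetching."""

    def __init__(self, simple_reader) -> None:
        self.simple_reader = simple_reader
        self.current_offset = simple_reader.initialOffset()
        # Prefetched (start, end, rows) triples awaiting planInputPartitions().
        self.prefetched: List[Tuple[dict, dict, Iterator[Tuple]]] = []

    def initialOffset(self) -> dict:
        return self.simple_reader.initialOffset()

    def latestOffset(self) -> dict:
        # Prefetch eagerly: read from the current offset, remember the rows,
        # and report the planned end offset back to the engine.
        start = self.current_offset
        rows, end = self.simple_reader.read(start)
        self.prefetched.append((start, end, rows))
        self.current_offset = end
        return end
```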
Why are the changes needed?
Compared to the DataSourceStreamReader interface, the simplified interface has some advantages:
It doesn’t require developers to reason about data partitioning.
It doesn’t require getting the latest offset before reading data.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add unit test and integration test.
Was this patch authored or co-authored using generative AI tooling?
No.