Commit 6b34745

aokolnychyi authored and dongjoon-hyun committed
[SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution
### What changes were proposed in this pull request?

This PR makes `StreamExecution` use the `Write` abstraction introduced in SPARK-33779.

Note: we will need separate plans for streaming writes in order to support the required distribution and ordering in SS. This change only migrates to the `Write` abstraction.

### Why are the changes needed?

These changes prevent exceptions from data sources that implement only the `build` method in `WriteBuilder`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #31093 from aokolnychyi/spark-34049.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d00f069 commit 6b34745
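
For context, a minimal connector-side sketch of the case this change enables: a `WriteBuilder` that overrides only `build()`. The class name `BuildOnlyWriteBuilder` is hypothetical and not part of this commit; before the patch, `StreamExecution` called `buildForStreaming()` directly, so a builder like this would hit the default implementation and fail, whereas the engine now goes through `build().toStreaming`.

```scala
import org.apache.spark.sql.connector.write.{Write, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Hypothetical data source builder that only implements build().
class BuildOnlyWriteBuilder(streamingWrite: StreamingWrite) extends WriteBuilder {
  override def build(): Write = new Write {
    // Only the streaming side is implemented; toBatch keeps its default behavior.
    override def toStreaming: StreamingWrite = streamingWrite
  }
}
```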

File tree

2 files changed: +11 -8 lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -274,11 +274,13 @@ class InMemoryTable(
         this
       }

-      override def buildForBatch(): BatchWrite = writer
+      override def build(): Write = new Write {
+        override def toBatch: BatchWrite = writer

-      override def buildForStreaming(): StreamingWrite = streamingWriter match {
-        case exc: StreamingNotSupportedOperation => exc.throwsException()
-        case s => s
+        override def toStreaming: StreamingWrite = streamingWriter match {
+          case exc: StreamingNotSupportedOperation => exc.throwsException()
+          case s => s
+        }
       }
     }
   }
```
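
A hedged usage sketch (not part of the test change above): after this patch, one `Write` obtained from the table's builder hands out both the batch and the streaming writer. The helper name `writersFor` is made up, and `table`/`info` are assumed to be prepared the way the surrounding test helpers do.

```scala
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Obtain both writers from the single Write instance produced by build().
def writersFor(table: SupportsWrite, info: LogicalWriteInfo): (BatchWrite, StreamingWrite) = {
  val write = table.newWriteBuilder(info).build()
  // toBatch is backed by `writer`; toStreaming by `streamingWriter`, which may throw
  // for the simulated unsupported operations, as the match above shows.
  (write.toBatch, write.toStreaming)
}
```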

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 4 deletions
```diff
@@ -627,21 +627,22 @@ abstract class StreamExecution(
       inputPlan.schema,
       new CaseInsensitiveStringMap(options.asJava))
     val writeBuilder = table.newWriteBuilder(info)
-    outputMode match {
+    val write = outputMode match {
       case Append =>
-        writeBuilder.buildForStreaming()
+        writeBuilder.build()

       case Complete =>
         // TODO: we should do this check earlier when we have capability API.
         require(writeBuilder.isInstanceOf[SupportsTruncate],
           table.name + " does not support Complete mode.")
-        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()

       case Update =>
         require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
           table.name + " does not support Update mode.")
-        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
     }
+    write.toStreaming
   }

   protected def purge(threshold: Long): Unit = {
```
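
The same flow, restated as a self-contained sketch rather than the actual `StreamExecution` code: each output mode now produces a `Write`, and `toStreaming` is called once at the end. The public `OutputMode` factories are used here instead of Spark's internal output-mode objects, the `require` checks are omitted, and the import path for `SupportsStreamingUpdateAsAppend` is an assumption.

```scala
import org.apache.spark.sql.connector.write.{SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
import org.apache.spark.sql.streaming.OutputMode

def createStreamingWrite(writeBuilder: WriteBuilder, outputMode: OutputMode): StreamingWrite = {
  val write: Write =
    if (outputMode == OutputMode.Append()) {
      writeBuilder.build()
    } else if (outputMode == OutputMode.Complete()) {
      // Complete mode rewrites the sink, so the builder must support truncation.
      writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
    } else {
      // Update mode is handled as append for sinks that opt in.
      writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
    }
  // Every branch ends in build(); the streaming write is derived from the Write abstraction.
  write.toStreaming
}
```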

0 commit comments