Skip to content

Commit c1a0c66

Browse files
lw-lin, rxin
authored and committed
[SPARK-18261][STRUCTURED STREAMING] Add statistics to MemorySink for joining
## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <[email protected]>

Closes #15786 from lw-lin/memory-sink-stat.
1 parent 9b0593d commit c1a0c66

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql._
2828
import org.apache.spark.sql.catalyst.encoders.encoderFor
2929
import org.apache.spark.sql.catalyst.expressions.Attribute
30-
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
30+
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
3131
import org.apache.spark.sql.streaming.OutputMode
3232
import org.apache.spark.sql.types.StructType
3333
import org.apache.spark.util.Utils
@@ -212,4 +212,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
212212
*/
213213
case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
  /** Builds a plan whose output attributes are derived from the sink's schema. */
  def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)

  /** Estimated size of one row: the sum of the default sizes of the schema's column types. */
  private val sizePerRow: Int = sink.schema.toAttributes.map(_.dataType.defaultSize).sum

  /**
   * Size estimate for the rows currently held by the sink, computed as
   * `sizePerRow * (number of rows in sink.allData)`.
   *
   * The estimate is recomputed on every call, so it reflects whatever the sink holds at
   * that moment.
   */
  override def statistics: Statistics = {
    // Widen to BigInt *before* multiplying: both operands are Int, so an Int product
    // would silently wrap around for large sinks before being converted to BigInt.
    Statistics(BigInt(sizePerRow) * sink.allData.size)
  }
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -187,6 +187,22 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
187187
query.stop()
188188
}
189189

190+
test("MemoryPlan statistics") {
  // Single nullable-by-default integer column named "value".
  // NOTE(review): kept implicit under the same name; presumably consumed by the suite's
  // helpers that turn the integer ranges below into batches -- confirm against the suite.
  implicit val schema: StructType =
    new StructType().add(new StructField("value", IntegerType))
  val memorySink = new MemorySink(schema, InternalOutputModes.Append)
  val memoryPlan = new MemoryPlan(memorySink)

  // Re-read the statistics each time so every assertion sees the sink's current contents.
  def estimatedBytes = memoryPlan.statistics.sizeInBytes

  // Before adding data, check output
  checkAnswer(memorySink.allData, Seq.empty)
  assert(estimatedBytes === 0)

  // First batch: three rows.
  memorySink.addBatch(0, 1 to 3)
  assert(estimatedBytes === 12)

  // Second batch: three more rows, six in total.
  memorySink.addBatch(1, 4 to 6)
  assert(estimatedBytes === 24)
}
205+
190206
ignore("stress test") {
191207
// Ignore the stress test as it takes several minutes to run
192208
(0 until 1000).foreach { _ =>

0 commit comments

Comments
 (0)