Commit ec2a2b8: address comments
1 parent 009d760

5 files changed (+14 additions, -11 deletions)

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 3 additions & 4 deletions
@@ -66,9 +66,8 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  // `ColumnarToRowExec` is the beginning of a codegen stage, so it doesn't need to copy result and
-  // it can add limit condition check.
-  override def needCopyResult: Boolean = false
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
   protected override def canCheckLimitNotReached: Boolean = true
 
   override lazy val metrics: Map[String, SQLMetric] = Map(

@@ -431,7 +430,7 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
     // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
     // combine with some of the Arrow conversion tools we will need to unify some of the configs.
     val numRows = conf.columnBatchSize
-    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
     child.execute().mapPartitionsInternal { rowIterator =>
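
The `localSchema` line above relies on a general Spark closure-capture pattern: whatever an RDD closure references is serialized and shipped with each task, so reading `this.schema` inside `mapPartitionsInternal` would pull the whole operator (and its child plan) into the closure. A minimal sketch of the pattern using the public RDD API and hypothetical names (not code from this commit):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

// Hypothetical operator for illustration. The class is not Serializable, so a
// closure that captured `this` would fail with "Task not serializable".
class ExampleOp(val schema: StructType, input: RDD[String]) {
  def fieldCounts(): RDD[Int] = {
    // Copy the field into a local val; the task closure then captures only
    // the (serializable) StructType, not the whole ExampleOp instance.
    val localSchema = this.schema
    input.mapPartitions { rows => rows.map(_ => localSchema.length) }
  }
}

Writing `this.schema.length` inside the closure would still compile, but Spark would then try to serialize the enclosing `ExampleOp` when shipping the task.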

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 6 additions & 3 deletions
@@ -517,6 +517,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
     child.executeColumnar()
   }
 
+  // `InputAdapter` can only generate code to process the rows from its child. If the child produces
+  // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by
+  // overriding `inputRDD`.
   override def inputRDD: RDD[InternalRow] = child.execute()
 
   // This is a leaf node so the node can produce limit not reached checks.

@@ -868,9 +871,6 @@ case class CollapseCodegenStages(
         // The children of SortMergeJoin should do codegen separately.
         j.withNewChildren(j.children.map(
           child => InputAdapter(insertWholeStageCodegen(child))))
-      // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can
-      // process data from the input RDD directly.
-      case c: ColumnarToRowExec => c
       case p => p.withNewChildren(p.children.map(insertInputAdapter))
     }
   }

@@ -889,6 +889,9 @@ case class CollapseCodegenStages(
         // to support the fast driver-local collect/take paths.
         plan
       case plan: CodegenSupport if supportCodegen(plan) =>
+        // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
+        // it can't support whole-stage-codegen at the same time.
+        assert(!plan.supportsColumnar)
         WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
       case other =>
         other.withNewChildren(other.children.map(insertWholeStageCodegen))
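
Two details of the `CollapseCodegenStages` changes above are visible in the test updates below. With the `ColumnarToRowExec` special case removed from `insertInputAdapter`, a columnar scan now enters a codegen stage as `WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(scan)))`, and the new assertion makes explicit that generated code is row-based. A hedged sketch of checking that plan shape from a spark-shell session, mirroring the updated `WholeStageCodegenSuite` test (public APIs only; the exact plan can vary with version and configuration):

import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, InputAdapter, WholeStageCodegenExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

val ds = spark.range(3).cache()
ds.count()  // materialize the cached, columnar in-memory relation

val plan = ds.filter(_ > 0).queryExecution.executedPlan
// The in-memory columnar scan sits under an InputAdapter, and ColumnarToRowExec
// converts its batches to rows for the surrounding codegen stage.
val matched = plan.collect {
  case WholeStageCodegenExec(FilterExec(_,
      ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
}
assert(matched.length == 1)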

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -1293,8 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row(0, 0), Row(2, 0)))
       // need to execute the query before we can examine fs.inputRDDs()
       assert(df.queryExecution.executedPlan match {
-        case WholeStageCodegenExec(ColumnarToRowExec(
-            fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _))) =>
+        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
+            fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) =>
           partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
             fs.inputRDDs().forall(
               _.asInstanceOf[FileScanRDD].filePartitions.forall(

sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -128,7 +128,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
     val dsIntFilter = dsInt.filter(_ > 0)
     val planInt = dsIntFilter.queryExecution.executedPlan
     assert(planInt.collect {
-      case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(_: InMemoryTableScanExec))) => ()
+      case WholeStageCodegenExec(FilterExec(_,
+          ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
     }.length == 1)
     assert(dsIntFilter.collect() === Array(1, 2))

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -487,7 +487,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
 
     val planBeforeFilter = df2.queryExecution.executedPlan.collect {
       case FilterExec(_, c: ColumnarToRowExec) => c.child
-      case WholeStageCodegenExec(FilterExec(_, c: ColumnarToRowExec)) => c.child
+      case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child
     }
     assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
