-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework #25264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
revans2
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for cleaning up after my patch. I really appreciate it.
I just had one comment about a metric you were removing that I think is still useful.
| protected override def canCheckLimitNotReached: Boolean = true | ||
|
|
||
| override lazy val metrics: Map[String, SQLMetric] = Map( | ||
| "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find num output rows to be useful because ColumnarToRowExec can happen at other times too, I am working on getting it to happen after pandas UDF operations. Plus the performance impact is only on the order of the number of batches. Not on the order of the number of rows, so it should have minimal impact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I was too conservative. I'll add it back, and revisit this when I benchmark Spark 3.0.
|
Test build #108228 has finished for PR 25264 at commit
|
|
cc @rednaxelafx Please post your comment about the changes. Thanks! |
|
Test build #108242 has finished for PR 25264 at commit
|
rednaxelafx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question on making ColumarToRowExec effectively a leaf in a codegen stage:
| child => InputAdapter(insertWholeStageCodegen(child)))) | ||
| // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can | ||
| // process data from the input RDD directly. | ||
| case c: ColumnarToRowExec => c |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work well with the WholeStageCodegenExec.treeString? i.e. does this work well with printing the * (id) prefix?
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change isn't small, and maybe we need a new JIRA?
| case p => | ||
| p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar))) | ||
| child => InputAdapter(insertWholeStageCodegen(child)))) | ||
| // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want ColumnarToRowExec to be leaf node without InputAdapter, should we move it before case p if !supportCodegen(p) =>? Otherwise, isn't InputAdapter still be added between ColumnarToRowExec and p if p doesn't support codegen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case p if !supportCodegen(p) won't match ColumnarToRowExec, so it doesn't matter where we put the case c: ColumnarToRowExec. I just want to be consistent with the existing code style and put it after the case j: SortMergeJoinExec
| } | ||
|
|
||
| override def doExecuteColumnar(): RDD[ColumnarBatch] = { | ||
| child.executeColumnar() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InputAdapter doesn't support columnar execution now? Seems we can change supportsColumnar and remove doExecuteColumnar?
| child => InputAdapter(insertWholeStageCodegen(child)))) | ||
| // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can | ||
| // process data from the input RDD directly. | ||
| case c: ColumnarToRowExec => c |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we recursively call insertInputAdapter on ColumnarToRowExec's children?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, because ColumnarToRowExec is leaf node of the codegen stage. But you remind me that we should call insertWholeStageCodegen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, yes, it should be insertWholeStageCodegen.
| if (cb != null) { | ||
| cb.close() | ||
| cb = null | ||
| // This avoids calling `output` in the RDD closure, so that we don't need to include the entire |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: output -> schema?
|
After more thoughts, I think we should still insert |
|
Test build #108290 has finished for PR 25264 at commit
|
|
Test build #108371 has finished for PR 25264 at commit
|
|
retest this please |
|
Test build #108375 has finished for PR 25264 at commit
|
|
retest this please |
|
Test build #108380 has finished for PR 25264 at commit
|
|
@cloud-fan are you going to update InputAdapter supportsColumnar and doExecuteColumnar? |
|
@tgravescs no I'm not going to. It's correct that |
| // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. | ||
| val startNs = System.nanoTime() | ||
| val re = fileScanIterator.hasNext | ||
| scanTimeMetrics += ((System.nanoTime() - startNs) / (1000 * 1000)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NANOSECONDS.toMillis
| } | ||
| // `InputAdapter` can only generate code to process the rows from its child. If the child produces | ||
| // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by | ||
| // overriding `inputRDD`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had misread/misunderstood this originally.
we are overriding "inputRDDs" (note the s) in ColumnarToRowExec which is bypassing calling inputRDD here and calling InputAdapter.executeColumnar from ColumnarToRowExec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the ColumnarToRowExec doubles as in InputAdapter? Wouldn't it be less confusing to replace the InputAdapter with ColumnarToRowExec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried the idea of replace the InputAdapter with ColumnarToRowExec before. But then it's a little weird to see ColumnarToRowExec act as both the boundary of columnar execution stage and codegen stage. We need to
- handle
ColumnarToRowExecspecially when planning whole-stage-codegen - implement
ColumnarToRowExec.treeString, which should print this columnar node and remove the whole-stage-codegen mark from its child's treeString
I feel it's simpler to always let InputAdapter be the boundary of codegen stage.
|
Test build #108524 has finished for PR 25264 at commit
|
| }) | ||
| // This avoids calling `output` in the RDD closure, so that we don't need to include the entire | ||
| // plan (this) in the closure. | ||
| val localOutput = this.output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
| // plan (this) in the closure. | ||
| val localOutput = this.output | ||
| child.executeColumnar().mapPartitionsInternal { batches => | ||
| val outputProject = UnsafeProjection.create(localOutput, localOutput) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: maybe name this toUnsafe to better convey the intent of the projection
| scanTimeMetrics: SQLMetric) extends Iterator[T] { | ||
|
|
||
| override def hasNext: Boolean = { | ||
| // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling System.nanoTime() per tuple isn't exactly cheap. This probably regresses cases where the scan produces rows instead of batches. I am actually not sure if there is an (easy) way around this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only workaround is that we don't expose scan time for row based formats.
|
Test build #108543 has finished for PR 25264 at commit
|
|
retest this please |
|
Test build #108559 has finished for PR 25264 at commit
|
tgravescs
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
thanks for the review, merging to master! |
…side in ColumnarToRowExec https://issues.apache.org/jira/browse/SPARK-52484 ### What changes were proposed in this pull request? The PR removes the unnecessary assertion in `ColumnarToRowExec` introduced by #25264 to guarantee some flexibilities for 3rd Spark plugins. Especially in Apache Gluten, the assertion blocks some of our effort in query optimization because we needed an intermediate state of the query plan which Spark may see as illegal. Moreover, some typical reasons this intermediate state is needed in Gluten are: 1. Gluten has a cost evaluator API to evaluate the cost of a `transition rule` (which adds a unary node on top of an input plan). In the case Gluten will need a fake leaf to let the rule apply on it for cost evaluation. This leaf node has to be made a columnar one to bypass this assertion, which is a bit hacky. 2. Gluten has a cascades-style query optimizer (RAS) which could set a leaf, dummy, row-based plan node to hide up a child-tree of a brach query plan node, during which this leaf is to represent a so-called cascades 'group'. Although this pattern (C2R on a row-based plan) is illegal, it could still be used as the input of an optimizer rule to potentially be matched on and then to be converted into a valid query plan. This PR is to remove the assertion to ensure some flexibilities to the 3rd plugins. This should be no harm for the upstream Apache Spark, because the query execution will still be failed by [this error](https://github.com/apache/spark/blob/5d0b2f41794bf4dd25b3ce19bc4f634082b40876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L343-L351) without this assertion on an illegal query plan. Some workarounds used by Gluten for bypassing this assertion: 1. https://github.com/apache/incubator-gluten/blob/0a1b5c28678653242ab0fd7b28ebba1dca43ccb1/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala#L83 2. https://github.com/apache/incubator-gluten/blob/0a1b5c28678653242ab0fd7b28ebba1dca43ccb1/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala#L51-L55 Once the assertion is removed, Gluten will be able to remove these workarounds to simply code. ### Does this PR introduce _any_ user-facing change? Basically no. An assertion error in plan-building time will be replaced by an exception in execution time (still from the driver side) when an illegal query plan is generated. ### How was this patch tested? Existing UTs. Closes #51183 from zhztheplayer/wip-rm-c2r-check. Authored-by: Hongze Zhang <[email protected]> Signed-off-by: Kent Yao <[email protected]>
What changes were proposed in this pull request?
I did a post-hoc review of #25008 , and would like to propose some cleanups/fixes/improvements:
ColumnarToRowExec. This metrics is specific to file scan, and doesn't make sense for a general batch-to-row operator.RDD#mapPartitionsInternalinstead offlatMapin several places, asmapPartitionsInternalis created for Spark SQL and we use it in almost all the SQL operators.limitNotReachedCondinColumnarToRowExec. This was in theColumnarBatchScanbefore and is critical for performance.RowToColumnarExecaboveWholeStageExec, or aColumnarToRowExecabove theInputAdapter.ColumnarBatchinRowToColumnarExec. We don't need to create a new one every time, just need to reset it.LogicalPlanTagInSparkPlanSuiteWholeStageCodegenSuite.How was this patch tested?
existing tests