Spark 3.4: Codegen support for UpdateRowsExec #7691
Conversation
copy(child = newChild)
}

override def usedInputs: AttributeSet = {
This is a similar optimization to the one in ProjectExec. It ensures that if the same input attribute is used in multiple output expressions, we only fetch it once.
Suppose I have c1 = id + 10, c2 = id - 10. Instead of fetching id for each output expression, we can do that once and reuse the same variable.
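A minimal sketch of what this could look like, assuming the node keeps deleteOutput and insertOutput expression sequences (assumed names); only the references.filter line is taken from the diff quoted later in this review:

  // Sketch only: report the input attributes referenced by more than one
  // output expression so whole-stage codegen materializes them once up front
  // and the generated code reuses the same local variable.
  private lazy val usedMoreThanOnceExprIds = {
    val exprIds = (deleteOutput ++ insertOutput).flatMap(_.references).map(_.exprId)
    exprIds.groupBy(identity).filter { case (_, uses) => uses.size > 1 }.keySet
  }

  override def usedInputs: AttributeSet = {
    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
  }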
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
This is inspired by ProjectExec and GenerateExec in Spark. Checking out those nodes will help when reviewing this change.
val deleteOutputVars = deleteExprs.map(_.genCode(ctx))

val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
This block ensures the same subexpressions are evaluated only once.
For instance, suppose I have c1 = id - 10, c2 = id - 10. This ensures id - 10 is not computed separately for each output expression.
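A sketch of the pattern, modeled on ProjectExec.doConsume in Spark; only the else branch appears verbatim in this diff, so treat the rest as illustrative rather than the exact change:

  // Sketch modeled on ProjectExec: when subexpression elimination is enabled,
  // common subexpressions across the INSERT output are generated once and the
  // individual output expressions reference the shared results.
  val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
    if (conf.subexpressionEliminationEnabled) {
      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
      val genVars = ctx.withSubExprEliminationExprs(subExprs.states) {
        insertExprs.map(_.genCode(ctx))
      }
      (ctx.evaluateSubExprEliminationState(subExprs.states.values), genVars,
        subExprs.exprCodesNeedEvaluate)
    } else {
      ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
    }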
|// generate DELETE record
|${consume(ctx, deleteOutputVars)}
|// generate INSERT records
|${evaluateVariables(insertLocalInputVars)}
Similar to ProjectExec, but it also generates a DELETE record first.
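For reference, a sketch of the string block doConsume assembles; the first four lines match the quoted diff, while the trailing lines are an assumption based on the ProjectExec-style pattern for the INSERT output:

  s"""
     |// generate DELETE record
     |${consume(ctx, deleteOutputVars)}
     |// generate INSERT records
     |${evaluateVariables(insertLocalInputVars)}
     |$insertSubExprsCode
     |${consume(ctx, insertOutputVars)}
   """.stripMargin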
withSQLConf(
    ImmutableMap.of(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false"),
    () -> {
      String code = generateCode("UPDATE %s SET c1 = c1 - 11, c2 = c1 - 11", commitTarget());
Here is the output for the first stage of this operation:
== Subtree 1 / 2 (maxMethodCodeSize:530; maxConstantPoolSize:144(0.22% used); numInnerClasses:0) ==
*(1) UpdateRowsExec[__row_operation#43, id#44, c1#45, c2#46, _file#47, _pos#48L, _spec_id#49, _partition#50]
+- *(1) Project [id#34, c1#35, c2#36, _file#39, _pos#40L, _spec_id#37, _partition#38]
+- BatchScan testhive.default.table[id#34, c1#35, c2#36, _file#39, _pos#40L, _spec_id#37, _partition#38] testhive.default.table (branch=main) [filters=, groupedBy=] RuntimeFilters: []
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator inputadapter_input_0;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
/* 011 */
/* 012 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */ this.references = references;
/* 014 */ }
/* 015 */
/* 016 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */ partitionIndex = index;
/* 018 */ this.inputs = inputs;
/* 019 */ inputadapter_input_0 = inputs[0];
/* 020 */ project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(7, 64);
/* 021 */ project_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[0], 0);
/* 022 */ project_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
/* 023 */ project_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[2], 0);
/* 024 */ project_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
/* 025 */ project_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[4], 0);
/* 026 */
/* 027 */ }
/* 028 */
/* 029 */ protected void processNext() throws java.io.IOException {
/* 030 */ while ( inputadapter_input_0.hasNext()) {
/* 031 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 032 */
/* 033 */ // common sub-expressions
/* 034 */
/* 035 */ boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
/* 036 */ int inputadapter_value_1 = inputadapter_isNull_1 ?
/* 037 */ -1 : (inputadapter_row_0.getInt(1));
/* 038 */
/* 039 */ // generate DELETE record
/* 040 */
/* 041 */ UTF8String inputadapter_value_3 = inputadapter_row_0.getUTF8String(3);
/* 042 */ long inputadapter_value_4 = inputadapter_row_0.getLong(4);
/* 043 */ int inputadapter_value_5 = inputadapter_row_0.getInt(5);
/* 044 */ boolean inputadapter_isNull_6 = inputadapter_row_0.isNullAt(6);
/* 045 */ InternalRow inputadapter_value_6 = inputadapter_isNull_6 ?
/* 046 */ null : (inputadapter_row_0.getStruct(6, 0));
/* 047 */ project_mutableStateArray_0[2].reset();
/* 048 */
/* 049 */ project_mutableStateArray_0[2].zeroOutNullBytes();
/* 050 */
/* 051 */ project_mutableStateArray_0[2].write(0, 1);
/* 052 */
/* 053 */ if (true) {
/* 054 */ project_mutableStateArray_0[2].setNullAt(1);
/* 055 */ } else {
/* 056 */ project_mutableStateArray_0[2].write(1, -1);
/* 057 */ }
/* 058 */
/* 059 */ if (true) {
/* 060 */ project_mutableStateArray_0[2].setNullAt(2);
/* 061 */ } else {
/* 062 */ project_mutableStateArray_0[2].write(2, -1);
/* 063 */ }
/* 064 */
/* 065 */ if (true) {
/* 066 */ project_mutableStateArray_0[2].setNullAt(3);
/* 067 */ } else {
/* 068 */ project_mutableStateArray_0[2].write(3, -1);
/* 069 */ }
/* 070 */
/* 071 */ if (false) {
/* 072 */ project_mutableStateArray_0[2].setNullAt(4);
/* 073 */ } else {
/* 074 */ project_mutableStateArray_0[2].write(4, inputadapter_value_3);
/* 075 */ }
/* 076 */
/* 077 */ if (false) {
/* 078 */ project_mutableStateArray_0[2].setNullAt(5);
/* 079 */ } else {
/* 080 */ project_mutableStateArray_0[2].write(5, inputadapter_value_4);
/* 081 */ }
/* 082 */
/* 083 */ if (false) {
/* 084 */ project_mutableStateArray_0[2].setNullAt(6);
/* 085 */ } else {
/* 086 */ project_mutableStateArray_0[2].write(6, inputadapter_value_5);
/* 087 */ }
/* 088 */
/* 089 */ if (inputadapter_isNull_6) {
/* 090 */ project_mutableStateArray_0[2].setNullAt(7);
/* 091 */ } else {
/* 092 */ final InternalRow updaterows_tmpInput_0 = inputadapter_value_6;
/* 093 */ if (updaterows_tmpInput_0 instanceof UnsafeRow) {
/* 094 */ project_mutableStateArray_0[2].write(7, (UnsafeRow) updaterows_tmpInput_0);
/* 095 */ } else {
/* 096 */ // Remember the current cursor so that we can calculate how many bytes are
/* 097 */ // written later.
/* 098 */ final int updaterows_previousCursor_0 = project_mutableStateArray_0[2].cursor();
/* 099 */
/* 100 */ project_mutableStateArray_0[3].resetRowWriter();
/* 101 */
/* 102 */ project_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(7, updaterows_previousCursor_0);
/* 103 */ }
/* 104 */ }
/* 105 */ append((project_mutableStateArray_0[2].getRow()));
/* 106 */
/* 107 */ // generate INSERT records
/* 108 */
/* 109 */ boolean updaterows_isNull_8 = true;
/* 110 */ int updaterows_value_8 = -1;
/* 111 */
/* 112 */ if (!inputadapter_isNull_1) {
/* 113 */ updaterows_isNull_8 = false; // resultCode could change nullability.
/* 114 */
/* 115 */ updaterows_value_8 = inputadapter_value_1 - 11;
/* 116 */
/* 117 */ }
/* 118 */
/* 119 */ boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 120 */ int inputadapter_value_0 = inputadapter_isNull_0 ?
/* 121 */ -1 : (inputadapter_row_0.getInt(0));
/* 122 */ project_mutableStateArray_0[4].reset();
/* 123 */
/* 124 */ project_mutableStateArray_0[4].zeroOutNullBytes();
/* 125 */
/* 126 */ project_mutableStateArray_0[4].write(0, 3);
/* 127 */
/* 128 */ if (inputadapter_isNull_0) {
/* 129 */ project_mutableStateArray_0[4].setNullAt(1);
/* 130 */ } else {
/* 131 */ project_mutableStateArray_0[4].write(1, inputadapter_value_0);
/* 132 */ }
/* 133 */
/* 134 */ if (updaterows_isNull_8) {
/* 135 */ project_mutableStateArray_0[4].setNullAt(2);
/* 136 */ } else {
/* 137 */ project_mutableStateArray_0[4].write(2, updaterows_value_8);
/* 138 */ }
/* 139 */
/* 140 */ if (updaterows_isNull_8) {
/* 141 */ project_mutableStateArray_0[4].setNullAt(3);
/* 142 */ } else {
/* 143 */ project_mutableStateArray_0[4].write(3, updaterows_value_8);
/* 144 */ }
/* 145 */
/* 146 */ if (true) {
/* 147 */ project_mutableStateArray_0[4].setNullAt(4);
/* 148 */ } else {
/* 149 */ project_mutableStateArray_0[4].write(4, ((UTF8String)null));
/* 150 */ }
/* 151 */
/* 152 */ if (true) {
/* 153 */ project_mutableStateArray_0[4].setNullAt(5);
/* 154 */ } else {
/* 155 */ project_mutableStateArray_0[4].write(5, -1L);
/* 156 */ }
/* 157 */
/* 158 */ if (true) {
/* 159 */ project_mutableStateArray_0[4].setNullAt(6);
/* 160 */ } else {
/* 161 */ project_mutableStateArray_0[4].write(6, -1);
/* 162 */ }
/* 163 */
/* 164 */ if (true) {
/* 165 */ project_mutableStateArray_0[4].setNullAt(7);
/* 166 */ } else {
/* 167 */ final InternalRow wholestagecodegen_tmpInput_0 = ((InternalRow)null);
/* 168 */ if (wholestagecodegen_tmpInput_0 instanceof UnsafeRow) {
/* 169 */ project_mutableStateArray_0[4].write(7, (UnsafeRow) wholestagecodegen_tmpInput_0);
/* 170 */ } else {
/* 171 */ // Remember the current cursor so that we can calculate how many bytes are
/* 172 */ // written later.
/* 173 */ final int wholestagecodegen_previousCursor_0 = project_mutableStateArray_0[4].cursor();
/* 174 */
/* 175 */ project_mutableStateArray_0[5].resetRowWriter();
/* 176 */
/* 177 */ project_mutableStateArray_0[4].setOffsetAndSizeFromPreviousCursor(7, wholestagecodegen_previousCursor_0);
/* 178 */ }
/* 179 */ }
/* 180 */ append((project_mutableStateArray_0[4].getRow()));
/* 181 */ if (shouldStop()) return;
/* 182 */ }
/* 183 */ }
/* 184 */
/* 185 */ }
.config(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false")
.config(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "2")
.config(SQLConf.CODEGEN_FACTORY_MODE().key(), "CODEGEN_ONLY")
I did not see a big performance improvement in the existing benchmark, as read and write dominate, except in the case with lots of updates. However, I did see reduced memory pressure. Keep in mind that the new approach without codegen was only slightly slower than the original projection. That said, codegen provides other benefits, like subexpression elimination, and it is important to be on par with the projection in terms of features.
Benchmark Mode Cnt Score Error Units
[OLD] UpdateProjectionBenchmark.mergeOnRead10Percent ss 5 4.915 ± 0.058 s/op
[OLD] UpdateProjectionBenchmark.mergeOnRead10Percent:·gc.count ss 5 12.000 counts
[NEW] UpdateProjectionBenchmark.mergeOnRead10Percent ss 5 4.920 ± 0.080 s/op
[NEW] UpdateProjectionBenchmark.mergeOnRead10Percent:·gc.count ss 5 11.000 counts
[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent ss 5 10.146 ± 0.347 s/op
[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent:·gc.count ss 5 25.000 counts
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent ss 5 10.104 ± 0.122 s/op
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent:·gc.count ss 5 20.000 counts
[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent ss 5 26.108 ± 0.343 s/op
[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent:·gc.count ss 5 102.000 counts
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent ss 5 24.331 ± 0.392 s/op
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent:·gc.count ss 5 32.000 counts
| ("", insertExprs.map(_.genCode(ctx)), Seq.empty) | ||
| } | ||
|
|
||
| val nonDeterministicInsertAttrs = insertOutput.zip(output) |
Trying to follow this: is there any test that exercises this part? I walked through the new tests but didn't see this path activated; maybe it's covered by existing ones.
Also, there is no equivalent of this for delete records, is there?
Unfortunately, Spark only allows non-deterministic expressions in a few nodes, and custom nodes are not among them. We would need a change in Spark to allow non-deterministic expressions in assignments; it is a similar problem for MERGE operations. Until then, I can't add a test. That said, vendors that maintain their own Spark builds may be able to modify Spark so this logic would apply.
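For context, a hedged sketch of what the quoted nonDeterministicInsertAttrs line might expand to; only the zip prefix appears in the diff, and the collect predicate is an assumption:

  // Sketch only: collect output attributes whose INSERT expressions are
  // non-deterministic, so their results can be evaluated eagerly once per row
  // rather than re-generated wherever they are consumed.
  val nonDeterministicInsertAttrs = insertOutput.zip(output)
    .collect { case (expr, attr) if !expr.deterministic => attr }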
szehon-ho left a comment:
OK, it looks fine to me with my limited knowledge, mostly from comparing it to ProjectExec.
|${consume(ctx, deleteOutputVars)}
|// generate INSERT records
|${evaluateVariables(insertLocalInputVars)}
Wondering if we should split DELETE and INSERT record generation into separate functions, as the combined block might exceed CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT. ExpandExec also had to do this at one point (apache/spark#32457). Thoughts, @aokolnychyi?
I thought about this but was not sure given that we only have two projections. It is probably safer to split. I'll make the change.
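A rough, heavily hedged sketch of one way to split, using CodegenContext.addNewFunction; the names and the placeholder body are assumptions, and passing the required input variables into the new method is the tricky part in practice:

  // Conceptual sketch only: register the DELETE-record block as a separate
  // method in the generated class so the combined DELETE + INSERT code does
  // not push processNext() past the huge-method threshold.
  val generateDeleteRecord = ctx.freshName("generateDeleteRecord")
  ctx.addNewFunction(generateDeleteRecord,
    s"""
       |private void $generateDeleteRecord(InternalRow row) throws java.io.IOException {
       |  // DELETE-record generation would go here (hypothetical placeholder)
       |}
     """.stripMargin)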
references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
Should we also make needCopyResult return true, considering this operator produces two rows for a single input row?
That's a good catch.
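For reference, needCopyResult is part of Spark's CodegenSupport trait, so the fix would presumably be as small as:

  // Since one input row yields two output rows (DELETE + INSERT) written
  // through reused row writers, downstream operators must copy each result
  // row before buffering it.
  override def needCopyResult: Boolean = true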
| sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName); | ||
| createBranchIfNeeded(); | ||
|
|
||
| // disable AQE to see the final plan with codegen in EXPLAIN |
Can we do something like:
def getCodeAndCommentForUpdateRowsExec(df: DataFrame): CodeAndComment = {
  val plan = df.queryExecution.executedPlan
  plan.execute()
  val updateRowsExec = findTopLevelUpdateRowsExec(plan)
  getCodeAndComment(updateRowsExec.head)
}

def findTopLevelUpdateRowsExec(plan: SparkPlan): Seq[UpdateRowsExec] = {
  filterByType[UpdateRowsExec](plan)
}

private def getCodeAndComment(plan: SparkPlan): CodeAndComment = {
  val codeGenSubTree = WholeStageCodegenExec(plan)(1)
  val codeAndComment = codeGenSubTree.doCodeGen()._2
  try {
    CodeGenerator.compile(codeAndComment)
  } catch {
    case e: Exception =>
      val msg =
        s"""
           |failed to compile:
           |Subtree:
           |$codeGenSubTree
           |Generated code:
           |${CodeFormatter.format(codeAndComment)}
         """.stripMargin
      fail(msg, e)
  }
  codeAndComment
}
This way we can test both the AQE and non-AQE plans. Thoughts, @aokolnychyi?
Let me play around.
}

@Test
public void subExpressionEliminationInCodegen() {
Should we now run TestUpdate both with and without codegen? At the moment, if there is an issue in codegen, it silently falls back to interpreted mode, which is not what we intend. Thoughts?
I thought about disabling the fallback to cut down the run time. Maybe we can further parameterize this suite. Let me see.
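One relevant knob, already used in the new test setup above, is SQLConf.CODEGEN_FACTORY_MODE; a sketch (in Scala for brevity, although the tests themselves are Java) of how the suite could be parameterized over it:

  // Sketch: run the same UPDATE tests under both codegen-only and
  // interpreted-only expression evaluation, so a silent fallback surfaces
  // as a test failure rather than going unnoticed.
  import org.apache.spark.sql.internal.SQLConf

  Seq("CODEGEN_ONLY", "NO_CODEGEN").foreach { mode =>
    spark.conf.set(SQLConf.CODEGEN_FACTORY_MODE.key, mode)
    // ... run the UPDATE test body under this mode
  }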
I will change this approach in a bit.

This PR adds codegen support for the newly added UpdateRowsExec that splits updates into deletes and inserts. It is a follow-up to #7646, which added this node, and is needed to be on par with the initial implementation that used a projection.