Commit 10e526e
[SPARK-20213][SQL] Fix DataFrameWriter operations in SQL UI tab
## What changes were proposed in this pull request?

Currently the `DataFrameWriter` operations have several problems:

1. A non-file-format data source write action doesn't show up in the SQL tab of the Spark UI.
2. A file-format data source write action shows a scan node in the SQL tab, without saying anything about the write. (Streaming has the same issue, but it is not fixed in this PR.)
3. Spark SQL CLI actions don't show up in the SQL tab.

This PR fixes all of them by refactoring `ExecuteCommandExec` to make it have children.

close #17540

## How was this patch tested?

Existing tests. The UI was also tested manually. For a simple command: `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`

Before this PR:

<img width="266" alt="qq20170523-035840 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326050/24e18ba2-3f6c-11e7-8817-6dd275bf6ac5.png">

After this PR:

<img width="287" alt="qq20170523-035708 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326054/2ad7f460-3f6c-11e7-8053-d68325beb28f.png">

Author: Wenchen Fan <[email protected]>

Closes #18064 from cloud-fan/execution.
1 parent fa757ee commit 10e526e
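
To repeat the manual UI check from the description, the following minimal driver can be used. It is a sketch, not part of this commit: the object name, the `local[2]` master, the sleep, and the added `overwrite` mode are assumptions made so the snippet is self-contained and rerunnable; only the write command itself is quoted from the PR description.

```scala
import org.apache.spark.sql.SparkSession

object SqlUiWriteRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sql-ui-write-repro")
      .getOrCreate()
    import spark.implicits._

    // The command from the PR description; with this patch it should appear in
    // the SQL tab as a write command with the producing query as its child,
    // rather than as a bare scan node.
    Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").parquet("/tmp/qwe")

    // Keep the driver alive so http://localhost:4040/SQL/ can be inspected.
    Thread.sleep(60000)
    spark.stop()
  }
}
```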

37 files changed: +299, -218 lines


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala

Lines changed: 4 additions & 6 deletions
@@ -85,12 +85,10 @@ private[kafka010] object KafkaWriter extends Logging {
       topic: Option[String] = None): Unit = {
     val schema = queryExecution.analyzed.output
     validateQuery(queryExecution, kafkaParameters, topic)
-    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-      queryExecution.toRdd.foreachPartition { iter =>
-        val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
-        Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
-          finallyBlock = writeTask.close())
-      }
+    queryExecution.toRdd.foreachPartition { iter =>
+      val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
+      Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
+        finallyBlock = writeTask.close())
     }
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 1 addition & 1 deletion
@@ -357,7 +357,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     })
   }
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
+  override def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
  * commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
  * eagerly executed.
  */
-trait Command extends LeafNode {
+trait Command extends LogicalPlan {
   override def output: Seq[Attribute] = Seq.empty
+  override def children: Seq[LogicalPlan] = Seq.empty
 }
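
The point of widening `Command` from `LeafNode` to `LogicalPlan` is that a concrete command can now expose the query it wraps as a child. A hypothetical command illustrating the shape (not taken from this commit; the class and its parameters are invented for illustration):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Hypothetical: a command that writes out the result of `query` can report
// that query as a child, so the planner plans it and the SQL UI displays it
// beneath the command node instead of hiding it.
case class ExampleWriteCommand(query: LogicalPlan, path: String) extends Command {
  override def children: Seq[LogicalPlan] = Seq(query)
}
```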

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 7 additions & 8 deletions
@@ -22,7 +22,8 @@ import java.math.{MathContext, RoundingMode}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -243,9 +244,9 @@ object ColumnStat extends Logging {
     }
 
     col.dataType match {
-      case _: IntegralType => fixedLenTypeStruct(LongType)
+      case dt: IntegralType => fixedLenTypeStruct(dt)
       case _: DecimalType => fixedLenTypeStruct(col.dataType)
-      case DoubleType | FloatType => fixedLenTypeStruct(DoubleType)
+      case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
       case BooleanType => fixedLenTypeStruct(col.dataType)
       case DateType => fixedLenTypeStruct(col.dataType)
       case TimestampType => fixedLenTypeStruct(col.dataType)
@@ -264,14 +265,12 @@ object ColumnStat extends Logging {
   }
 
   /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
-  def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
+  def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
     ColumnStat(
       distinctCount = BigInt(row.getLong(0)),
       // for string/binary min/max, get should return null
-      min = Option(row.get(1))
-        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
-      max = Option(row.get(2))
-        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+      min = Option(row.get(1, attr.dataType)),
+      max = Option(row.get(2, attr.dataType)),
       nullCount = BigInt(row.getLong(3)),
       avgLen = row.getLong(4),
       maxLen = row.getLong(5)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 6 additions & 6 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.sources.BaseRelation
@@ -231,12 +232,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")
 
     runCommand(df.sparkSession, "save") {
-      SaveIntoDataSourceCommand(
-        query = df.logicalPlan,
-        provider = source,
+      DataSource(
+        sparkSession = df.sparkSession,
+        className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap,
-        mode = mode)
+        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
     }
   }
 
@@ -607,7 +607,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     try {
       val start = System.nanoTime()
       // call `QueryExecution.toRDD` to trigger the execution of commands.
-      qe.toRdd
+      SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
       val end = System.nanoTime()
       session.listenerManager.onSuccess(name, qe, end - start)
     } catch {
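
Reading the last hunk in context, the surrounding `runCommand` helper of `DataFrameWriter` presumably looks roughly like the sketch below (reconstructed around the lines shown above; the failure branch is an assumption). Every writer action funnels its command through this helper, so the whole write, including the jobs launched by the command, now runs under a single execution ID and shows up as one entry in the SQL tab.

```scala
// Sketch only: reconstructed around the hunk above, not copied from the file.
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  try {
    val start = System.nanoTime()
    // call `QueryExecution.toRDD` to trigger the execution of commands.
    SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
    val end = System.nanoTime()
    session.listenerManager.onSuccess(name, qe, end - start)
  } catch {
    case e: Exception =>
      // assumed failure path: report to listeners and rethrow
      session.listenerManager.onFailure(name, qe, e)
      throw e
  }
}
```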

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 40 additions & 8 deletions
@@ -179,9 +179,9 @@ class Dataset[T] private[sql](
     // to happen right away to let these side effects take place eagerly.
     queryExecution.analyzed match {
       case c: Command =>
-        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+        LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
       case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
-        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
+        LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
       case _ =>
         queryExecution.analyzed
     }
@@ -248,8 +248,13 @@ class Dataset[T] private[sql](
       _numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = {
     val numRows = _numRows.max(0)
     val takeResult = toDF().take(numRows + 1)
-    val hasMoreData = takeResult.length > numRows
-    val data = takeResult.take(numRows)
+    showString(takeResult, numRows, truncate, vertical)
+  }
+
+  private def showString(
+      dataWithOneMoreRow: Array[Row], numRows: Int, truncate: Int, vertical: Boolean): String = {
+    val hasMoreData = dataWithOneMoreRow.length > numRows
+    val data = dataWithOneMoreRow.take(numRows)
 
     lazy val timeZone =
       DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
@@ -684,6 +689,18 @@ class Dataset[T] private[sql](
     } else {
       println(showString(numRows, truncate = 0))
     }
+
+  // An internal version of `show`, which won't set execution id and trigger listeners.
+  private[sql] def showInternal(_numRows: Int, truncate: Boolean): Unit = {
+    val numRows = _numRows.max(0)
+    val takeResult = toDF().takeInternal(numRows + 1)
+
+    if (truncate) {
+      println(showString(takeResult, numRows, truncate = 20, vertical = false))
+    } else {
+      println(showString(takeResult, numRows, truncate = 0, vertical = false))
+    }
+  }
   // scalastyle:on println
 
   /**
@@ -2453,6 +2470,11 @@ class Dataset[T] private[sql](
    */
   def take(n: Int): Array[T] = head(n)
 
+  // An internal version of `take`, which won't set execution id and trigger listeners.
+  private[sql] def takeInternal(n: Int): Array[T] = {
+    collectFromPlan(limit(n).queryExecution.executedPlan)
+  }
+
   /**
    * Returns the first `n` rows in the Dataset as a list.
    *
@@ -2477,6 +2499,11 @@ class Dataset[T] private[sql](
    */
   def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
 
+  // An internal version of `collect`, which won't set execution id and trigger listeners.
+  private[sql] def collectInternal(): Array[T] = {
+    collectFromPlan(queryExecution.executedPlan)
+  }
+
   /**
   * Returns a Java list that contains all rows in this Dataset.
   *
@@ -2518,6 +2545,11 @@ class Dataset[T] private[sql](
     plan.executeCollect().head.getLong(0)
   }
 
+  // An internal version of `count`, which won't set execution id and trigger listeners.
+  private[sql] def countInternal(): Long = {
+    groupBy().count().queryExecution.executedPlan.executeCollect().head.getLong(0)
+  }
+
  /**
   * Returns a new Dataset that has exactly `numPartitions` partitions.
   *
@@ -2763,7 +2795,7 @@ class Dataset[T] private[sql](
     createTempViewCommand(viewName, replace = true, global = true)
   }
 
-  private def createTempViewCommand(
+  private[spark] def createTempViewCommand(
      viewName: String,
      replace: Boolean,
      global: Boolean): CreateViewCommand = {
@@ -2954,17 +2986,17 @@ class Dataset[T] private[sql](
   }
 
  /** A convenient function to wrap a logical plan and produce a DataFrame. */
-  @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
+  @inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
    Dataset.ofRows(sparkSession, logicalPlan)
  }
 
  /** A convenient function to wrap a logical plan and produce a Dataset. */
-  @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+  @inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
    Dataset(sparkSession, logicalPlan)
  }
 
  /** A convenient function to wrap a set based logical plan and produce a Dataset. */
-  @inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+  @inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
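
The new `takeInternal` / `collectInternal` / `countInternal` / `showInternal` variants exist so that code already running under an execution ID, such as a command triggered through `DataFrameWriter.runCommand`, can pull results without opening a nested execution or firing listener events. A rough usage sketch; the helper object is hypothetical and is placed in the `org.apache.spark.sql` package only so the `private[sql]` members are visible:

```scala
package org.apache.spark.sql

// Hypothetical helper, not part of this commit.
object InternalActionExample {
  // Called from code that is already running under an execution ID:
  // collectInternal() reuses that ID instead of opening a nested execution,
  // so the SQL tab keeps showing a single entry for the whole user action.
  def firstColumn(df: DataFrame): Seq[Any] =
    df.collectInternal().toSeq.map(_.get(0))
}
```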

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 4 additions & 3 deletions
@@ -113,10 +113,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
 
   /**
-   * Returns the result as a hive compatible sequence of strings. This is for testing only.
+   * Returns the result as a hive compatible sequence of strings. This is used in tests and
+   * `SparkSQLDriver` for CLI applications.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(desc: DescribeTableCommand) =>
+    case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
       // If it is a describe command for a Hive table, we want to have the output format
       // be similar with Hive.
       desc.run(sparkSession).map {
@@ -127,7 +128,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
        .mkString("\t")
     }
     // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+    case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 13 additions & 0 deletions
@@ -39,6 +39,19 @@ object SQLExecution {
     executionIdToQueryExecution.get(executionId)
   }
 
+  private val testing = sys.props.contains("spark.testing")
+
+  private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
+    // only throw an exception during tests. a missing execution ID should not fail a job.
+    if (testing && sparkSession.sparkContext.getLocalProperty(EXECUTION_ID_KEY) == null) {
+      // Attention testers: when a test fails with this exception, it means that the action that
+      // started execution of a query didn't call withNewExecutionId. The execution ID should be
+      // set by calling withNewExecutionId in the action that begins execution, like
+      // Dataset.collect or DataFrameWriter.insertInto.
+      throw new IllegalStateException("Execution ID should be set")
+    }
+  }
+
   /**
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
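
The new helper is meant to be called from code that launches jobs without creating its own execution ID, as a test-only assertion that the caller set one. A minimal sketch of that usage; the write function and its enclosing object are hypothetical, placed under `org.apache.spark.sql.execution` only so the `private[sql]` helper is visible:

```scala
package org.apache.spark.sql.execution

import org.apache.spark.sql.SparkSession

// Hypothetical write path, not part of this commit.
object CheckExecutionIdExample {
  def writeRows(sparkSession: SparkSession, queryExecution: QueryExecution): Unit = {
    // Under spark.testing this fails fast if the action that triggered the
    // write forgot to wrap it in SQLExecution.withNewExecutionId.
    SQLExecution.checkSQLExecutionId(sparkSession)
    queryExecution.toRdd.foreachPartition { iter =>
      // write the rows of this partition somewhere (omitted in this sketch)
      iter.foreach(_ => ())
    }
  }
}
```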

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 1 deletion
@@ -346,7 +346,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
+      case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil
 
       case MemoryPlan(sink, output) =>
         val encoder = RowEncoder(sink.schema)

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ case class InMemoryRelation(
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
+  override def innerChildren: Seq[SparkPlan] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet
 