
Commit 4f0be6c

For review comment.
1 parent 31262d7 commit 4f0be6c

File tree

3 files changed, +16 -20 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala

Lines changed: 2 additions & 12 deletions
@@ -69,12 +69,7 @@ class DataSourceRDD(
     context.addTaskCompletionListener[Unit] { _ =>
       // In case of early stopping before consuming the entire iterator,
       // we need to do one more metric update at the end of the task.
-      reader.currentMetricsValues.foreach { metric =>
-        assert(customMetrics.contains(metric.name()),
-          s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " +
-            s"${metric.name()}")
-        customMetrics(metric.name()).set(metric.value())
-      }
+      CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics)
       reader.close()
     }
     // TODO: SPARK-25083 remove the type erasure hack in data source scan
@@ -105,12 +100,7 @@ private class PartitionIterator[T](
       throw QueryExecutionErrors.endOfStreamError()
     }
     if (numRow % CustomMetrics.numRowsPerUpdate == 0) {
-      reader.currentMetricsValues.foreach { metric =>
-        assert(customMetrics.contains(metric.name()),
-          s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " +
-            s"${metric.name()}")
-        customMetrics(metric.name()).set(metric.value())
-      }
+      CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics)
     }
     numRow += 1
     valuePrepared = false
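
Both hunks above keep the existing throttle around the new helper call: task-side metric values are pushed into the SQL metrics only once every CustomMetrics.numRowsPerUpdate rows, not on every row. Below is a minimal, self-contained sketch of that throttling pattern in plain Scala; the counters stand in for the real reader and metrics, which are not reproduced here.

    object ThrottledUpdateSketch {
      // Mirrors CustomMetrics.numRowsPerUpdate (changed from 100L to 100 in this commit).
      val numRowsPerUpdate = 100

      def main(args: Array[String]): Unit = {
        var numRow = 0L  // row counter, as in PartitionIterator
        var updates = 0  // stands in for a CustomMetrics.updateMetrics call

        (1 to 250).foreach { _ =>
          // Fires on rows 0, 100, 200: once per numRowsPerUpdate rows consumed.
          if (numRow % numRowsPerUpdate == 0) updates += 1
          numRow += 1
        }
        println(s"rows=250, metric updates=$updates")  // prints: rows=250, metric updates=3
      }
    }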

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala

Lines changed: 13 additions & 2 deletions
@@ -17,12 +17,12 @@

 package org.apache.spark.sql.execution.metric

-import org.apache.spark.sql.connector.metric.CustomMetric
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

 object CustomMetrics {
   private[spark] val V2_CUSTOM = "v2Custom"

-  private[spark] val numRowsPerUpdate = 100L
+  private[spark] val numRowsPerUpdate = 100

   /**
    * Given a class name, builds and returns a metric type for a V2 custom metric class
@@ -43,4 +43,15 @@ object CustomMetrics {
       None
     }
   }
+
+  /**
+   * Updates given custom metrics.
+   */
+  def updateMetrics(
+      currentMetricsValues: Seq[CustomTaskMetric],
+      customMetrics: Map[String, SQLMetric]): Unit = {
+    currentMetricsValues.foreach { metric =>
+      customMetrics(metric.name()).set(metric.value())
+    }
+  }
 }
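
CustomTaskMetric (org.apache.spark.sql.connector.metric) exposes only name() and value(), and SQLMetric.set overwrites the metric's current value, so the new updateMetrics is a straight name-keyed copy from task-side values into the driver-side metrics map. A self-contained sketch of that copy, using hypothetical stub classes rather than the real Spark types:

    // Stubs standing in for CustomTaskMetric and SQLMetric; not the Spark classes.
    final case class StubTaskMetric(name: String, value: Long)

    final class StubSQLMetric {
      private var current = 0L
      def set(v: Long): Unit = current = v
      def get: Long = current
    }

    object UpdateMetricsSketch {
      // Same shape as CustomMetrics.updateMetrics in the diff above.
      def updateMetrics(
          currentMetricsValues: Seq[StubTaskMetric],
          customMetrics: Map[String, StubSQLMetric]): Unit = {
        currentMetricsValues.foreach { m => customMetrics(m.name).set(m.value) }
      }

      def main(args: Array[String]): Unit = {
        val metrics = Map("bytesRead" -> new StubSQLMetric, "rowsRead" -> new StubSQLMetric)
        updateMetrics(
          Seq(StubTaskMetric("bytesRead", 2048L), StubTaskMetric("rowsRead", 100L)), metrics)
        // prints: bytesRead=2048, rowsRead=100 (map ordering may vary)
        println(metrics.map { case (k, v) => s"$k=${v.get}" }.mkString(", "))
      }
    }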

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala

Lines changed: 1 addition & 6 deletions
@@ -96,12 +96,7 @@ class ContinuousDataSourceRDD(

   override def getNext(): InternalRow = {
     if (numRow % CustomMetrics.numRowsPerUpdate == 0) {
-      partitionReader.currentMetricsValues.foreach { metric =>
-        assert(customMetrics.contains(metric.name()),
-          s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " +
-            s"${metric.name()}")
-        customMetrics(metric.name()).set(metric.value())
-      }
+      CustomMetrics.updateMetrics(partitionReader.currentMetricsValues, customMetrics)
     }
     numRow += 1
     readerForPartition.next() match {
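
One behavioral note that falls out of the refactor at all three call sites: the removed inline loops asserted that each reported metric name existed in customMetrics, while updateMetrics applies the map directly, so an unknown metric name now surfaces as the map's standard NoSuchElementException rather than an AssertionError. A plain-Scala illustration of that failure mode, with a hypothetical metrics map in place of the real SQLMetric map:

    object MissingMetricSketch {
      def main(args: Array[String]): Unit = {
        val customMetrics = Map("bytesRead" -> 0L)  // hypothetical metrics map
        try {
          customMetrics("unknownMetric")  // direct apply, as in updateMetrics
        } catch {
          case e: NoSuchElementException =>
            println(e.getMessage)  // prints: key not found: unknownMetric
        }
      }
    }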
