Commit 7d5c2cd

Move custom metric classes to proper package.

1 parent 1f150b9 commit 7d5c2cd

13 files changed, +114 -66 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
1 addition, 1 deletion

@@ -22,7 +22,7 @@ import java.{util => ju}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.connector.CustomTaskMetric
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
1 addition, 2 deletions

@@ -30,13 +30,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.CustomMetric
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.metric.CustomSumMetric
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
 import org.apache.spark.sql.sources._

sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java (new file)
42 additions, 0 deletions

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.metric;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+import java.text.DecimalFormat;
+
+/**
+ * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
+ * class and override `name` and `description` to create your custom metric for real usage.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public abstract class CustomAvgMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    if (taskMetrics.length > 0) {
+      double average = ((double)Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
+      return new DecimalFormat("#0.000").format(average);
+    } else {
+      return "0";
+    }
+  }
+}
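
For context, a connector extends this class and supplies only the metric's name and description; the average-based aggregation is inherited. A minimal Scala sketch (the class and metric names below are hypothetical, not part of this commit):

import org.apache.spark.sql.connector.metric.CustomAvgMetric

// Hypothetical metric: averages per-task record counts. Only name() and
// description() need overriding; the averaging comes from CustomAvgMetric.
class AvgRecordsReadMetric extends CustomAvgMetric {
  override def name(): String = "avgRecordsRead"
  override def description(): String = "average number of records read per task"
}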

sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java renamed to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java

17 additions, 17 deletions

@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;

 import org.apache.spark.annotation.Evolving;

@@ -29,23 +29,23 @@
  */
 @Evolving
 public interface CustomMetric {
-    /**
-     * Returns the name of custom metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom metric.
+   */
+  String name();

-    /**
-     * Returns the description of custom metric.
-     */
-    String description();
+  /**
+   * Returns the description of custom metric.
+   */
+  String description();

-    /**
-     * The initial value of this metric.
-     */
-    long initialValue = 0L;
+  /**
+   * The initial value of this metric.
+   */
+  long initialValue = 0L;

-    /**
-     * Given an array of task metric values, returns aggregated final metric value.
-     */
-    String aggregateTaskMetrics(long[] taskMetrics);
+  /**
+   * Given an array of task metric values, returns aggregated final metric value.
+   */
+  String aggregateTaskMetrics(long[] taskMetrics);
 }
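
Connectors that need an aggregation other than the built-in sum or average can implement the relocated interface directly. A sketch under that assumption (all names hypothetical):

import org.apache.spark.sql.connector.metric.CustomMetric

// Hypothetical metric with a max-based aggregation, for cases where the
// built-in sum/average helpers do not fit.
class MaxFetchWaitTimeMetric extends CustomMetric {
  override def name(): String = "maxFetchWaitTime"
  override def description(): String = "maximum fetch wait time across tasks (ms)"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    if (taskMetrics.isEmpty) "0" else taskMetrics.max.toString
}
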
sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java (new file)
36 additions, 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.metric;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+
+/**
+ * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
+ * and override `name` and `description` to create your custom metric for real usage.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public abstract class CustomSumMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return String.valueOf(Arrays.stream(taskMetrics).sum());
+  }
+}
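
KafkaSourceProvider above already imports this class from its new location. A connector-side subclass only has to name the metric; a minimal sketch with hypothetical names:

import org.apache.spark.sql.connector.metric.CustomSumMetric

// Hypothetical subclass: per-task values are summed on the driver and
// the total is what the SQL UI displays.
class BytesReadMetric extends CustomSumMetric {
  override def name(): String = "bytesRead"
  override def description(): String = "total bytes read"
}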

sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java renamed to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java

9 additions, 9 deletions

@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;

 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.read.PartitionReader;

@@ -34,13 +34,13 @@
  */
 @Evolving
 public interface CustomTaskMetric {
-    /**
-     * Returns the name of custom task metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom task metric.
+   */
+  String name();

-    /**
-     * Returns the long value of custom task metric.
-     */
-    long value();
+  /**
+   * Returns the long value of custom task metric.
+   */
+  long value();
 }
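
CustomTaskMetric is the task-side carrier of a metric value; its name() must match a driver-side CustomMetric so Spark knows how to aggregate it. A hypothetical sketch, paired with the BytesReadMetric sketched earlier:

import org.apache.spark.sql.connector.metric.CustomTaskMetric

// Hypothetical task-side value; the name lines up with the driver-side
// BytesReadMetric so its values get summed.
class BytesReadTaskMetric(bytes: Long) extends CustomTaskMetric {
  override def name(): String = "bytesRead"
  override def value(): Long = bytes
}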

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java

1 addition, 1 deletion

@@ -21,7 +21,7 @@
 import java.io.IOException;

 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomTaskMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;

 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
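
PartitionReader is where task metrics surface: readers can override its currentMetricsValues() default method to report running values. A minimal sketch, assuming the hypothetical BytesReadTaskMetric above, with the read path stubbed out:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.PartitionReader

// Skeleton reader: tracks a byte count (never updated in this stub) and
// exposes it through currentMetricsValues().
class StubPartitionReader extends PartitionReader[InternalRow] {
  private var bytesRead = 0L
  override def next(): Boolean = false  // stub: no data
  override def get(): InternalRow = throw new NoSuchElementException("empty reader")
  override def close(): Unit = ()
  override def currentMetricsValues(): Array[CustomTaskMetric] =
    Array(new BytesReadTaskMetric(bytesRead))
}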

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java

1 addition, 1 deletion

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.read;

 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
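
On the driver side, a Scan advertises which custom metrics it supports via its supportedCustomMetrics() default method, and Spark matches reported task metric names against that list. A sketch reusing the hypothetical classes from the snippets above:

import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

// Skeleton scan declaring which custom metrics its readers may report.
class StubScan(schema: StructType) extends Scan {
  override def readSchema(): StructType = schema
  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new BytesReadMetric, new AvgRecordsReadMetric)
}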

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala

1 addition, 32 deletions

@@ -17,10 +17,7 @@

 package org.apache.spark.sql.execution.metric

-import java.text.NumberFormat
-import java.util.Locale
-
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric

 object CustomMetrics {
   private[spark] val V2_CUSTOM = "v2Custom"

@@ -45,31 +42,3 @@ object CustomMetrics
     }
   }
 }
-
-/**
- * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
- * and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomSumMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    taskMetrics.sum.toString
-  }
-}
-
-/**
- * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
- * class and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomAvgMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val average = if (taskMetrics.isEmpty) {
-      0.0
-    } else {
-      taskMetrics.sum.toDouble / taskMetrics.length
-    }
-    val numberFormat = NumberFormat.getNumberInstance(Locale.US)
-    numberFormat.format(average)
-  }
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

1 addition, 1 deletion

@@ -24,7 +24,7 @@ import scala.concurrent.duration._

 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}