
Commit 3d4fc7e

Merge remote-tracking branch 'origin/master' into pr19788_server
2 parents: 401bddb + 6d9c54b

File tree

113 files changed: +611 -749 lines


core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -104,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
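The change above is the pattern repeated across the Java files in this commit: string-keyed lookups such as conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") are replaced with typed ConfigEntry constants from org.apache.spark.internal.config. Read from Java, those entries come back as a boxed java.lang.Long typed as Object, which is why the new code unboxes with (long) before narrowing with (int). A minimal, Spark-free sketch of that double cast (BoxedCastDemo and getShuffleFileBufferKib are hypothetical stand-ins, not Spark APIs):

```java
public class BoxedCastDemo {
  // Hypothetical stand-in for reading a ConfigEntry from Java: the getter is typed
  // as Object, and the underlying value is a boxed java.lang.Long.
  static Object getShuffleFileBufferKib() {
    return Long.valueOf(32L); // assume a 32 KiB setting
  }

  public static void main(String[] args) {
    Object raw = getShuffleFileBufferKib();
    // A plain (int) cast on the Object would throw at runtime; instead cast
    // Object -> long (which unboxes the Long), then narrow long -> int.
    // This is the (int) (long) pattern used throughout this commit.
    int fileBufferSize = (int) (long) raw * 1024;
    System.out.println(fileBufferSize); // prints 32768
  }
}
```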

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
-      this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
+      this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
       (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 5 additions & 5 deletions
@@ -37,6 +37,7 @@
 
 import org.apache.spark.*;
 import org.apache.spark.annotation.Private;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
 import org.apache.spark.io.NioBufferedFileInputStream;
@@ -55,7 +56,6 @@
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
-import org.apache.spark.internal.config.package$;
 
 @Private
 public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -143,8 +143,8 @@ public UnsafeShuffleWriter(
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
-    this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
-      DEFAULT_INITIAL_SORT_BUFFER_SIZE);
+    this.initialSortBufferSize =
+      (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
     this.inputBufferSizeInBytes =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.outputBufferSizeInBytes =
@@ -282,10 +282,10 @@ void forceSorterToSpill() throws IOException {
    * @return the partition lengths in the merged file.
    */
   private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
-    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
+    final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS());
     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
     final boolean fastMergeEnabled =
-      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
+      (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE());
     final boolean fastMergeIsSupported = !compressionEnabled ||
       CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
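The mergeSpills hunk also keeps the gate on Spark's fast spill-merge path visible in context: it can only be taken when compression is disabled or the configured codec supports concatenating serialized streams. A standalone restatement of that predicate, with placeholder codec logic instead of the real CompressionCodec API:

```java
public class FastMergeDecisionDemo {
  // Placeholder for CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(codec);
  // in Spark this is a property of the configured codec, hard-coded here for illustration only.
  static boolean codecSupportsConcatenation(String codecName) {
    return "lz4".equals(codecName) || "snappy".equals(codecName);
  }

  static boolean canUseFastMerge(boolean fastMergeEnabled,
                                 boolean compressionEnabled,
                                 String codecName) {
    // Mirrors the diff: fast merge is supported when compression is disabled,
    // or when the codec can simply concatenate compressed partition streams.
    boolean fastMergeIsSupported =
        !compressionEnabled || codecSupportsConcatenation(codecName);
    return fastMergeEnabled && fastMergeIsSupported;
  }

  public static void main(String[] args) {
    System.out.println(canUseFastMerge(true, true, "lz4"));    // true
    System.out.println(canUseFastMerge(true, true, "other"));  // false
    System.out.println(canUseFastMerge(true, false, "other")); // true: no compression involved
  }
}
```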

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 1 addition & 1 deletion
@@ -854,7 +854,7 @@ public long getPeakMemoryUsedBytes() {
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAverageProbesPerLookup() {
+  public double getAvgHashProbeBucketListIterations() {
     return (1.0 * numProbes) / numKeyLookups;
   }
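The method body itself is untouched by the rename; the 1.0 * factor is what keeps the division in floating point, since numProbes and numKeyLookups are integral counters and dividing them directly would truncate. A quick illustration with made-up counter values:

```java
public class AverageProbesDemo {
  public static void main(String[] args) {
    long numProbes = 7;      // hypothetical counter values
    long numKeyLookups = 4;

    double truncated = numProbes / numKeyLookups;         // integer division happens first
    double promoted  = (1.0 * numProbes) / numKeyLookups; // 1.0 * promotes to double

    System.out.println(truncated); // 1.0
    System.out.println(promoted);  // 1.75
  }
}
```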

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java

Lines changed: 14 additions & 21 deletions
@@ -21,13 +21,13 @@
 import com.google.common.io.Closeables;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.config.ConfigEntry;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.*;
 
@@ -36,9 +36,7 @@
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
-  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
-  private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
-  private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
+  public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
 
   private InputStream in;
   private DataInputStream din;
@@ -59,28 +57,23 @@ public UnsafeSorterSpillReader(
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    long bufferSizeBytes =
-      SparkEnv.get() == null ?
-        DEFAULT_BUFFER_SIZE_BYTES:
-        SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
-          DEFAULT_BUFFER_SIZE_BYTES);
-    if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
-      // fall back to a sane default value
-      logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
-        "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
-        DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
-      bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
-    }
+    final ConfigEntry<Object> bufferSizeConfigEntry =
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+      ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+      ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
 
-    final boolean readAheadEnabled = SparkEnv.get() != null &&
-      SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
 
     final InputStream bs =
-      new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+      new NioBufferedFileInputStream(file, bufferSizeBytes);
     try {
       if (readAheadEnabled) {
         this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-          (int) bufferSizeBytes);
+          bufferSizeBytes);
       } else {
         this.in = serializerManager.wrapStream(blockId, bs);
       }
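The net effect of this rewrite is that the default for the spill reader's buffer now lives on the config declaration rather than in the constructor: the hand-rolled clamp-and-warn block is removed, and DEFAULT_BUFFER_SIZE_BYTES is derived from the entry's own default value, with the SparkEnv null check kept as the fallback path. A hedged sketch of that "default comes from the entry" pattern, using a hypothetical Entry class in place of Spark's ConfigEntry and a stubbed lookup in place of SparkEnv.get().conf().get(entry):

```java
import java.util.Optional;

public class SpillReaderBufferDemo {
  // Hypothetical stand-in for Spark's ConfigEntry: carries a key and a default value.
  static final class Entry {
    final String key;
    final Optional<Long> defaultValue;
    Entry(String key, long def) { this.key = key; this.defaultValue = Optional.of(def); }
  }

  // Assumed analogue of UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE with a 1 MiB default.
  static final Entry BUFFER_SIZE =
      new Entry("spark.unsafe.sorter.spill.reader.buffer.size", 1L << 20);

  // Assumed analogue of SparkEnv.get().conf().get(entry); null means "no running env".
  static Long lookup(Entry entry) { return null; }

  public static void main(String[] args) {
    // The default now lives on the entry itself, so no separate constant is needed.
    int defaultBytes = BUFFER_SIZE.defaultValue.get().intValue();
    Long configured = lookup(BUFFER_SIZE);
    int bufferSizeBytes = configured == null ? defaultBytes : configured.intValue();
    System.out.println(bufferSizeBytes); // 1048576 when no configured value is present
  }
}
```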

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 1 addition & 2 deletions
@@ -114,7 +114,6 @@ $(document).ready(function () {
 
     var endPoint = createRESTEndPointForExecutorsPage(appId);
     $.getJSON(endPoint, function (response, status, jqXHR) {
-        var summary = [];
         var allExecCnt = 0;
         var allRDDBlocks = 0;
         var allMemoryUsed = 0;
@@ -505,7 +504,7 @@ $(document).ready(function () {
             {data: 'allTotalTasks'},
             {
                 data: function (row, type) {
-                    return type === 'display' ? (formatDuration(row.allTotalDuration, type) + ' (' + formatDuration(row.allTotalGCTime, type) + ')') : row.allTotalDuration
+                    return type === 'display' ? (formatDuration(row.allTotalDuration) + ' (' + formatDuration(row.allTotalGCTime) + ')') : row.allTotalDuration
                 },
                 "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
                     if (oData.allTotalDuration > 0) {

core/src/main/resources/org/apache/spark/ui/static/historypage.js

Lines changed: 8 additions & 8 deletions
@@ -103,20 +103,20 @@ $(document).ready(function() {
     pageLength: 20
   });
 
-  historySummary = $("#history-summary");
-  searchString = historySummary["context"]["location"]["search"];
-  requestedIncomplete = getParameterByName("showIncomplete", searchString);
+  var historySummary = $("#history-summary");
+  var searchString = historySummary["context"]["location"]["search"];
+  var requestedIncomplete = getParameterByName("showIncomplete", searchString);
   requestedIncomplete = (requestedIncomplete == "true" ? true : false);
 
-  appParams = {
+  var appParams = {
     limit: appLimit,
     status: (requestedIncomplete ? "running" : "completed")
   };
 
   $.getJSON(uiRoot + "/api/v1/applications", appParams, function(response,status,jqXHR) {
     var array = [];
     var hasMultipleAttempts = false;
-    for (i in response) {
+    for (var i in response) {
       var app = response[i];
       if (app["attempts"][0]["completed"] == requestedIncomplete) {
         continue; // if we want to show for Incomplete, we skip the completed apps; otherwise skip incomplete ones.
@@ -127,7 +127,7 @@ $(document).ready(function() {
         hasMultipleAttempts = true;
       }
       var num = app["attempts"].length;
-      for (j in app["attempts"]) {
+      for (var j in app["attempts"]) {
        var attempt = app["attempts"][j];
        attempt["startTime"] = formatTimeMillis(attempt["startTimeEpoch"]);
        attempt["endTime"] = formatTimeMillis(attempt["endTimeEpoch"]);
@@ -149,15 +149,15 @@
       "applications": array,
       "hasMultipleAttempts": hasMultipleAttempts,
       "showCompletedColumns": !requestedIncomplete,
-    }
+    };
 
     $.get(uiRoot + "/static/historypage-template.html", function(template) {
       var sibling = historySummary.prev();
       historySummary.detach();
       var apps = $(Mustache.render($(template).filter("#history-summary-template").html(),data));
       var attemptIdColumnName = 'attemptId';
       var startedColumnName = 'started';
-      var defaultSortColumn = completedColumnName = 'completed';
+      var completedColumnName = 'completed';
       var durationColumnName = 'duration';
       var conf = {
         "columns": [

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 4 additions & 4 deletions
@@ -220,7 +220,7 @@ function renderDagVizForJob(svgContainer) {
     } else {
       // Link each graph to the corresponding stage page (TODO: handle stage attempts)
       // Use the link from the stage table so it also works for the history server
-      var attemptId = 0
+      var attemptId = 0;
       var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
         .select("a.name-link")
         .attr("href");
@@ -236,7 +236,7 @@ function renderDagVizForJob(svgContainer) {
     // existing ones, taking into account the position and width of the last stage's
     // container. We do not need to do this for the first stage of this job.
     if (i > 0) {
-      var existingStages = svgContainer.selectAll("g.cluster.stage")
+      var existingStages = svgContainer.selectAll("g.cluster.stage");
       if (!existingStages.empty()) {
         var lastStage = d3.select(existingStages[0].pop());
         var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -369,8 +369,8 @@ function resizeSvg(svg) {
  * here this function is to enable line break.
  */
 function interpretLineBreak(svg) {
-  var allTSpan = svg.selectAll("tspan").each(function() {
-    node = d3.select(this);
+  svg.selectAll("tspan").each(function() {
+    var node = d3.select(this);
     var original = node[0][0].innerHTML;
     if (original.indexOf("\\n") != -1) {
       var arr = original.split("\\n");

core/src/main/resources/org/apache/spark/ui/static/stagepage.js

Lines changed: 2 additions & 3 deletions
@@ -263,7 +263,6 @@ function reselectCheckboxesBasedOnTaskTableState() {
 
 function getStageAttemptId() {
   var words = document.baseURI.split('?');
-  var attemptIdStr = words[1].split('&')[1];
   var digitsRegex = /[0-9]+/;
   // We are using regex here to extract the stage attempt id as there might be certain url's with format
   // like /proxy/application_1539986433979_27115/stages/stage/?id=0&attempt=0#tasksTitle
@@ -433,7 +432,7 @@ $(document).ready(function () {
       "oLanguage": {
         "sEmptyTable": "No data to show yet"
       }
-    }
+    };
     var executorSummaryTableSelector =
       $("#summary-executor-table").DataTable(executorSummaryConf);
     $('#parent-container [data-toggle="tooltip"]').tooltip();
@@ -612,7 +611,7 @@ $(document).ready(function () {
       "searching": false,
       "order": [[0, "asc"]],
       "bAutoWidth": false
-    }
+    };
     $("#accumulator-table").DataTable(accumulatorConf);
 
     // building tasks table that uses server side functionality

core/src/main/resources/org/apache/spark/ui/static/table.js

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ function onSearchStringChange() {
     if($(this).attr('id') && $(this).attr('id').match(/thread_[0-9]+_tr/) ) {
       var children = $(this).children()
       var found = false
-      for (i = 0; i < children.length; i++) {
+      for (var i = 0; i < children.length; i++) {
         if (children.eq(i).text().toLowerCase().indexOf(searchString) >= 0) {
           found = true
         }
