Commit 68ee277

Merge remote-tracking branch 'upstream/master' into unresolved_table_or_view

2 parents: b97c925 + fdd6c73
File tree: 52 files changed (+374, −134 lines)


common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java

Lines changed: 22 additions & 0 deletions
@@ -147,6 +147,8 @@ public void onFailure(Throwable t) {
    * @param blockIds block ids to be pushed
    * @param buffers buffers to be pushed
    * @param listener the listener to receive block push status.
+   *
+   * @since 3.1.0
    */
   public void pushBlocks(
       String host,
@@ -156,4 +158,24 @@ public void pushBlocks(
       BlockFetchingListener listener) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
+   * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+   * finishing the shuffle merge process associated with the shuffle mapper stage.
+   *
+   * @param host host of shuffle server
+   * @param port port of shuffle server.
+   * @param shuffleId shuffle ID of the shuffle to be finalized
+   * @param listener the listener to receive MergeStatuses
+   *
+   * @since 3.1.0
+   */
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    throw new UnsupportedOperationException();
+  }
 }
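
Since the javadoc above describes the control flow only in prose, here is a minimal sketch (Scala, not part of this commit) of how a driver-side caller might use the new API. The helper name and the latch-based waiting are illustrative assumptions; only finalizeShuffleMerge and MergeFinalizerListener come from this commit.

import java.util.concurrent.CountDownLatch

import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
import org.apache.spark.network.shuffle.protocol.MergeStatuses

// Hypothetical driver-side helper: ask one shuffle service to finalize the
// merge for `shuffleId` and wait for its answer before scheduling reducers.
def awaitShuffleMergeFinalization(
    client: BlockStoreClient,
    host: String,
    port: Int,
    shuffleId: Int): Unit = {
  val done = new CountDownLatch(1)
  client.finalizeShuffleMerge(host, port, shuffleId, new MergeFinalizerListener {
    override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
      // A real driver would record the MergeStatuses for the reducer stage here.
      done.countDown()
    }
    override def onShuffleMergeFailure(e: Throwable): Unit = {
      // A real driver would likely fall back to the original, un-merged blocks.
      done.countDown()
    }
  })
  done.await()
}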

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 29 additions & 0 deletions
@@ -158,6 +158,35 @@ public void pushBlocks(
     }
   }

+  @Override
+  public void finalizeShuffleMerge(
+      String host,
+      int port,
+      int shuffleId,
+      MergeFinalizerListener listener) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          listener.onShuffleMergeSuccess(
+            (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          listener.onShuffleMergeFailure(e);
+        }
+      });
+    } catch (Exception e) {
+      logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
+        host, port, e);
+      listener.onShuffleMergeFailure(e);
+    }
+  }
+
   @Override
   public MetricSet shuffleMetrics() {
     checkInit();
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing a callback function to invoke when driver receives the response for the
+ * finalize shuffle merge request sent to remote shuffle service.
+ *
+ * @since 3.1.0
+ */
+public interface MergeFinalizerListener extends EventListener {
+  /**
+   * Called once upon successful response on finalize shuffle merge on a remote shuffle service.
+   * The returned {@link MergeStatuses} is passed to the listener for further processing.
+   */
+  void onShuffleMergeSuccess(MergeStatuses statuses);
+
+  /**
+   * Called once upon failure response on finalize shuffle merge on a remote shuffle service.
+   */
+  void onShuffleMergeFailure(Throwable e);
+}
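
Because the interface is callback-based, a caller that prefers Scala Futures would typically bridge it through a Promise. A hedged sketch; this adapter is an assumption for illustration, not part of the commit:

import scala.concurrent.{Future, Promise}

import org.apache.spark.network.shuffle.MergeFinalizerListener
import org.apache.spark.network.shuffle.protocol.MergeStatuses

// Hypothetical adapter: turn the callback-style listener into a Future.
// `register` is whatever triggers the request, e.g.
//   l => client.finalizeShuffleMerge(host, port, shuffleId, l)
def mergeStatusesFuture(register: MergeFinalizerListener => Unit): Future[MergeStatuses] = {
  val promise = Promise[MergeStatuses]()
  register(new MergeFinalizerListener {
    override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit =
      // trySuccess/tryFailure keep the adapter safe even if a callback fires twice.
      promise.trySuccess(statuses)
    override def onShuffleMergeFailure(e: Throwable): Unit =
      promise.tryFailure(e)
  })
  promise.future
}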

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@ private[spark] class StorageStatus(
         .getOrElse((0L, 0L))
       case _ if !level.useOffHeap =>
         (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
-      case _ if level.useOffHeap =>
+      case _ =>
         (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
     }
     val newMem = math.max(oldMem + changeInMem, 0L)
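
The reasoning behind this one-line change: the dropped guard is the exact complement of the guard on the preceding case, so the branch behaves identically, but scalac cannot prove that two guards jointly cover all inputs and treats a match ending in a guarded case as potentially non-exhaustive. A toy illustration (not Spark code):

sealed trait Heap
case object OnHeap extends Heap
case object OffHeap extends Heap

// Ending in a guarded case: scalac warns "match may not be exhaustive",
// and a MatchError is possible at runtime if the guard evaluates to false.
def guarded(h: Heap, useOffHeap: Boolean): String = h match {
  case OnHeap => "on-heap"
  case _ if useOffHeap => "off-heap"
}

// Ending in a bare wildcard, as StorageUtils now does: provably exhaustive.
def exhaustive(h: Heap): String = h match {
  case OnHeap => "on-heap"
  case _ => "off-heap"
}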

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 4 additions & 4 deletions
@@ -757,15 +757,15 @@ private[spark] object JsonProtocol {

   def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
     val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.map { case JField(k, v) =>
+    jsonFields.collect { case JField(k, v) =>
       val req = taskResourceRequestFromJson(v)
       (k, req)
     }.toMap
   }

   def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
     val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.map { case JField(k, v) =>
+    jsonFields.collect { case JField(k, v) =>
       val req = executorResourceRequestFromJson(v)
       (k, req)
     }.toMap
@@ -1229,7 +1229,7 @@ private[spark] object JsonProtocol {

   def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
     val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.map { case JField(k, v) =>
+    jsonFields.collect { case JField(k, v) =>
       val resourceInfo = ResourceInformation.parseJson(v)
       (k, resourceInfo)
     }.toMap
@@ -1241,7 +1241,7 @@ private[spark] object JsonProtocol {

   def mapFromJson(json: JValue): Map[String, String] = {
     val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
+    jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap
   }

   def propertiesFromJson(json: JValue): Properties = {
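
All four hunks make the same substitution, and the difference matters at runtime: map applies the pattern-matching closure to every element and throws scala.MatchError on any element the pattern does not cover, while collect takes a PartialFunction and silently skips non-matching elements. This appears to matter mainly for mapFromJson, where a value that is not a JString would previously crash deserialization. A toy illustration, independent of json4s:

val fields: List[(String, Any)] = List("a" -> "1", "b" -> 2)

// map: the closure is applied to every element, so the Int value
// raises scala.MatchError when the pattern fails.
// fields.map { case (k, v: String) => (k, v) }.toMap   // throws at "b" -> 2

// collect: isDefinedAt is consulted first, so non-matching elements are
// skipped. This is the behavior the commit switches JsonProtocol to.
val onlyStrings: Map[String, String] =
  fields.collect { case (k, v: String) => (k, v) }.toMap
// onlyStrings == Map("a" -> "1")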

docs/_config.yml

Lines changed: 9 additions & 4 deletions
@@ -26,15 +26,20 @@ SCALA_VERSION: "2.12.10"
 MESOS_VERSION: 1.0.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
-# Before a new release, we should apply a new `apiKey` for the new Spark documentation
-# on https://docsearch.algolia.com/. Otherwise, after release, the search results are always based
-# on the latest documentation(https://spark.apache.org/docs/latest/) even when visiting the
-# documentation of previous releases.
+# Before a new release, we should:
+# 1. update the `version` array for the new Spark documentation
+#    on https://github.com/algolia/docsearch-configs/blob/master/configs/apache_spark.json.
+# 2. update the value of `facetFilters.version` in `algoliaOptions` on the new release branch.
+# Otherwise, after release, the search results are always based on the latest documentation
+# (https://spark.apache.org/docs/latest/) even when visiting the documentation of previous releases.
 DOCSEARCH_SCRIPT: |
   docsearch({
     apiKey: 'b18ca3732c502995563043aa17bc6ecb',
     indexName: 'apache_spark',
     inputSelector: '#docsearch-input',
     enhancedSearchInput: true,
+    algoliaOptions: {
+      'facetFilters': ["version:latest"]
+    },
     debug: false // Set debug to true if you want to inspect the dropdown
   });

mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala

Lines changed: 2 additions & 0 deletions
@@ -302,6 +302,8 @@ private[spark] object BLAS extends Serializable {
           j += 1
           prevCol = col
         }
+      case _ =>
+        throw new IllegalArgumentException(s"spr doesn't support vector type ${v.getClass}.")
     }
   }
mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala

Lines changed: 5 additions & 1 deletion
@@ -286,6 +286,7 @@ private[ml] object RFormulaParser extends RegexParsers {

   private val pow: Parser[Term] = term ~ "^" ~ "^[1-9]\\d*".r ^^ {
     case base ~ "^" ~ degree => power(base, degree.toInt)
+    case t => throw new IllegalArgumentException(s"Invalid term: $t")
   } | term

   private val interaction: Parser[Term] = pow * (":" ^^^ { interact _ })
@@ -298,7 +299,10 @@ private[ml] object RFormulaParser extends RegexParsers {
   private val expr = (sum | term)

   private val formula: Parser[ParsedRFormula] =
-    (label ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) }
+    (label ~ "~" ~ expr) ^^ {
+      case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms)
+      case t => throw new IllegalArgumentException(s"Invalid term: $t")
+    }

   def parse(value: String): ParsedRFormula = parseAll(formula, value) match {
     case Success(result, _) => result

mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala

Lines changed: 2 additions & 0 deletions
@@ -314,6 +314,8 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
         case SparseVector(size, indices, values) =>
           val newValues = transformSparseWithScale(scale, indices, values.clone())
           Vectors.sparse(size, indices, newValues)
+        case v =>
+          throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
       }

     case (false, false) =>

mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala

Lines changed: 2 additions & 0 deletions
@@ -74,6 +74,8 @@ private[ml] object JsonMatrixConverter {
           ("values" -> values.toSeq) ~
           ("isTransposed" -> isTransposed)
         compact(render(jValue))
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
     }
   }
