Skip to content

Commit 8f40a0c

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into SPARK-27495
2 parents 954ba00 + cfb1706 commit 8f40a0c

File tree

408 files changed

+14759
-8387
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

408 files changed

+14759
-8387
lines changed

.github/workflows/master.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ jobs:
6666
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
6767
export MAVEN_CLI_OPTS="--no-transfer-progress"
6868
mkdir -p ~/.m2
69+
# `Maven Central` is too flaky in terms of downloading artifacts in `GitHub Action` environment.
70+
# `Google Maven Central Mirror` is too slow in terms of sycing upstream. To get the best combination,
71+
# 1) we set `Google Maven Central` as a mirror of `central` in `GitHub Action` environment only.
72+
# 2) we duplicates `Maven Central` in pom.xml with ID `central_without_mirror`.
73+
# In other words, in GitHub Action environment, `central` is mirrored by `Google Maven Central` first.
74+
# If `Google Maven Central` doesn't provide the artifact due to its slowness, `central_without_mirror` will be used.
75+
# Note that we aim to achieve the above while keeping the existing behavior of non-`GitHub Action` environment unchanged.
6976
echo "<settings><mirrors><mirror><id>google-maven-central</id><name>GCS Maven Central mirror</name><url>https://maven-central.storage-download.googleapis.com/repos/central/data/</url><mirrorOf>central</mirrorOf></mirror></mirrors></settings>" > ~/.m2/settings.xml
7077
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -P${{ matrix.hive }} -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} install
7178
rm -rf ~/.m2/repository/org/apache/spark

R/pkg/tests/fulltests/test_mllib_recommendation.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ test_that("spark.als", {
3131
stats <- summary(model)
3232
expect_equal(stats$rank, 10)
3333
test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item"))
34-
predictions <- collect(predict(model, test))
34+
result <- predict(model, test)
35+
predictions <- collect(arrange(result, desc(result$item), result$user))
3536

3637
expect_equal(predictions$prediction, c(0.6324540, 3.6218479, -0.4568263),
3738
tolerance = 1e-4)

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ and Structured Streaming for stream processing.
99

1010
<https://spark.apache.org/>
1111

12-
[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7)
12+
[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7-hive-2.3/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7-hive-2.3)
1313
[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
1414
[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage&url=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site&query=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan&colorB=brightgreen&style=plastic)](https://spark-test.github.io/pyspark-coverage-site)
1515

@@ -29,7 +29,6 @@ To build Spark and its example programs, run:
2929

3030
(You do not need to do this if you downloaded a pre-built package.)
3131

32-
You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).
3332
More detailed documentation is available from the project site, at
3433
["Building Spark"](https://spark.apache.org/docs/latest/building-spark.html).
3534

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,21 +117,35 @@ public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
117117

118118
@Override
119119
public void connectionTerminated(Channel channel) {
120+
RuntimeException failedToReleaseBufferException = null;
121+
120122
// Close all streams which have been associated with the channel.
121123
for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
122124
StreamState state = entry.getValue();
123125
if (state.associatedChannel == channel) {
124126
streams.remove(entry.getKey());
125127

126-
// Release all remaining buffers.
127-
while (state.buffers.hasNext()) {
128-
ManagedBuffer buffer = state.buffers.next();
129-
if (buffer != null) {
130-
buffer.release();
128+
try {
129+
// Release all remaining buffers.
130+
while (state.buffers.hasNext()) {
131+
ManagedBuffer buffer = state.buffers.next();
132+
if (buffer != null) {
133+
buffer.release();
134+
}
135+
}
136+
} catch (RuntimeException e) {
137+
if (failedToReleaseBufferException == null) {
138+
failedToReleaseBufferException = e;
139+
} else {
140+
logger.error("Exception trying to release remaining StreamState buffers", e);
131141
}
132142
}
133143
}
134144
}
145+
146+
if (failedToReleaseBufferException != null) {
147+
throw failedToReleaseBufferException;
148+
}
135149
}
136150

137151
@Override

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,12 @@ public int numConnectionsPerPeer() {
108108
return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
109109
}
110110

111-
/** Requested maximum length of the queue of incoming connections. Default is 64. */
112-
public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, 64); }
111+
/**
112+
* Requested maximum length of the queue of incoming connections. If &lt; 1,
113+
* the default Netty value of {@link io.netty.util.NetUtil#SOMAXCONN} will be used.
114+
* Default to -1.
115+
*/
116+
public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
113117

114118
/** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
115119
public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.network.server;
1919

2020
import java.util.ArrayList;
21+
import java.util.Iterator;
2122
import java.util.List;
2223

2324
import io.netty.channel.Channel;
@@ -96,4 +97,42 @@ public void managedBuffersAreFreedWhenConnectionIsClosed() {
9697
Mockito.verify(buffer2, Mockito.times(1)).release();
9798
Assert.assertEquals(0, manager.numStreamStates());
9899
}
100+
101+
@Test
102+
public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() {
103+
OneForOneStreamManager manager = new OneForOneStreamManager();
104+
105+
Iterator<ManagedBuffer> buffers = Mockito.mock(Iterator.class);
106+
Mockito.when(buffers.hasNext()).thenReturn(true);
107+
Mockito.when(buffers.next()).thenThrow(RuntimeException.class);
108+
109+
ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
110+
111+
Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
112+
Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true);
113+
Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class);
114+
115+
Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
116+
manager.registerStream("appId", buffers, dummyChannel);
117+
manager.registerStream("appId", buffers2, dummyChannel);
118+
119+
Assert.assertEquals(2, manager.numStreamStates());
120+
121+
try {
122+
manager.connectionTerminated(dummyChannel);
123+
Assert.fail("connectionTerminated should throw exception when fails to release all buffers");
124+
125+
} catch (RuntimeException e) {
126+
127+
Mockito.verify(buffers, Mockito.times(1)).hasNext();
128+
Mockito.verify(buffers, Mockito.times(1)).next();
129+
130+
Mockito.verify(buffers2, Mockito.times(2)).hasNext();
131+
Mockito.verify(buffers2, Mockito.times(2)).next();
132+
133+
Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
134+
135+
Assert.assertEquals(0, manager.numStreamStates());
136+
}
137+
}
99138
}

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.unsafe.types;
1919

20+
import org.apache.spark.annotation.Unstable;
21+
2022
import java.io.Serializable;
2123
import java.math.BigDecimal;
2224
import java.time.Duration;
@@ -27,9 +29,22 @@
2729
import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
2830

2931
/**
30-
* The internal representation of interval type.
32+
* The class representing calendar intervals. The calendar interval is stored internally in
33+
* three components:
34+
* <ul>
35+
* <li>an integer value representing the number of `months` in this interval,</li>
36+
* <li>an integer value representing the number of `days` in this interval,</li>
37+
* <li>a long value representing the number of `microseconds` in this interval.</li>
38+
* </ul>
39+
*
40+
* The `months` and `days` are not units of time with a constant length (unlike hours, seconds), so
41+
* they are two separated fields from microseconds. One month may be equal to 28, 29, 30 or 31 days
42+
* and one day may be equal to 23, 24 or 25 hours (daylight saving).
43+
*
44+
* @since 3.0.0
3145
*/
32-
public final class CalendarInterval implements Serializable, Comparable<CalendarInterval> {
46+
@Unstable
47+
public final class CalendarInterval implements Serializable {
3348
public final int months;
3449
public final int days;
3550
public final long microseconds;
@@ -59,29 +74,6 @@ public int hashCode() {
5974
return Objects.hash(months, days, microseconds);
6075
}
6176

62-
@Override
63-
public int compareTo(CalendarInterval that) {
64-
long thisAdjustDays =
65-
this.microseconds / MICROS_PER_DAY + this.days + this.months * DAYS_PER_MONTH;
66-
long thatAdjustDays =
67-
that.microseconds / MICROS_PER_DAY + that.days + that.months * DAYS_PER_MONTH;
68-
long daysDiff = thisAdjustDays - thatAdjustDays;
69-
if (daysDiff == 0) {
70-
long msDiff = (this.microseconds % MICROS_PER_DAY) - (that.microseconds % MICROS_PER_DAY);
71-
if (msDiff == 0) {
72-
return 0;
73-
} else if (msDiff > 0) {
74-
return 1;
75-
} else {
76-
return -1;
77-
}
78-
} else if (daysDiff > 0){
79-
return 1;
80-
} else {
81-
return -1;
82-
}
83-
}
84-
8577
@Override
8678
public String toString() {
8779
if (months == 0 && days == 0 && microseconds == 0) {
@@ -133,16 +125,4 @@ private void appendUnit(StringBuilder sb, long value, String unit) {
133125
* @throws ArithmeticException if a numeric overflow occurs
134126
*/
135127
public Duration extractAsDuration() { return Duration.of(microseconds, ChronoUnit.MICROS); }
136-
137-
/**
138-
* A constant holding the minimum value an {@code CalendarInterval} can have.
139-
*/
140-
public static CalendarInterval MIN_VALUE =
141-
new CalendarInterval(Integer.MIN_VALUE, Integer.MIN_VALUE, Long.MIN_VALUE);
142-
143-
/**
144-
* A constant holding the maximum value an {@code CalendarInterval} can have.
145-
*/
146-
public static CalendarInterval MAX_VALUE =
147-
new CalendarInterval(Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
148128
}

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,52 @@ public boolean toByte(IntWrapper intWrapper) {
12941294
return false;
12951295
}
12961296

1297+
/**
1298+
* Parses UTF8String(trimmed if needed) to long. This method is used when ANSI is enabled.
1299+
*
1300+
* @return If string contains valid numeric value then it returns the long value otherwise a
1301+
* NumberFormatException is thrown.
1302+
*/
1303+
public long toLongExact() {
1304+
LongWrapper result = new LongWrapper();
1305+
if (toLong(result)) {
1306+
return result.value;
1307+
}
1308+
throw new NumberFormatException("invalid input syntax for type numeric: " + this);
1309+
}
1310+
1311+
/**
1312+
* Parses UTF8String(trimmed if needed) to int. This method is used when ANSI is enabled.
1313+
*
1314+
* @return If string contains valid numeric value then it returns the int value otherwise a
1315+
* NumberFormatException is thrown.
1316+
*/
1317+
public int toIntExact() {
1318+
IntWrapper result = new IntWrapper();
1319+
if (toInt(result)) {
1320+
return result.value;
1321+
}
1322+
throw new NumberFormatException("invalid input syntax for type numeric: " + this);
1323+
}
1324+
1325+
public short toShortExact() {
1326+
int value = this.toIntExact();
1327+
short result = (short) value;
1328+
if (result == value) {
1329+
return result;
1330+
}
1331+
throw new NumberFormatException("invalid input syntax for type numeric: " + this);
1332+
}
1333+
1334+
public byte toByteExact() {
1335+
int value = this.toIntExact();
1336+
byte result = (byte) value;
1337+
if (result == value) {
1338+
return result;
1339+
}
1340+
throw new NumberFormatException("invalid input syntax for type numeric: " + this);
1341+
}
1342+
12971343
@Override
12981344
public String toString() {
12991345
return new String(getBytes(), StandardCharsets.UTF_8);

common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,36 @@ public void equalsTest() {
4646
assertEquals(i1, i6);
4747
}
4848

49+
@Test
50+
public void toStringTest() {
51+
CalendarInterval i;
52+
53+
i = new CalendarInterval(0, 0, 0);
54+
assertEquals("0 seconds", i.toString());
55+
56+
i = new CalendarInterval(34, 0, 0);
57+
assertEquals("2 years 10 months", i.toString());
58+
59+
i = new CalendarInterval(-34, 0, 0);
60+
assertEquals("-2 years -10 months", i.toString());
61+
62+
i = new CalendarInterval(0, 31, 0);
63+
assertEquals("31 days", i.toString());
64+
65+
i = new CalendarInterval(0, -31, 0);
66+
assertEquals("-31 days", i.toString());
67+
68+
i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
69+
assertEquals("3 hours 13 minutes 0.000123 seconds", i.toString());
70+
71+
i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
72+
assertEquals("-3 hours -13 minutes -0.000123 seconds", i.toString());
73+
74+
i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
75+
assertEquals("2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds",
76+
i.toString());
77+
}
78+
4979
@Test
5080
public void periodAndDurationTest() {
5181
CalendarInterval interval = new CalendarInterval(120, -40, 123456);

0 commit comments

Comments
 (0)