
Commit bdda06d

Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4
2 parents bfa06ce + a6a2748

165 files changed: 3758 additions, 1084 deletions


.github/workflows/master.yml

Lines changed: 14 additions & 0 deletions
@@ -23,6 +23,19 @@ jobs:
     steps:
     - uses: actions/checkout@master
+    # We split caches because GitHub Action Cache has a 400MB-size limit.
+    - uses: actions/cache@v1
+      with:
+        path: ~/.m2/repository/com
+        key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-com-${{ hashFiles('**/pom.xml') }}
+        restore-keys: |
+          ${{ matrix.java }}-${{ matrix.hadoop }}-maven-com-
+    - uses: actions/cache@v1
+      with:
+        path: ~/.m2/repository/org
+        key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-org-${{ hashFiles('**/pom.xml') }}
+        restore-keys: |
+          ${{ matrix.java }}-${{ matrix.hadoop }}-maven-org-
     - name: Set up JDK ${{ matrix.java }}
       uses: actions/setup-java@v1
       with:
@@ -32,6 +45,7 @@ jobs:
         export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
         export MAVEN_CLI_OPTS="--no-transfer-progress"
         ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} install
+        rm -rf ~/.m2/repository/org/apache/spark


   lint:

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 24 additions & 19 deletions
@@ -29,7 +29,7 @@
 /**
  * The internal representation of interval type.
  */
-public final class CalendarInterval implements Serializable {
+public final class CalendarInterval implements Serializable, Comparable<CalendarInterval> {
   public final int months;
   public final int days;
   public final long microseconds;
@@ -40,24 +40,6 @@ public CalendarInterval(int months, int days, long microseconds) {
     this.microseconds = microseconds;
   }

-  public CalendarInterval add(CalendarInterval that) {
-    int months = this.months + that.months;
-    int days = this.days + that.days;
-    long microseconds = this.microseconds + that.microseconds;
-    return new CalendarInterval(months, days, microseconds);
-  }
-
-  public CalendarInterval subtract(CalendarInterval that) {
-    int months = this.months - that.months;
-    int days = this.days - that.days;
-    long microseconds = this.microseconds - that.microseconds;
-    return new CalendarInterval(months, days, microseconds);
-  }
-
-  public CalendarInterval negate() {
-    return new CalendarInterval(-this.months, -this.days, -this.microseconds);
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -73,6 +55,29 @@ public int hashCode() {
     return Objects.hash(months, days, microseconds);
   }

+  @Override
+  public int compareTo(CalendarInterval that) {
+    long thisAdjustDays =
+      this.microseconds / MICROS_PER_DAY + this.days + this.months * DAYS_PER_MONTH;
+    long thatAdjustDays =
+      that.microseconds / MICROS_PER_DAY + that.days + that.months * DAYS_PER_MONTH;
+    long daysDiff = thisAdjustDays - thatAdjustDays;
+    if (daysDiff == 0) {
+      long msDiff = (this.microseconds % MICROS_PER_DAY) - (that.microseconds % MICROS_PER_DAY);
+      if (msDiff == 0) {
+        return 0;
+      } else if (msDiff > 0) {
+        return 1;
+      } else {
+        return -1;
+      }
+    } else if (daysDiff > 0) {
+      return 1;
+    } else {
+      return -1;
+    }
+  }
+
   @Override
   public String toString() {
     if (months == 0 && days == 0 && microseconds == 0) {
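
Since CalendarInterval now implements Comparable, intervals can be ordered directly. Note that compareTo normalizes months and days into an approximate day count (months * DAYS_PER_MONTH plus whole days carried out of microseconds), so it defines a coarser relation than equals. A minimal sketch of the resulting semantics, assuming Spark's unsafe module is on the classpath (the demo object itself is hypothetical):

import org.apache.spark.unsafe.types.CalendarInterval

object CalendarIntervalOrderingDemo {
  def main(args: Array[String]): Unit = {
    val oneDay = new CalendarInterval(0, 1, 0)
    val oneSecond = new CalendarInterval(0, 0, 1000000L)
    // One day normalizes to a larger adjusted-day count than one second.
    assert(oneDay.compareTo(oneSecond) > 0)

    // Equal adjusted-day counts fall back to the sub-day microsecond remainder.
    val dayPlusMicro = new CalendarInterval(0, 1, 1L)
    assert(dayPlusMicro.compareTo(oneDay) > 0)
  }
}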

common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java

Lines changed: 0 additions & 22 deletions
@@ -76,28 +76,6 @@ public void toStringTest() {
       i.toString());
   }

-  @Test
-  public void addTest() {
-    CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
-    CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR), input1.add(input2));
-
-    input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
-    input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR), input1.add(input2));
-  }
-
-  @Test
-  public void subtractTest() {
-    CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
-    CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR), input1.subtract(input2));
-
-    input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
-    input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR), input1.subtract(input2));
-  }
-
   @Test
   public void periodAndDurationTest() {
     CalendarInterval interval = new CalendarInterval(120, -40, 123456);

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -573,7 +573,7 @@ class SparkContext(config: SparkConf) extends Logging {

     // The metrics system for Driver need to be set spark.app.id to app ID.
     // So it should start after we get app ID from the task scheduler and set spark.app.id.
-    _env.metricsSystem.start()
+    _env.metricsSystem.start(_conf.get(METRICS_STATIC_SOURCES_ENABLED))
     // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
     _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -383,7 +383,7 @@ object SparkEnv extends Logging {
     conf.set(EXECUTOR_ID, executorId)
     val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
       securityManager)
-    ms.start()
+    ms.start(conf.get(METRICS_STATIC_SOURCES_ENABLED))
     ms
   }

core/src/main/scala/org/apache/spark/api/python/Py4JServer.scala

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.net.InetAddress
+import java.util.Locale
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A wrapper for both GatewayServer and ClientServer to pin Python thread to JVM thread.
+ */
+private[spark] class Py4JServer(sparkConf: SparkConf) extends Logging {
+  private[spark] val secret: String = Utils.createSecret(sparkConf)
+
+  // Launch a Py4J gateway or client server for the process to connect to; this will let it see our
+  // Java system properties and such
+  private val localhost = InetAddress.getLoopbackAddress()
+  private[spark] val server = if (sys.env.getOrElse(
+      "PYSPARK_PIN_THREAD", "false").toLowerCase(Locale.ROOT) == "true") {
+    new py4j.ClientServer.ClientServerBuilder()
+      .authToken(secret)
+      .javaPort(0)
+      .javaAddress(localhost)
+      .build()
+  } else {
+    new py4j.GatewayServer.GatewayServerBuilder()
+      .authToken(secret)
+      .javaPort(0)
+      .javaAddress(localhost)
+      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
+      .build()
+  }
+
+  def start(): Unit = server match {
+    case clientServer: py4j.ClientServer => clientServer.startServer()
+    case gatewayServer: py4j.GatewayServer => gatewayServer.start()
+    case other => throw new RuntimeException(s"Unexpected Py4J server ${other.getClass}")
+  }
+
+  def getListeningPort: Int = server match {
+    case clientServer: py4j.ClientServer => clientServer.getJavaServer.getListeningPort
+    case gatewayServer: py4j.GatewayServer => gatewayServer.getListeningPort
+    case other => throw new RuntimeException(s"Unexpected Py4J server ${other.getClass}")
+  }
+
+  def shutdown(): Unit = server match {
+    case clientServer: py4j.ClientServer => clientServer.shutdown()
+    case gatewayServer: py4j.GatewayServer => gatewayServer.shutdown()
+    case other => throw new RuntimeException(s"Unexpected Py4J server ${other.getClass}")
+  }
+}
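
The wrapper's lifecycle mirrors the old raw GatewayServer calls: build, start, read the ephemeral port, shut down. A hypothetical driver sketch under those assumptions (placed in the same package because Py4JServer is private[spark]):

package org.apache.spark.api.python

import org.apache.spark.SparkConf

object Py4JServerDemo {
  def main(args: Array[String]): Unit = {
    val server = new Py4JServer(new SparkConf())
    server.start()                      // ClientServer if PYSPARK_PIN_THREAD=true, else GatewayServer
    val port = server.getListeningPort  // ephemeral JVM port for the Python side to connect to
    println(s"Py4J listening on port $port")
    // server.secret is the auth token that must be shared with the Python process.
    server.shutdown()
  }
}

PythonGatewayServer and PythonRunner below are refactored onto exactly this pattern.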

core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala

Lines changed: 5 additions & 19 deletions
@@ -18,42 +18,28 @@
 package org.apache.spark.api.python

 import java.io.{DataOutputStream, File, FileOutputStream}
-import java.net.InetAddress
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files

-import py4j.GatewayServer
-
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils

 /**
- * Process that starts a Py4J GatewayServer on an ephemeral port.
+ * Process that starts a Py4J server on an ephemeral port.
  *
  * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
  */
 private[spark] object PythonGatewayServer extends Logging {
   initializeLogIfNecessary(true)

   def main(args: Array[String]): Unit = {
-    val secret = Utils.createSecret(new SparkConf())
-
-    // Start a GatewayServer on an ephemeral port. Make sure the callback client is configured
-    // with the same secret, in case the app needs callbacks from the JVM to the underlying
-    // python processes.
-    val localhost = InetAddress.getLoopbackAddress()
-    val gatewayServer: GatewayServer = new GatewayServer.GatewayServerBuilder()
-      .authToken(secret)
-      .javaPort(0)
-      .javaAddress(localhost)
-      .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
-      .build()
+    val sparkConf = new SparkConf()
+    val gatewayServer: Py4JServer = new Py4JServer(sparkConf)

     gatewayServer.start()
     val boundPort: Int = gatewayServer.getListeningPort
     if (boundPort == -1) {
-      logError("GatewayServer failed to bind; exiting")
+      logError(s"${gatewayServer.server.getClass} failed to bind; exiting")
       System.exit(1)
     } else {
       logDebug(s"Started PythonGatewayServer on port $boundPort")
@@ -68,7 +54,7 @@ private[spark] object PythonGatewayServer extends Logging {
       val dos = new DataOutputStream(new FileOutputStream(tmpPath))
       dos.writeInt(boundPort)

-      val secretBytes = secret.getBytes(UTF_8)
+      val secretBytes = gatewayServer.secret.getBytes(UTF_8)
       dos.writeInt(secretBytes.length)
       dos.write(secretBytes, 0, secretBytes.length)
       dos.close()
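
The writes above fix a small handshake format for the connection file: a 4-byte port, a 4-byte secret length, then the UTF-8 secret bytes, all big-endian via DataOutputStream (java_gateway.py consumes this on the Python side). A hypothetical Scala reader, just to make the layout concrete:

import java.io.{DataInputStream, FileInputStream}
import java.nio.charset.StandardCharsets.UTF_8

object ConnInfoReader {
  // Mirrors the write order above: port, secret length, secret bytes.
  def read(path: String): (Int, String) = {
    val in = new DataInputStream(new FileInputStream(path))
    try {
      val port = in.readInt()
      val secretBytes = new Array[Byte](in.readInt())
      in.readFully(secretBytes)
      (port, new String(secretBytes, UTF_8))
    } finally {
      in.close()
    }
  }
}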

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 5 additions & 13 deletions
@@ -18,15 +18,15 @@
 package org.apache.spark.deploy

 import java.io.File
-import java.net.{InetAddress, URI}
+import java.net.URI
 import java.nio.file.Files

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try

 import org.apache.spark.{SparkConf, SparkUserAppException}
-import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.api.python.{Py4JServer, PythonUtils}
 import org.apache.spark.internal.config._
 import org.apache.spark.util.{RedirectThread, Utils}

@@ -40,7 +40,6 @@ object PythonRunner {
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
     val sparkConf = new SparkConf()
-    val secret = Utils.createSecret(sparkConf)
     val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
       .orElse(sparkConf.get(PYSPARK_PYTHON))
       .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
@@ -51,15 +50,8 @@ object PythonRunner {
     val formattedPythonFile = formatPath(pythonFile)
     val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))

-    // Launch a Py4J gateway server for the process to connect to; this will let it see our
-    // Java system properties and such
-    val localhost = InetAddress.getLoopbackAddress()
-    val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
-      .authToken(secret)
-      .javaPort(0)
-      .javaAddress(localhost)
-      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
-      .build()
+    val gatewayServer = new Py4JServer(sparkConf)
+
     val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.start() })
     thread.setName("py4j-gateway-init")
     thread.setDaemon(true)
@@ -86,7 +78,7 @@ object PythonRunner {
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
-    env.put("PYSPARK_GATEWAY_SECRET", secret)
+    env.put("PYSPARK_GATEWAY_SECRET", gatewayServer.secret)
     // pass conf spark.pyspark.python to python process, the only way to pass info to
     // python process is through environment variable.
     sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 5 deletions
@@ -21,20 +21,18 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
-import java.util.zip.{ZipEntry, ZipOutputStream}
+import java.util.zip.ZipOutputStream

 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.ExecutionException
 import scala.io.Source
-import scala.util.Try
 import scala.xml.Node

 import com.fasterxml.jackson.annotation.JsonIgnore
-import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
@@ -47,7 +45,6 @@ import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
@@ -615,6 +615,12 @@ package object config {
     .stringConf
     .createOptional

+  private[spark] val METRICS_STATIC_SOURCES_ENABLED =
+    ConfigBuilder("spark.metrics.static.sources.enabled")
+      .doc("Whether to register static sources with the metrics system.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
     .stringConf
     .createOptional
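
The new flag defaults to true, so existing deployments keep registering static sources; SparkContext and SparkEnv above now pass it to MetricsSystem.start explicitly. A minimal sketch of opting out in application code (the object and app name are hypothetical; the key is the one defined in the diff above):

import org.apache.spark.SparkConf

object NoStaticMetricsConf {
  def conf: SparkConf = new SparkConf()
    .setAppName("demo") // hypothetical app name
    .set("spark.metrics.static.sources.enabled", "false")
}

The same setting should also work from the command line, e.g. --conf spark.metrics.static.sources.enabled=false on spark-submit.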
