
Commit 43ce929

Merge branch 'master' into fncAlias
2 parents 8e9e42b + 0945633 commit 43ce929

330 files changed (+6465 / -3627 lines)


.github/workflows/stale.yml

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+name: Close stale PRs
+
+on:
+  schedule:
+    - cron: "0 0 * * *"
+
+jobs:
+  stale:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/[email protected]
+      with:
+        repo-token: ${{ secrets.GITHUB_TOKEN }}
+        stale-pr-message: |
+          We're closing this PR because it hasn't been updated in a while.
+          This isn't a judgement on the merit of the PR in any way. It's just
+          a way of keeping the PR queue manageable.
+
+          If you'd like to revive this PR, please reopen it!
+        days-before-stale: 100
+        # Setting this to 0 is the same as setting it to 1.
+        # See: https://github.com/actions/stale/issues/28
+        days-before-close: 0

LICENSE-binary

Lines changed: 0 additions & 1 deletion
@@ -393,7 +393,6 @@ org.eclipse.jetty:jetty-util
 org.eclipse.jetty:jetty-webapp
 org.eclipse.jetty:jetty-xml
 org.scala-lang.modules:scala-xml_2.12
-org.opencypher:okapi-shade
 com.github.joshelser:dropwizard-metrics-hadoop-metrics2-reporter
 com.zaxxer.HikariCP
 org.apache.hive:hive-beeline

R/pkg/R/functions.R

Lines changed: 2 additions & 2 deletions
@@ -3620,8 +3620,8 @@ setMethod("size",
 #' (array indices start at 1, or from the end if start is negative) with the specified length.
 #'
 #' @rdname column_collection_functions
-#' @param start an index indicating the first element occurring in the result.
-#' @param length a number of consecutive elements chosen to the result.
+#' @param start the starting index
+#' @param length the length of the slice
 #' @aliases slice slice,Column-method
 #' @note slice since 2.4.0
 setMethod("slice",

assembly/pom.xml

Lines changed: 0 additions & 5 deletions
@@ -64,11 +64,6 @@
       <artifactId>spark-graphx_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-graph_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 12 additions & 0 deletions
@@ -133,4 +133,16 @@ private void appendUnit(StringBuilder sb, long value, String unit) {
    * @throws ArithmeticException if a numeric overflow occurs
    */
   public Duration extractAsDuration() { return Duration.of(microseconds, ChronoUnit.MICROS); }
+
+  /**
+   * A constant holding the minimum value an {@code CalendarInterval} can have.
+   */
+  public static CalendarInterval MIN_VALUE =
+      new CalendarInterval(Integer.MIN_VALUE, Integer.MIN_VALUE, Long.MIN_VALUE);
+
+  /**
+   * A constant holding the maximum value an {@code CalendarInterval} can have.
+   */
+  public static CalendarInterval MAX_VALUE =
+      new CalendarInterval(Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
 }
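
A small, hedged Scala sketch of how the new constants might be exercised; the constructor argument order (months, days, microseconds) is taken from the diff above, and the printed output is whatever CalendarInterval's toString produces.

import org.apache.spark.unsafe.types.CalendarInterval

// Arguments mirror the diff above: months, days, microseconds.
val oneMicro = new CalendarInterval(0, 0, 1L)
println(oneMicro)
// The new bounds pin every field at its extreme value.
println(CalendarInterval.MIN_VALUE)
println(CalendarInterval.MAX_VALUE)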

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 7 additions & 2 deletions
@@ -694,7 +694,10 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
     assert (vlen % 8 == 0);
     assert (longArray != null);
 
-    if (numKeys == MAX_CAPACITY
+    // We should not increase number of keys to be MAX_CAPACITY. The usage pattern of this map is
+    // lookup + append. If we append key until the number of keys to be MAX_CAPACITY, next time
+    // the call of lookup will hang forever because it cannot find an empty slot.
+    if (numKeys == MAX_CAPACITY - 1
       // The map could be reused from last spill (because of no enough memory to grow),
       // then we don't try to grow again if hit the `growthThreshold`.
       || !canGrowArray && numKeys >= growthThreshold) {
@@ -741,7 +744,9 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
 
-      if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
+      // We use two array entries per key, so the array size is twice the capacity.
+      // We should compare the current capacity of the array, instead of its size.
+      if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
         try {
           growAndRehash();
         } catch (SparkOutOfMemoryError oom) {
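
The two added comments carry the key arithmetic: each key occupies two slots in longArray (encoded record address plus hashcode), so the map's current capacity is longArray.size / 2, and appends must stop one key short of MAX_CAPACITY so that lookup can always find an empty slot. A minimal standalone sketch of those checks, with MAX_CAPACITY assumed to be 1 << 29 purely for illustration:

// Hedged sketch of the two checks above; names and MAX_CAPACITY value are illustrative.
object GrowthCheckSketch {
  val MAX_CAPACITY: Int = 1 << 29

  // Refuse the append if the map is one key away from MAX_CAPACITY, or if it
  // cannot grow and has already hit its growth threshold.
  def canAppend(numKeys: Int, canGrowArray: Boolean, growthThreshold: Int): Boolean =
    !(numKeys == MAX_CAPACITY - 1 || (!canGrowArray && numKeys >= growthThreshold))

  // Two long entries per key, so capacity == longArraySize / 2; grow only while
  // the capacity (not the raw array size) is below MAX_CAPACITY.
  def shouldGrow(numKeys: Int, growthThreshold: Int, longArraySize: Long): Boolean =
    numKeys >= growthThreshold && longArraySize / 2 < MAX_CAPACITY
}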

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 28 additions & 14 deletions
@@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -551,9 +551,16 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    val _executorMetricsSource =
+      if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
+        Some(new ExecutorMetricsSource)
+      } else {
+        None
+      }
+
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(
-      () => SparkContext.this.reportHeartBeat(),
+      () => SparkContext.this.reportHeartBeat(_executorMetricsSource),
       "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
@@ -622,6 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
+    _executorMetricsSource.foreach(_.register(_env.metricsSystem))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
@@ -1525,17 +1533,17 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new Path(path).toUri
-    val schemeCorrectedPath = uri.getScheme match {
-      case null => new File(path).getCanonicalFile.toURI.toString
+    val schemeCorrectedURI = uri.getScheme match {
+      case null => new File(path).getCanonicalFile.toURI
       case "local" =>
         logWarning("File with 'local' scheme is not supported to add to file server, since " +
           "it is already available on every node.")
         return
-      case _ => path
+      case _ => uri
     }
 
-    val hadoopPath = new Path(schemeCorrectedPath)
-    val scheme = new URI(schemeCorrectedPath).getScheme
+    val hadoopPath = new Path(schemeCorrectedURI)
+    val scheme = schemeCorrectedURI.getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
       val isDir = fs.getFileStatus(hadoopPath).isDirectory
@@ -1555,7 +1563,11 @@ class SparkContext(config: SparkConf) extends Logging {
     val key = if (!isLocal && scheme == "file") {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else {
-      schemeCorrectedPath
+      if (uri.getScheme == null) {
+        schemeCorrectedURI.toString
+      } else {
+        path
+      }
     }
     val timestamp = System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
@@ -1848,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     def checkRemoteJarFile(path: String): String = {
       val hadoopPath = new Path(path)
-      val scheme = new URI(path).getScheme
+      val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
         try {
           val fs = hadoopPath.getFileSystem(hadoopConfiguration)
@@ -1870,21 +1882,21 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
 
-    if (path == null) {
-      logWarning("null specified as parameter to addJar")
+    if (path == null || path.isEmpty) {
+      logWarning("null or empty path specified as parameter to addJar")
     } else {
       val key = if (path.contains("\\")) {
         // For local paths with backslashes on Windows, URI throws an exception
         addLocalJarFile(new File(path))
       } else {
-        val uri = new URI(path)
+        val uri = new Path(path).toUri
         // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
         Utils.validateURL(uri)
         uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null =>
             // SPARK-22585 path without schema is not url encoded
-            addLocalJarFile(new File(uri.getRawPath))
+            addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists only on the driver node
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
@@ -2473,8 +2485,10 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Reports heartbeat metrics for the driver. */
-  private def reportHeartBeat(): Unit = {
+  private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
     val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))
+
     val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
     // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
     driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
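
Several of these hunks replace new URI(path) with new Path(path).toUri, and uri.getRawPath with uri.getPath. A hedged sketch of why that matters for scheme-less local paths, such as ones containing spaces; the path below is illustrative and this is not code from the commit.

import org.apache.hadoop.fs.Path

val raw = "/tmp/my deps.jar"
// new java.net.URI(raw) would throw URISyntaxException here, because a space
// is not a legal URI character.
val uri = new Path(raw).toUri          // Hadoop's Path tolerates and percent-encodes it
assert(uri.getScheme == null)          // no scheme: treated as a driver-local file
println(uri.getPath)                   // decoded form, safe to hand to new File(...)
println(uri.getRawPath)                // encoded form ("%20"), which is why getPath is used now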

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 11 additions & 1 deletion
@@ -247,6 +247,16 @@ private[spark] object TestUtils {
       url: URL,
       method: String = "GET",
       headers: Seq[(String, String)] = Nil): Int = {
+    withHttpConnection(url, method, headers = headers) { connection =>
+      connection.getResponseCode()
+    }
+  }
+
+  def withHttpConnection[T](
+      url: URL,
+      method: String = "GET",
+      headers: Seq[(String, String)] = Nil)
+      (fn: HttpURLConnection => T): T = {
     val connection = url.openConnection().asInstanceOf[HttpURLConnection]
     connection.setRequestMethod(method)
     headers.foreach { case (k, v) => connection.setRequestProperty(k, v) }
@@ -271,7 +281,7 @@ private[spark] object TestUtils {
 
     try {
       connection.connect()
-      connection.getResponseCode()
+      fn(connection)
     } finally {
       connection.disconnect()
     }
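
A hedged usage sketch of the new loan-pattern helper: withHttpConnection opens and connects the connection, hands it to the caller's function, and (per the surrounding try/finally) always disconnects afterwards. The URL and header read below are illustrative only.

import java.net.URL
import org.apache.spark.TestUtils

// Only usable from Spark's own code, since TestUtils is private[spark].
val contentType = TestUtils.withHttpConnection(new URL("http://localhost:4040/api/v1/version")) { connection =>
  connection.getHeaderField("Content-Type")
}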

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 2 deletions
@@ -137,10 +137,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    * Remove keys that don't start with "spark." from `sparkProperties`.
    */
   private def ignoreNonSparkProperties(): Unit = {
-    sparkProperties.foreach { case (k, v) =>
+    sparkProperties.keys.foreach { k =>
       if (!k.startsWith("spark.")) {
         sparkProperties -= k
-        logWarning(s"Ignoring non-spark config property: $k=$v")
+        logWarning(s"Ignoring non-Spark config property: $k")
       }
     }
   }
