
Commit 1c98218

Author: Louiszr (committed)
Merge remote-tracking branch 'upstream/master'
2 parents fcfac36 + 3722ed4, commit 1c98218

File tree: 713 files changed (+132803 / -919 lines)


.github/workflows/master.yml renamed to .github/workflows/build_and_test.yml

Lines changed: 6 additions & 0 deletions
@@ -183,6 +183,12 @@ jobs:
       with:
         name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
         path: "**/target/test-reports/*.xml"
+    - name: Upload unit tests log files
+      if: failure()
+      uses: actions/upload-artifact@v2
+      with:
+        name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
+        path: "**/target/unit-tests.log"
 
   # Static analysis, and documentation build
   lint:

.github/workflows/test_report.yml

Lines changed: 3 additions & 14 deletions
@@ -10,26 +10,15 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - name: Download test results to report
-      # TODO(SPARK-32605): It was forked to have a custom fix
-      # https://github.com/HyukjinKwon/action-surefire-report/commit/c96094cc35061fcf154a7cb46807f2f3e2339476
-      # in order to add the support of custom target commit SHA. It should be contributed back to the original
-      # plugin and avoid using the fork.
-      uses: HyukjinKwon/action-download-artifact@master
+      uses: dawidd6/action-download-artifact@v2
       with:
         github_token: ${{ secrets.GITHUB_TOKEN }}
         workflow: ${{ github.event.workflow_run.workflow_id }}
         commit: ${{ github.event.workflow_run.head_commit.id }}
     - name: Publish test report
-      # TODO(SPARK-32606): It was forked to have a custom fix
-      # https://github.com/HyukjinKwon/action-download-artifact/commit/750b71af351aba467757d7be6924199bb08db4ed
-      # in order to add the support to download all artifacts. It should be contributed back to the original
-      # plugin and avoid using the fork.
-      # Alternatively, we can use the official actions/download-artifact once they support to download artifacts
-      # between different workloads, see also https://github.com/actions/download-artifact/issues/3
-      uses: HyukjinKwon/action-surefire-report@master
+      uses: scacap/action-surefire-report@v1
       with:
-        check_name: Test report
+        check_name: Report test results
         github_token: ${{ secrets.GITHUB_TOKEN }}
         report_paths: "**/target/test-reports/*.xml"
         commit: ${{ github.event.workflow_run.head_commit.id }}
-

R/pkg/tests/run-all.R

Lines changed: 6 additions & 3 deletions
@@ -61,15 +61,18 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
   set.seed(42)
 
   # TODO (SPARK-30663) To be removed once testthat 1.x is removed from all builds
-  if (grepl("^1\\..*", packageVersion("testthat"))) {
+  if (packageVersion("testthat")$major <= 1) {
     # testthat 1.x
     test_runner <- testthat:::run_tests
     reporter <- "summary"
-
   } else {
     # testthat >= 2.0.0
     test_runner <- testthat:::test_package_dir
-    reporter <- testthat::default_reporter()
+    dir.create("target/test-reports", showWarnings = FALSE)
+    reporter <- MultiReporter$new(list(
+      SummaryReporter$new(),
+      JunitReporter$new(file = "target/test-reports/test-results.xml")
+    ))
   }
 
   test_runner("SparkR",

appveyor.yml

Lines changed: 2 additions & 2 deletions
@@ -41,8 +41,8 @@ cache:
 install:
   # Install maven and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
-  # Required package for R unit tests
-  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
+  # Required package for R unit tests. xml2 is required to use jUnit reporter in testthat.
+  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow', 'xml2'), repos='https://cloud.r-project.org/')"
   - cmd: Rscript -e "pkg_list <- as.data.frame(installed.packages()[,c(1, 3:4)]); pkg_list[is.na(pkg_list$Priority), 1:2, drop = FALSE]"
 
 build_script:

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 70 additions & 0 deletions
@@ -428,6 +428,68 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key indices in
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    /**
+     * The index in `longArray` where the key is stored.
+     */
+    private int keyIndex = 0;
+
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex() {
+      this.numRecords = numValues;
+      this.loc = new Location();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, 0, true);
+        keyIndex++;
+      }
+      numRecords--;
+      return loc;
+    }
+  }
+
+  /**
+   * Returns an iterator for iterating over the entries of this map,
+   * by first iterating over the key index inside hash map's `longArray`.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link Location} object.
+   *
+   * The returned iterator is NOT thread-safe. If the map is modified while iterating over it,
+   * the behavior of the returned iterator is undefined.
+   */
+  public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
+    return new MapIteratorWithKeyIndex();
+  }
+
+  /**
+   * The maximum number of allowed keys index.
+   *
+   * The value of allowed keys index is in the range of [0, maxNumKeysIndex - 1].
+   */
+  public int maxNumKeysIndex() {
+    return (int) (longArray.size() / 2);
+  }
+
   /**
    * Looks up a key, and return a {@link Location} handle that can be used to test existence
    * and read/write values.
@@ -601,6 +663,14 @@ public boolean isDefined() {
       return isDefined;
     }
 
+    /**
+     * Returns index for key.
+     */
+    public int getKeyIndex() {
+      assert (isDefined);
+      return pos;
+    }
+
    /**
     * Returns the base object for key.
     */
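Note: the iterator above reuses a single Location object and exposes a stable key index per entry. A rough usage sketch in Scala (not part of this commit; it assumes an already-populated map: BytesToBytesMap and only calls the methods shown above):

// Hedged sketch: walk every entry once and record its key index.
val seenKeyIndices = scala.collection.mutable.Set.empty[Int]
val iter = map.iteratorWithKeyIndex()
while (iter.hasNext) {
  val loc = iter.next()              // the same Location instance is returned on every call
  assert(loc.isDefined)
  seenKeyIndices += loc.getKeyIndex  // index into longArray, in [0, maxNumKeysIndex - 1]
}
assert(seenKeyIndices.forall(_ < map.maxNumKeysIndex()))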

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions
@@ -27,8 +27,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
-import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -128,7 +128,7 @@ private[spark] class ExecutorAllocationManager(
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
-  private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+  private val decommissionEnabled = conf.get(DECOMMISSION_ENABLED)
 
   private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 2 additions & 2 deletions
@@ -67,7 +67,7 @@ private[deploy] class Worker(
   assert (port > 0)
 
   // If worker decommissioning is enabled register a handler on PWR to shutdown.
-  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+  if (conf.get(config.DECOMMISSION_ENABLED)) {
     logInfo("Registering SIGPWR handler to trigger decommissioning.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
       "disabling worker decommission feature.")(decommissionSelf)
@@ -769,7 +769,7 @@ private[deploy] class Worker(
   }
 
   private[deploy] def decommissionSelf(): Boolean = {
-    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    if (conf.get(config.DECOMMISSION_ENABLED)) {
       logDebug("Decommissioning self")
      decommissioned = true
      sendToMaster(WorkerDecommission(workerId, self))

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 2 deletions
@@ -294,10 +294,15 @@ private[spark] class CoarseGrainedExecutorBackend(
       override def run(): Unit = {
         var lastTaskRunningTime = System.nanoTime()
         val sleep_time = 1000 // 1s
-
+        // This config is internal and only used by unit tests to force an executor
+        // to hang around for longer when decommissioned.
+        val initialSleepMillis = env.conf.getInt(
+          "spark.test.executor.decommission.initial.sleep.millis", sleep_time)
+        if (initialSleepMillis > 0) {
+          Thread.sleep(initialSleepMillis)
+        }
         while (true) {
           logInfo("Checking to see if we can shutdown.")
-          Thread.sleep(sleep_time)
           if (executor == null || executor.numRunningTasks == 0) {
             if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
               logInfo("No running tasks, checking migrations")
@@ -323,6 +328,7 @@ private[spark] class CoarseGrainedExecutorBackend(
             // move forward.
             lastTaskRunningTime = System.nanoTime()
           }
+          Thread.sleep(sleep_time)
         }
       }
     }
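Because the new initial sleep is read from the executor's SparkConf, tests can tune it directly. A minimal hedged sketch (the key string is taken from the diff above; the rest is standard SparkConf usage and not part of this commit):

import org.apache.spark.SparkConf

// Hedged sketch: a unit test could skip the initial wait entirely, or
// raise the value to keep a decommissioned executor around for longer.
val conf = new SparkConf()
  .set("spark.test.executor.decommission.initial.sleep.millis", "0")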

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 15 additions & 16 deletions
@@ -336,8 +336,8 @@ private[spark] class Executor(
     extends Runnable {
 
     val taskId = taskDescription.taskId
-    val threadName = s"Executor task launch worker for task $taskId"
     val taskName = taskDescription.name
+    val threadName = s"Executor task launch worker for $taskName"
     val mdcProperties = taskDescription.properties.asScala
       .filter(_._1.startsWith("mdc.")).toSeq
 
@@ -364,7 +364,7 @@ private[spark] class Executor(
     @volatile var task: Task[Any] = _
 
     def kill(interruptThread: Boolean, reason: String): Unit = {
-      logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
+      logInfo(s"Executor is trying to kill $taskName, reason: $reason")
       reasonIfKilled = Some(reason)
       if (task != null) {
         synchronized {
@@ -425,7 +425,7 @@ private[spark] class Executor(
       } else 0L
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = env.closureSerializer.newInstance()
-      logInfo(s"Running $taskName (TID $taskId)")
+      logInfo(s"Running $taskName")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var taskStartTimeNs: Long = 0
       var taskStartCpu: Long = 0
@@ -459,7 +459,7 @@ private[spark] class Executor(
         // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
         // we don't need to make any special calls here.
         if (!isLocal) {
-          logDebug("Task " + taskId + "'s epoch is " + task.epoch)
+          logDebug(s"$taskName's epoch is ${task.epoch}")
           env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
         }
 
@@ -485,7 +485,7 @@ private[spark] class Executor(
         val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
 
         if (freedMemory > 0 && !threwException) {
-          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
+          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, $taskName"
           if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
             throw new SparkException(errMsg)
           } else {
@@ -495,7 +495,7 @@ private[spark] class Executor(
 
         if (releasedLocks.nonEmpty && !threwException) {
           val errMsg =
-            s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+            s"${releasedLocks.size} block locks were not released by $taskName\n" +
             releasedLocks.mkString("[", ", ", "]")
           if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
             throw new SparkException(errMsg)
@@ -508,7 +508,7 @@ private[spark] class Executor(
           // uh-oh. it appears the user code has caught the fetch-failure without throwing any
           // other exceptions. Its *possible* this is what the user meant to do (though highly
           // unlikely). So we will log an error and keep going.
-          logError(s"TID ${taskId} completed successfully though internally it encountered " +
+          logError(s"$taskName completed successfully though internally it encountered " +
            s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
            s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
         }
@@ -592,7 +592,7 @@ private[spark] class Executor(
         // directSend = sending directly back to the driver
         val serializedResult: ByteBuffer = {
           if (maxResultSize > 0 && resultSize > maxResultSize) {
-            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
+            logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
@@ -602,11 +602,10 @@ private[spark] class Executor(
               blockId,
               new ChunkedByteBuffer(serializedDirectResult.duplicate()),
               StorageLevel.MEMORY_AND_DISK_SER)
-            logInfo(
-              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
+            logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
           } else {
-            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
+            logInfo(s"Finished $taskName. $resultSize bytes result sent to driver")
             serializedDirectResult
           }
         }
@@ -616,7 +615,7 @@ private[spark] class Executor(
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
       } catch {
         case t: TaskKilledException =>
-          logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+          logInfo(s"Executor killed $taskName, reason: ${t.reason}")
 
           val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
           // Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
@@ -629,7 +628,7 @@ private[spark] class Executor(
         case _: InterruptedException | NonFatal(_) if
             task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-          logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
+          logInfo(s"Executor interrupted and killed $taskName, reason: $killReason")
 
           val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
           val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
@@ -643,7 +642,7 @@ private[spark] class Executor(
           // there was a fetch failure in the task, but some user code wrapped that exception
           // and threw something else. Regardless, we treat it as a fetch failure.
           val fetchFailedCls = classOf[FetchFailedException].getName
-          logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
+          logWarning(s"$taskName encountered a ${fetchFailedCls} and " +
            s"failed, but the ${fetchFailedCls} was hidden by another " +
            s"exception. Spark is handling this like a fetch failure and ignoring the " +
            s"other exception: $t")
@@ -659,13 +658,13 @@ private[spark] class Executor(
         case t: Throwable if env.isStopped =>
           // Log the expected exception after executor.stop without stack traces
          // see: SPARK-19147
-          logError(s"Exception in $taskName (TID $taskId): ${t.getMessage}")
+          logError(s"Exception in $taskName: ${t.getMessage}")
 
         case t: Throwable =>
           // Attempt to exit cleanly by informing the driver of our failure.
          // If anything goes wrong (or this was a fatal exception), we will delegate to
          // the default uncaught exception handler, which will terminate the Executor.
-          logError(s"Exception in $taskName (TID $taskId)", t)
+          logError(s"Exception in $taskName", t)
 
           // SPARK-20904: Do not report failure to driver if if happened during shut down. Because
          // libraries may set up shutdown hooks that race with running tasks during shutdown,

core/src/main/scala/org/apache/spark/internal/config/Worker.scala

Lines changed: 0 additions & 6 deletions
@@ -82,10 +82,4 @@ private[spark] object Worker {
       .version("2.0.2")
       .intConf
       .createWithDefault(100)
-
-  private[spark] val WORKER_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.worker.decommission.enabled")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
 }

0 commit comments
