Merged

30 commits
e957c4e
[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.
squito Apr 9, 2018
a902323
[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might g…
ueshin Apr 18, 2018
041aec4
[SPARK-23963][SQL] Properly handle large number of columns in query o…
bersprockets Apr 13, 2018
e77d62a
[MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
seancxmao Apr 24, 2018
4f1ae3a
[SPARK-23941][MESOS] Mesos task failed on specific spark app name
BounkongK May 1, 2018
154bbc9
[SPARK-23433][CORE] Late zombie task completions update all tasksets
squito May 3, 2018
768d0b7
[SPARK-23489][SQL][TEST][BRANCH-2.2] HiveExternalCatalogVersionsSuite…
dongjoon-hyun May 4, 2018
866270e
[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero cor…
cloud-fan May 4, 2018
f9d6a16
[SPARK-21278][PYSPARK] Upgrade to Py4J 0.10.6
dongjoon-hyun Jul 5, 2017
850b7d8
[PYSPARK] Update py4j to version 0.10.7.
Apr 13, 2018
f96d13d
[SPARKR] Match pyspark features in SparkR communication protocol.
Apr 17, 2018
8c223b6
[R][BACKPORT-2.2] backport lint fix
felixcheung May 16, 2018
6a55d8b
[SPARK-23850][SQL][BRANCH-2.2] Add separate config for SQL options re…
May 18, 2018
2379074
[SPARK-24257][SQL] LongToUnsafeRowMap calculate the new size may be w…
cxzl25 May 24, 2018
8abd0a7
fix compilation caused by SPARK-24257
cloud-fan May 24, 2018
fb86eb0
[MINOR] Add port SSL config in toString and scaladoc
mgaido91 May 25, 2018
c306a84
[WEBUI] Avoid possibility of script in query param keys
srowen May 31, 2018
bf0b212
[SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in Hi…
mgaido91 Jun 12, 2018
1f81ade
[SPARK-24506][UI] Add UI filters to tabs added after binding
mgaido91 Jun 12, 2018
090b883
[SPARK-23732][DOCS] Fix source links in generated scaladoc.
Jun 12, 2018
8ce9e2a
Preparing Spark release v2.2.2-rc1
tgravescs Jun 18, 2018
e2e4d58
Preparing development version 2.2-3-SNAPSHOT
tgravescs Jun 18, 2018
7bfefc9
Preparing development version 2.2.3-SNAPSHOT
tgravescs Jun 18, 2018
751b008
[SPARK-24589][CORE] Correctly identify tasks in output commit coordin…
Jun 21, 2018
a600004
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jun 22, 2018
72575d0
[SPARK-24552][CORE][BRANCH-2.2] Use unique id instead of attempt numb…
Jun 25, 2018
fc28ba3
Preparing Spark release v2.2.2-rc2
tgravescs Jun 27, 2018
4795827
Preparing development version 2.2.3-SNAPSHOT
tgravescs Jun 27, 2018
a8537a5
[SPARK-24603][SQL] Fix findTightestCommonType reference in comments
Jun 28, 2018
e63c97b
Merge branch 'branch-2.2' of github.com:apache/spark into csd-2.2
markhamstra Jul 9, 2018
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 2.2.2
Version: 2.2.3
Title: R Frontend for Apache Spark
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -196,7 +196,7 @@ sparkR.sparkContext <- function(

# Don't use readString() so that we can provide a useful
# error message if the R and Java versions are mismatched.
authSecretLen = readInt(f)
authSecretLen <- readInt(f)
if (length(authSecretLen) == 0 || authSecretLen == 0) {
stop("Unexpected EOF in JVM connection data. Mismatched versions?")
}
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/sketch/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/tags/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-csd-14-SNAPSHOT</version>
<version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -126,7 +126,7 @@ private[spark] case class SSLOptions(
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, " +
override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
@@ -140,6 +140,7 @@ private[spark] object SSLOptions extends Logging {
*
* The following settings are allowed:
* $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
* $ - `[ns].port` - the port where to bind the SSL server
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
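The SSLOptions change above surfaces the `[ns].port` setting in both toString and the config documentation. A minimal sketch of how the key might be supplied, assuming the standard `spark.ssl` namespace and purely illustrative values:

import org.apache.spark.SparkConf

// Sketch only: illustrative keystore path and port. The `spark.ssl.*` keys map
// onto the `[ns].*` settings listed above; passwords are masked as "xxx" by
// SSLOptions.toString, and after this change the port appears there as well.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.port", "4440")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "changeit")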
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -66,7 +66,7 @@ object TaskContext {
* An empty task context that does not represent an actual task. This is only used in tests.
*/
private[spark] def empty(): TaskContextImpl = {
new TaskContextImpl(0, 0, 0, 0, null, new Properties, null)
new TaskContextImpl(0, 0, 0, 0, 0, null, new Properties, null)
}
}

@@ -150,6 +150,13 @@ abstract class TaskContext extends Serializable {
*/
def stageId(): Int

/**
* How many times the stage that this task belongs to has been attempted. The first stage attempt
* will be assigned stageAttemptNumber = 0, and subsequent attempts will have increasing attempt
* numbers.
*/
def stageAttemptNumber(): Int

/**
* The ID of the RDD partition that is computed by this task.
*/
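TaskContext now exposes stageAttemptNumber alongside the existing attemptNumber. A minimal sketch of reading both from task code, assuming an existing SparkContext named sc:

import org.apache.spark.TaskContext

// Sketch only: stageAttemptNumber starts at 0 and increases each time the whole
// stage is retried, while attemptNumber is the attempt of an individual task.
// Collecting both shows which stage attempt each task ran in.
val taskInfo = sc.parallelize(1 to 4, 2).map { _ =>
  val ctx = TaskContext.get()
  (ctx.stageId(), ctx.stageAttemptNumber(), ctx.partitionId(), ctx.attemptNumber())
}.collect()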
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -41,8 +41,9 @@ import org.apache.spark.util._
* `TaskMetrics` & `MetricsSystem` objects are not thread safe.
*/
private[spark] class TaskContextImpl(
val stageId: Int,
val partitionId: Int,
override val stageId: Int,
override val stageAttemptNumber: Int,
override val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@@ -151,7 +151,6 @@ class HistoryServer(
completed: Boolean) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
addFilters(ui.getHandlers, conf)
}

/** Detach a reconstructed UI from this server. Only valid after bind(). */
@@ -247,7 +247,7 @@ package object config {
"a property key or value, the value is redacted from the environment UI and various logs " +
"like YARN and event logs.")
.regexConf
.createWithDefault("(?i)secret|password|url|user|username".r)
.createWithDefault("(?i)secret|password".r)

private[spark] val STRING_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.string.regex")
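The default spark.redaction.regex is narrowed from (?i)secret|password|url|user|username to (?i)secret|password, so only keys containing "secret" or "password" are redacted out of the box. A small sketch of how the new default classifies a few keys (the keys themselves are illustrative):

// Sketch only: Spark redacts the value of any config key matched by the pattern;
// with the narrowed default, url/user-style keys keep their values visible.
val redactionPattern = "(?i)secret|password".r

Seq(
  "spark.hadoop.fs.s3a.secret.key",  // matched -> value redacted
  "spark.ssl.keyStorePassword",      // matched -> value redacted
  "spark.myapp.jdbc.url"             // not matched under the new default
).foreach { key =>
  val redact = redactionPattern.findFirstIn(key).isDefined
  println(s"$key -> ${if (redact) "redacted" else "visible"}")
}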
@@ -86,12 +86,16 @@ object SparkHadoopMapReduceWriter extends Logging {
// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
// SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber

executeTask(
context = context,
jobTrackerId = jobTrackerId,
commitJobId = commitJobId,
sparkPartitionId = context.partitionId,
sparkAttemptNumber = context.attemptNumber,
sparkAttemptNumber = attemptId,
committer = committer,
hadoopConf = conf.value,
outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],
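The attempt ID passed to the output committer is now built from both the stage attempt number and the task attempt number, so a retried stage cannot reuse an ID from an earlier attempt. A small sketch of the packing, under the same assumption as the comment above that neither value exceeds Short.MaxValue:

// Sketch only: stage attempt in the upper 16 bits, task attempt in the lower 16.
def packAttemptId(stageAttemptNumber: Int, taskAttemptNumber: Int): Int =
  (stageAttemptNumber << 16) | taskAttemptNumber

assert(packAttemptId(0, 3) == 3)        // first stage attempt, fourth task attempt
assert(packAttemptId(1, 3) == 65539)    // (1 << 16) | 3
assert(packAttemptId(1, 0) != packAttemptId(0, 1))  // distinct across stage attempts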
@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val taskAttemptNumber = TaskContext.get().attemptNumber()
val stageId = TaskContext.get().stageId()
val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
val ctx = TaskContext.get()
val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(),
splitId, ctx.attemptNumber())

if (canCommit) {
performCommit()
@@ -81,7 +81,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
23 changes: 15 additions & 8 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1165,6 +1165,7 @@ class DAGScheduler(

outputCommitCoordinator.taskCompleted(
stageId,
task.stageAttemptId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
@@ -1323,23 +1324,24 @@
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
} else {
failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest

// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some(failureMessage))
markStageAsFinished(failedStage, errorMessage = Some(failureMessage),
willRetry = !shouldAbortStage)
} else {
logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
s"longer running")
}

failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest

if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
"Fetch failure will not retry stage due to testing config"
@@ -1485,7 +1487,10 @@
/**
* Marks a stage as finished and removes it from the list of running stages.
*/
private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
private def markStageAsFinished(
stage: Stage,
errorMessage: Option[String] = None,
willRetry: Boolean = false): Unit = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
@@ -1504,7 +1509,9 @@
logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}

outputCommitCoordinator.stageEnd(stage.id)
if (!willRetry) {
outputCommitCoordinator.stageEnd(stage.id)
}
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}
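With the willRetry flag, the output commit coordinator keeps its per-stage state while a fetch-failed stage is resubmitted and only drops it when no further attempt will run. A minimal sketch of that decision, with simplified types standing in for the real scheduler structures:

// Sketch only: mirrors the shouldAbortStage / willRetry relationship above.
// stageEnd(stage.id) would be invoked only when the stage will not be retried.
def shouldKeepCommitCoordinatorState(
    fetchFailedAttemptIds: Set[Int],
    maxConsecutiveStageAttempts: Int,
    disallowStageRetryForTest: Boolean): Boolean = {
  val shouldAbortStage =
    fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest
  val willRetry = !shouldAbortStage
  willRetry
}

assert(shouldKeepCommitCoordinatorState(Set(0), 4, disallowStageRetryForTest = false))
assert(!shouldKeepCommitCoordinatorState(Set(0, 1, 2, 3), 4, disallowStageRetryForTest = false))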