diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 380b3ef3d5336..ad723300490f1 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
-Version: 2.2.2
+Version: 2.2.3
Title: R Frontend for Apache Spark
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index daa855b2459f2..22a99e223e3bb 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -196,7 +196,7 @@ sparkR.sparkContext <- function(
# Don't use readString() so that we can provide a useful
# error message if the R and Java versions are mismatched.
- authSecretLen = readInt(f)
+ authSecretLen <- readInt(f)
if (length(authSecretLen) == 0 || authSecretLen == 0) {
stop("Unexpected EOF in JVM connection data. Mismatched versions?")
}
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 1f51b8413fa05..9f61553545712 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 082756dbac777..95043024ef5ca 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 477d717098cd4..2cc2b21da42ed 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index aa169bde7149a..24598bd4529cc 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index d74470297151d..8173bc39e813b 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/common/tags/pom.xml b/common/tags/pom.xml
index a13eac83e9728..c7fee37d36623 100644
--- a/common/tags/pom.xml
+++ b/common/tags/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 712bff0ed75ad..3ae6e0620eb4f 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/core/pom.xml b/core/pom.xml
index 42ea1ca35011a..0ebc92b1b4c2f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 29163e7f30546..48e8c65673136 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -126,7 +126,7 @@ private[spark] case class SSLOptions(
}
/** Returns a string representation of this SSLOptions with all the passwords masked. */
- override def toString: String = s"SSLOptions{enabled=$enabled, " +
+ override def toString: String = s"SSLOptions{enabled=$enabled, port=$port, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
@@ -140,6 +140,7 @@ private[spark] object SSLOptions extends Logging {
*
* The following settings are allowed:
* $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+ * $ - `[ns].port` - the port where to bind the SSL server
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
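A hedged sketch of the new `[ns].port` setting in context (illustrative only, not part of the patch; it assumes the `spark.ssl.ui` namespace, one of the namespaces Spark parses these options under, and a hypothetical keystore path):

    import org.apache.spark.SparkConf

    // Pin the HTTPS port for the web UI instead of deriving it from the HTTP port.
    val conf = new SparkConf()
      .set("spark.ssl.ui.enabled", "true")
      .set("spark.ssl.ui.port", "4443")                       // the new [ns].port option
      .set("spark.ssl.ui.keyStore", "/path/to/keystore.jks")  // hypothetical path
      .set("spark.ssl.ui.keyStorePassword", "changeit")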
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0b87cd503d4fa..69739745aa6cf 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -66,7 +66,7 @@ object TaskContext {
* An empty task context that does not represent an actual task. This is only used in tests.
*/
private[spark] def empty(): TaskContextImpl = {
- new TaskContextImpl(0, 0, 0, 0, null, new Properties, null)
+ new TaskContextImpl(0, 0, 0, 0, 0, null, new Properties, null)
}
}
@@ -150,6 +150,13 @@ abstract class TaskContext extends Serializable {
*/
def stageId(): Int
+ /**
+ * How many times the stage that this task belongs to has been attempted. The first stage attempt
+ * will be assigned stageAttemptNumber = 0, and subsequent attempts will have increasing attempt
+ * numbers.
+ */
+ def stageAttemptNumber(): Int
+
/**
* The ID of the RDD partition that is computed by this task.
*/
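A minimal usage sketch of the new `stageAttemptNumber()` getter (illustrative only, not part of the patch; assumes `sc` is an active SparkContext):

    import org.apache.spark.TaskContext

    val attempts = sc.parallelize(1 to 4, 2).mapPartitions { _ =>
      val ctx = TaskContext.get()
      // 0 for the first attempt of a stage, 1 for its first retry, and so on.
      Iterator.single((ctx.stageId(), ctx.stageAttemptNumber(), ctx.partitionId()))
    }.collect()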
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 01d8973e1bb06..cccd3ea457ba4 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -41,8 +41,9 @@ import org.apache.spark.util._
* `TaskMetrics` & `MetricsSystem` objects are not thread safe.
*/
private[spark] class TaskContextImpl(
- val stageId: Int,
- val partitionId: Int,
+ override val stageId: Int,
+ override val stageAttemptNumber: Int,
+ override val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index d9c8fda99ef97..967cf14ad3535 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -151,7 +151,6 @@ class HistoryServer(
completed: Boolean) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
- addFilters(ui.getHandlers, conf)
}
/** Detach a reconstructed UI from this server. Only valid after bind(). */
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index db4c9f9d07e08..f65a9d750c513 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -247,7 +247,7 @@ package object config {
"a property key or value, the value is redacted from the environment UI and various logs " +
"like YARN and event logs.")
.regexConf
- .createWithDefault("(?i)secret|password|url|user|username".r)
+ .createWithDefault("(?i)secret|password".r)
private[spark] val STRING_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.string.regex")
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 3b0a15848cd3b..dd72f94303666 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -86,12 +86,16 @@ object SparkHadoopMapReduceWriter extends Logging {
// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+ // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+ // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+ val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
+
executeTask(
context = context,
jobTrackerId = jobTrackerId,
commitJobId = commitJobId,
sparkPartitionId = context.partitionId,
- sparkAttemptNumber = context.attemptNumber,
+ sparkAttemptNumber = attemptId,
committer = committer,
hadoopConf = conf.value,
outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],
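A worked example of the attempt-ID packing above (numbers chosen only for illustration):

    val stageAttemptNumber = 2   // third attempt of the stage
    val taskAttemptNumber = 5    // sixth attempt of this task
    val attemptId = (stageAttemptNumber << 16) | taskAttemptNumber
    // attemptId == 131077: the stage attempt occupies the upper 16 bits and the task attempt the
    // lower 16, so attempts from different stage attempts can no longer collide on the same
    // Hadoop task attempt ID (hence the Short.MaxValue assumption in the comment above).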
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 764735dc4eae7..db8aff94ea1e1 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val taskAttemptNumber = TaskContext.get().attemptNumber()
- val stageId = TaskContext.get().stageId()
- val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
+ val ctx = TaskContext.get()
+ val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(),
+ splitId, ctx.attemptNumber())
if (canCommit) {
performCommit()
@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
+ throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
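From the task's side the protocol is unchanged apart from the extra stage-attempt argument; a hedged sketch of the ask/commit/abort sequence, reusing the surrounding method's `committer`, `mrTaskContext` and `splitId` (internal API, shown only to illustrate the call shape):

    val ctx = TaskContext.get()
    val coordinator = SparkEnv.get.outputCommitCoordinator
    if (coordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(), splitId, ctx.attemptNumber())) {
      committer.commitTask(mrTaskContext)
    } else {
      committer.abortTask(mrTaskContext)
      // Surfaces on the driver as TaskCommitDenied, which does not count towards task failures,
      // so the attempt can simply be rescheduled.
      throw new CommitDeniedException("commit denied", ctx.stageId(), splitId, ctx.attemptNumber())
    }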
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 533b11bab9715..6776e80761980 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1165,6 +1165,7 @@ class DAGScheduler(
outputCommitCoordinator.taskCompleted(
stageId,
+ task.stageAttemptId,
task.partitionId,
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
@@ -1323,23 +1324,24 @@ class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
} else {
+ failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
+ val shouldAbortStage =
+ failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
+ disallowStageRetryForTest
+
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
- markStageAsFinished(failedStage, Some(failureMessage))
+ markStageAsFinished(failedStage, errorMessage = Some(failureMessage),
+ willRetry = !shouldAbortStage)
} else {
logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
s"longer running")
}
- failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
- val shouldAbortStage =
- failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
- disallowStageRetryForTest
-
if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
"Fetch failure will not retry stage due to testing config"
@@ -1485,7 +1487,10 @@ class DAGScheduler(
/**
* Marks a stage as finished and removes it from the list of running stages.
*/
- private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
+ private def markStageAsFinished(
+ stage: Stage,
+ errorMessage: Option[String] = None,
+ willRetry: Boolean = false): Unit = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
@@ -1504,7 +1509,9 @@ class DAGScheduler(
logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}
- outputCommitCoordinator.stageEnd(stage.id)
+ if (!willRetry) {
+ outputCommitCoordinator.stageEnd(stage.id)
+ }
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 83d87b548a430..b382d623806e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -27,7 +27,11 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
private sealed trait OutputCommitCoordinationMessage extends Serializable
private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
+private case class AskPermissionToCommitOutput(
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int)
/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -45,13 +49,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
- private type StageId = Int
- private type PartitionId = Int
- private type TaskAttemptNumber = Int
- private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+ // Class used to identify a committer. The task ID for a committer is implicitly defined by
+ // the partition being processed, but the coordinator needs to keep track of both the stage
+ // attempt and the task attempt, because in some situations the same task may be running
+ // concurrently in two different attempts of the same stage.
+ private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)
+
private case class StageState(numPartitions: Int) {
- val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
- val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+ val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
+ val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
}
/**
@@ -64,7 +70,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
- private val stageStates = mutable.Map[StageId, StageState]()
+ private val stageStates = mutable.Map[Int, StageState]()
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -87,10 +93,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
- stage: StageId,
- partition: PartitionId,
- attemptNumber: TaskAttemptNumber): Boolean = {
- val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int): Boolean = {
+ val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
@@ -103,26 +110,35 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
}
/**
- * Called by the DAGScheduler when a stage starts.
+ * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+ * yet been initialized.
*
* @param stage the stage id.
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
- private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
- stageStates(stage) = new StageState(maxPartitionId + 1)
+ private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+ stageStates.get(stage) match {
+ case Some(state) =>
+ require(state.authorizedCommitters.length == maxPartitionId + 1)
+ logInfo(s"Reusing state from previous attempt of stage $stage.")
+
+ case _ =>
+ stageStates(stage) = new StageState(maxPartitionId + 1)
+ }
}
// Called by DAGScheduler
- private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
+ private[scheduler] def stageEnd(stage: Int): Unit = synchronized {
stageStates.remove(stage)
}
// Called by DAGScheduler
private[scheduler] def taskCompleted(
- stage: StageId,
- partition: PartitionId,
- attemptNumber: TaskAttemptNumber,
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int,
reason: TaskEndReason): Unit = synchronized {
val stageState = stageStates.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -131,16 +147,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
reason match {
case Success =>
// The task output has been committed successfully
- case denied: TaskCommitDenied =>
- logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
- s"attempt: $attemptNumber")
- case otherReason =>
+ case _: TaskCommitDenied =>
+ logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
+ s"partition: $partition, attempt: $attemptNumber")
+ case _ =>
// Mark the attempt as failed to blacklist from future commit protocol
- stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
- if (stageState.authorizedCommitters(partition) == attemptNumber) {
+ val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+ stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
+ if (stageState.authorizedCommitters(partition) == taskId) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
- stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+ stageState.authorizedCommitters(partition) = null
}
}
}
@@ -155,47 +172,41 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Marked private[scheduler] instead of private so this can be mocked in tests
private[scheduler] def handleAskPermissionToCommit(
- stage: StageId,
- partition: PartitionId,
- attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int): Boolean = synchronized {
stageStates.get(stage) match {
- case Some(state) if attemptFailed(state, partition, attemptNumber) =>
- logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
- s" partition=$partition as task attempt $attemptNumber has already failed.")
+ case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+ logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
+ s"task attempt $attemptNumber already marked as failed.")
false
case Some(state) =>
- state.authorizedCommitters(partition) match {
- case NO_AUTHORIZED_COMMITTER =>
- logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
- s"partition=$partition")
- state.authorizedCommitters(partition) = attemptNumber
- true
- case existingCommitter =>
- // Coordinator should be idempotent when receiving AskPermissionToCommit.
- if (existingCommitter == attemptNumber) {
- logWarning(s"Authorizing duplicate request to commit for " +
- s"attemptNumber=$attemptNumber to commit for stage=$stage," +
- s" partition=$partition; existingCommitter = $existingCommitter." +
- s" This can indicate dropped network traffic.")
- true
- } else {
- logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
- s"partition=$partition; existingCommitter = $existingCommitter")
- false
- }
+ val existing = state.authorizedCommitters(partition)
+ if (existing == null) {
+ logDebug(s"Commit allowed for stage=$stage.$stageAttempt, partition=$partition, " +
+ s"task attempt $attemptNumber")
+ state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
+ true
+ } else {
+ logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
+ s"already committed by $existing")
+ false
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing" +
- s" attempt number $attemptNumber of partition $partition to commit")
+ logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
+ "stage already marked as completed.")
false
}
}
private def attemptFailed(
stageState: StageState,
- partition: PartitionId,
- attempt: TaskAttemptNumber): Boolean = synchronized {
- stageState.failures.get(partition).exists(_.contains(attempt))
+ stageAttempt: Int,
+ partition: Int,
+ attempt: Int): Boolean = synchronized {
+ val failInfo = TaskIdentifier(stageAttempt, attempt)
+ stageState.failures.get(partition).exists(_.contains(failInfo))
}
}
@@ -215,9 +226,10 @@ private[spark] object OutputCommitCoordinator {
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
+ case AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) =>
context.reply(
- outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
+ outputCommitCoordinator.handleAskPermissionToCommit(stage, stageAttempt, partition,
+ attemptNumber))
}
}
}
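A hedged walkthrough of the new TaskIdentifier-based bookkeeping (illustrative only; this would have to live in `org.apache.spark.scheduler`, since these members are `private[scheduler]`):

    import org.apache.spark.{SparkConf, TaskKilled}

    val coordinator = new OutputCommitCoordinator(new SparkConf(), isDriver = true)
    coordinator.stageStart(stage = 0, maxPartitionId = 0)

    // Arguments are (stage, stageAttempt, partition, attemptNumber).
    // First ask wins and is recorded as TaskIdentifier(stageAttempt = 0, taskAttempt = 0).
    coordinator.handleAskPermissionToCommit(0, 0, 0, 0)   // true
    // Same task attempt number but a different stage attempt is a different identifier: denied.
    coordinator.handleAskPermissionToCommit(0, 1, 0, 0)   // false

    // If the authorized committer fails, its slot is cleared and a later attempt may commit.
    coordinator.taskCompleted(0, 0, 0, 0, reason = TaskKilled("test"))
    coordinator.handleAskPermissionToCommit(0, 1, 0, 1)   // true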
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 7767ef1803a06..f536fc2a5f0a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -79,6 +79,7 @@ private[spark] abstract class Task[T](
SparkEnv.get.blockManager.registerTask(taskAttemptId)
context = new TaskContextImpl(
stageId,
+ stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
partitionId,
taskAttemptId,
attemptNumber,
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 7df1de530cff9..3e0b62dc8aba8 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -250,7 +250,7 @@ private[spark] object JettyUtils extends Logging {
filters.foreach {
case filter : String =>
if (!filter.isEmpty) {
- logInfo("Adding filter: " + filter)
+ logInfo(s"Adding filter $filter to ${handlers.map(_.getContextPath).mkString(", ")}.")
val holder : FilterHolder = new FilterHolder()
holder.setClassName(filter)
// Get any parameters for each filter
@@ -393,7 +393,7 @@ private[spark] object JettyUtils extends Logging {
}
pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))
- ServerInfo(server, httpPort, securePort, collection)
+ ServerInfo(server, httpPort, securePort, conf, collection)
} catch {
case e: Exception =>
server.stop()
@@ -492,10 +492,12 @@ private[spark] case class ServerInfo(
server: Server,
boundPort: Int,
securePort: Option[Int],
+ conf: SparkConf,
private val rootHandler: ContextHandlerCollection) {
- def addHandler(handler: ContextHandler): Unit = {
+ def addHandler(handler: ServletContextHandler): Unit = {
handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME))
+ JettyUtils.addFilters(Seq(handler), conf)
rootHandler.addHandler(handler)
if (!handler.isStarted()) {
handler.start()
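Threading `conf` through `ServerInfo` means handlers attached after the server is bound (for example, SparkUIs attached to a running HistoryServer, per the HistoryServer change above) now receive the `spark.ui.filters` security filters too, not just the handlers present at bind time. A configuration sketch (the filter class is hypothetical):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.ui.filters", "org.example.MyAuthFilter")         // hypothetical servlet filter
      .set("spark.org.example.MyAuthFilter.param.role", "admin")   // per-filter params: spark.<class>.param.<name>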
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala
index 2da4baf5efc41..78a4f3d07c51f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala
@@ -34,7 +34,9 @@ private[ui] class JobsTable(
progressListener: JobProgressListener,
killEnabled: Boolean) {
// stripXSS is called to remove suspicious characters used in XSS attacks
- val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
+ val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
+ UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
+ }
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a28daf7f90451..741f95ae26428 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,7 +43,9 @@ private[ui] class StageTableBase(
killEnabled: Boolean,
isFailedStage: Boolean) {
// stripXSS is called to remove suspicious characters used in XSS attacks
- val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
+ val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
+ UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
+ }
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
.map(para => para._1 + "=" + para._2(0))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6bcaf102d9680..626b65679a276 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2625,6 +2625,17 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Redact the sensitive values in the given map. If a map key matches the redaction pattern then
+ * its value is replaced with a dummy text.
+ */
+ def redact(regex: Option[Regex], kvs: Seq[(String, String)]): Seq[(String, String)] = {
+ regex match {
+ case None => kvs
+ case Some(r) => redact(r, kvs)
+ }
+ }
+
private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
// If the sensitive information regex matches with either the key or the value, redact the value
// While the original intent was to only redact the value if the key matched with the regex,
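A small usage sketch of the new overload (`Utils` is `private[spark]`, so this compiles only inside the Spark packages; values are illustrative):

    import org.apache.spark.util.Utils

    val kvs = Seq("spark.app.name" -> "demo", "spark.hadoop.fs.s3a.secret.key" -> "hunter2")

    Utils.redact(None, kvs)                            // no pattern configured: returned unchanged
    Utils.redact(Some("(?i)secret|password".r), kvs)   // the secret key's value is replaced with the
                                                       // redaction placeholder text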
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 94f5805853e1e..f8e233a05a447 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -38,6 +38,7 @@ public static void test() {
tc.attemptNumber();
tc.partitionId();
tc.stageId();
+ tc.stageAttemptNumber();
tc.taskAttemptId();
}
@@ -51,6 +52,7 @@ public void onTaskCompletion(TaskContext context) {
context.isCompleted();
context.isInterrupted();
context.stageId();
+ context.stageAttemptNumber();
context.partitionId();
context.addTaskCompletionListener(this);
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58b865969f517..3b564df8a44bc 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -336,14 +336,14 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(0, 0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem))
val data1 = (1 to 10).map { x => x -> x}
// second attempt -- also successful. We'll write out different data,
// just to simulate the fact that the records may get written differently
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(0, 0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
val data2 = (11 to 20).map { x => x -> x}
// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -371,7 +371,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
- new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
index 362cd861cc248..dcf89e4f75acf 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
@@ -29,6 +29,7 @@ object MemoryTestingUtils {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)
new TaskContextImpl(
stageId = 0,
+ stageAttemptNumber = 0,
partitionId = 0,
taskAttemptId = 0,
attemptNumber = 0,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e51e6a0d3ff6b..742b84192a210 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark._
import org.apache.spark.internal.io.SparkHadoopWriter
import org.apache.spark.rdd.{FakeOutputCommitter, RDD}
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util.{ThreadUtils, Utils}
/**
@@ -151,7 +152,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Job should not complete if all commits are denied") {
// Create a mock OutputCommitCoordinator that denies all attempts to commit
doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit(
- Matchers.any(), Matchers.any(), Matchers.any())
+ Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any())
val rdd: RDD[Int] = sc.parallelize(Seq(1), 1)
def resultHandler(x: Int, y: Unit): Unit = {}
val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd,
@@ -167,45 +168,106 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
+ val stageAttempt: Int = 1
val partition: Int = 2
val authorizedCommitter: Int = 3
val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)
- assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
- assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
+ assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+ nonAuthorizedCommitter))
// The non-authorized committer fails
- outputCommitCoordinator.taskCompleted(
- stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test"))
+ outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+ attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test"))
// New tasks should still not be able to commit because the authorized committer has not failed
- assert(
- !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
+ assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+ nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
- outputCommitCoordinator.taskCompleted(
- stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
+ outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+ attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
// A new task should now be allowed to become the authorized committer
- assert(
- outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
+ assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+ nonAuthorizedCommitter + 2))
// There can only be one authorized committer
- assert(
- !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
- }
-
- test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
- val rdd = sc.parallelize(Seq(1), 1)
- sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
- 0 until rdd.partitions.size)
+ assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+ nonAuthorizedCommitter + 3))
}
test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
val stage: Int = 1
+ val stageAttempt: Int = 1
val partition: Int = 1
val failedAttempt: Int = 0
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
- outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
+ outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+ attemptNumber = failedAttempt,
reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
- assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
- assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
+ assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt))
+ assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1))
+ }
+
+ test("SPARK-24589: Differentiate tasks from different stage attempts") {
+ var stage = 1
+ val taskAttempt = 1
+ val partition = 1
+
+ outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+ assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+ assert(!outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
+
+ // Fail the task in the first attempt, the task in the second attempt should succeed.
+ stage += 1
+ outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+ outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
+ ExecutorLostFailure("0", exitCausedByApp = true, None))
+ assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+ assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
+
+ // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
+ // then fail the 1st attempt and make sure the 4th one can commit again.
+ stage += 1
+ outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+ assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+ outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt,
+ ExecutorLostFailure("0", exitCausedByApp = true, None))
+ assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
+ outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
+ ExecutorLostFailure("0", exitCausedByApp = true, None))
+ assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+ }
+
+ test("SPARK-24589: Make sure stage state is cleaned up") {
+ // Normal application without stage failures.
+ sc.parallelize(1 to 100, 100)
+ .map { i => (i % 10, i) }
+ .reduceByKey(_ + _)
+ .collect()
+
+ assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
+
+ // Force failures in a few tasks so that a stage is retried. Collect the ID of the failing
+ // stage so that we can check the state of the output committer.
+ val retriedStage = sc.parallelize(1 to 100, 10)
+ .map { i => (i % 10, i) }
+ .reduceByKey { case (_, _) =>
+ val ctx = TaskContext.get()
+ if (ctx.stageAttemptNumber() == 0) {
+ throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 1, 1, 1,
+ new Exception("Failure for test."))
+ } else {
+ ctx.stageId()
+ }
+ }
+ .collect()
+ .map { case (k, v) => v }
+ .toSet
+
+ assert(retriedStage.size === 1)
+ assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
+ verify(sc.env.outputCommitCoordinator, times(2))
+ .stageStart(Matchers.eq(retriedStage.head), Matchers.any())
+ verify(sc.env.outputCommitCoordinator).stageEnd(Matchers.eq(retriedStage.head))
}
}
@@ -239,16 +301,6 @@ private case class OutputCommitFunctions(tempDirPath: String) {
if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
}
- // Receiver should be idempotent for AskPermissionToCommitOutput
- def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
- val ctx = TaskContext.get()
- val canCommit1 = SparkEnv.get.outputCommitCoordinator
- .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
- val canCommit2 = SparkEnv.get.outputCommitCoordinator
- .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
- assert(canCommit1 && canCommit2)
- }
-
private def runCommitWithProvidedCommitter(
ctx: TaskContext,
iter: Iterator[Int],
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 992d3396d203f..7a560f45b44e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util._
class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
@@ -145,6 +146,30 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}
+ test("TaskContext.stageAttemptNumber getter") {
+ sc = new SparkContext("local[1,2]", "test")
+
+ // Check stageAttemptNumbers are 0 for initial stage
+ val stageAttemptNumbers = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
+ Seq(TaskContext.get().stageAttemptNumber()).iterator
+ }.collect()
+ assert(stageAttemptNumbers.toSet === Set(0))
+
+ // Check stageAttemptNumbers that are resubmitted when tasks have FetchFailedException
+ val stageAttemptNumbersWithFailedStage =
+ sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
+ val stageAttemptNumber = TaskContext.get().stageAttemptNumber()
+ if (stageAttemptNumber < 2) {
+ // Throw FetchFailedException to explicitly trigger stage resubmission. A normal exception
+ // will only trigger task resubmission in the same stage.
+ throw new FetchFailedException(null, 0, 0, 0, "Fake")
+ }
+ Seq(stageAttemptNumber).iterator
+ }.collect()
+
+ assert(stageAttemptNumbersWithFailedStage.toSet === Set(2))
+ }
+
test("accumulators are updated on exception failures") {
// This means use 1 core and 4 max task failures
sc = new SparkContext("local[1,4]", "test")
@@ -177,7 +202,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
// accumulator updates from it.
val taskMetrics = TaskMetrics.empty
val task = new Task[Int](0, 0, 0) {
- context = new TaskContextImpl(0, 0, 0L, 0,
+ context = new TaskContextImpl(0, 0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
new Properties,
SparkEnv.get.metricsSystem,
@@ -200,7 +225,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
// accumulator updates from it.
val taskMetrics = TaskMetrics.registered
val task = new Task[Int](0, 0, 0) {
- context = new TaskContextImpl(0, 0, 0L, 0,
+ context = new TaskContextImpl(0, 0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
new Properties,
SparkEnv.get.metricsSystem,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 917db766f7f11..9c0699bc981f8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -62,7 +62,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
try {
TaskContext.setTaskContext(
- new TaskContextImpl(0, 0, taskAttemptId, 0, null, new Properties, null))
+ new TaskContextImpl(0, 0, 0, taskAttemptId, 0, null, new Properties, null))
block
} finally {
TaskContext.unset()
diff --git a/docs/_config.yml b/docs/_config.yml
index a78e2d5b8de0f..320b077874e5d 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -14,8 +14,8 @@ include:
# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 2.2.2-SNAPSHOT
-SPARK_VERSION_SHORT: 2.2.2
+SPARK_VERSION: 2.2.3-SNAPSHOT
+SPARK_VERSION_SHORT: 2.2.3
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.8"
MESOS_VERSION: 1.0.0
diff --git a/examples/pom.xml b/examples/pom.xml
index ca721144f97d3..df8081f951721 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 8c7c7ce386573..b393be16d7e55 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 44b95a247d78b..91abcdc817dc5 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 3e7f52018d239..81b35b870b1a5 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
index e75d677853eeb..5fa491b90d3ae 100644
--- a/external/kafka-0-10-assembly/pom.xml
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 96a5e3ad10b32..52f5bb6a2ad12 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 3c60c3dd5f681..230870d27fa97 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml
index da6d55933b9f5..08dc43aed3728 100644
--- a/external/kafka-0-8-assembly/pom.xml
+++ b/external/kafka-0-8-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
index 2dae424743f98..de42a17fe95ba 100644
--- a/external/kafka-0-8/pom.xml
+++ b/external/kafka-0-8/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml
index acf64a4f3efc2..81a9a6b200b90 100644
--- a/external/kinesis-asl-assembly/pom.xml
+++ b/external/kinesis-asl-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml
index 756de1028af4c..0eddd3343936a 100644
--- a/external/kinesis-asl/pom.xml
+++ b/external/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 22d90f988f50b..b15fc43d56494 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/launcher/pom.xml b/launcher/pom.xml
index 13f69b99c3480..6217ebbc37c9c 100644
--- a/launcher/pom.xml
+++ b/launcher/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml
index 7be865d167d83..f785e128bbf42 100644
--- a/mllib-local/pom.xml
+++ b/mllib-local/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/mllib/pom.xml b/mllib/pom.xml
index c7d9bc1e64e8e..68477eee6041b 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/pom.xml b/pom.xml
index dde3af24d45db..930307ada2e20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
pom
Spark Project Parent POM
http://spark.apache.org/
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index feae76a087dec..3eec622fb99b7 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,9 @@ object MimaExcludes {
// Exclude rules for 2.2.x
lazy val v22excludes = v21excludes ++ Seq(
+ // [SPARK-22897] Expose stageAttemptId in TaskContext
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.stageAttemptNumber"),
+
// [SPARK-19652][UI] Do auth checks for REST API access.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.withSparkUI"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.status.api.v1.UIRootFromServletContext"),
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa46995ab4aac..9bc9d2adfe004 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -713,7 +713,8 @@ object Unidoc {
scalacOptions in (ScalaUnidoc, unidoc) ++= Seq(
"-groups", // Group similar methods together based on the @group annotation.
- "-skip-packages", "org.apache.hadoop"
+ "-skip-packages", "org.apache.hadoop",
+ "-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath
) ++ (
// Add links to sources when generating Scaladoc for a non-snapshot release
if (!isSnapshot.value) {
diff --git a/python/pyspark/version.py b/python/pyspark/version.py
index 5b36495eff276..5ba4f7b3b2ab3 100644
--- a/python/pyspark/version.py
+++ b/python/pyspark/version.py
@@ -16,4 +16,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "2.2.2.dev0"
+__version__ = "2.2.3.dev0"
diff --git a/repl/pom.xml b/repl/pom.xml
index ddd51b1aa659d..b14bf22c15048 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../pom.xml
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index 7d9bdbfc16243..76135157a5fe2 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 68dba3f8f5a3a..19fb0d9e08ed3 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 499ad980b84c9..74c33962e40cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.util.Utils
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -825,6 +826,15 @@ object SQLConf {
.intConf
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+ val SQL_OPTIONS_REDACTION_PATTERN =
+ buildConf("spark.sql.redaction.options.regex")
+ .doc("Regex to decide which keys in a Spark SQL command's options map contain sensitive " +
+ "information. The values of options whose names that match this regex will be redacted " +
+ "in the explain output. This redaction is applied on top of the global redaction " +
+ s"configuration defined by ${SECRET_REDACTION_PATTERN.key}.")
+ .regexConf
+ .createWithDefault("(?i)url".r)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -1190,6 +1200,17 @@ class SQLConf extends Serializable with Logging {
}.toSeq
}
+ /**
+ * Redacts the given option map according to the description of SQL_OPTIONS_REDACTION_PATTERN.
+ */
+ def redactOptions(options: Map[String, String]): Map[String, String] = {
+ val regexes = Seq(
+ getConf(SQL_OPTIONS_REDACTION_PATTERN),
+ SECRET_REDACTION_PATTERN.readFrom(reader))
+
+ regexes.foldLeft(options.toSeq) { case (opts, r) => Utils.redact(Some(r), opts) }.toMap
+ }
+
/**
* Return whether a given key is set in this [[SQLConf]].
*/
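A hedged sketch of what the new helper does to a typical JDBC options map (`sessionState` is internal to `org.apache.spark.sql`; shown here only to illustrate the rules, assuming `spark` is an active SparkSession):

    val options = Map(
      "url" -> "jdbc:postgresql://db:5432/prod?user=admin&password=hunter2",
      "dbtable" -> "events",
      "password" -> "hunter2")

    spark.sessionState.conf.redactOptions(options)
    // "url" matches the default spark.sql.redaction.options.regex ("(?i)url") and "password"
    // matches the global spark.redaction.regex, so both values come back redacted;
    // "dbtable" is left as-is.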
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b3dcc0553d13b..f6efd549107ba 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-csd-14-SNAPSHOT
+ 2.2.2-csd-1-SNAPSHOT
../../pom.xml
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 53868d41b7c44..b92684c5d3807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -17,12 +17,10 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.SparkEnv
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.util.Utils
/**
* Saves the results of `query` in to a data source.
@@ -53,7 +51,9 @@ case class SaveIntoDataSourceCommand(
}
override def simpleString: String = {
- val redacted = Utils.redact(SparkEnv.get.conf, options.toSeq).toMap
- s"SaveIntoDataSourceCommand ${provider}, ${partitionColumns}, ${redacted}, ${mode}"
+ val redacted = SparkSession.getActiveSession
+ .map(_.sessionState.conf.redactOptions(options))
+ .getOrElse(Map())
+ s"SaveIntoDataSourceCommand ${provider}, ${redacted}, ${mode}"
}
}
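The practical effect is that plan and explain strings for writes no longer leak credentials; a hedged example of the kind of write this protects (assumes `df` is an existing DataFrame and the JDBC endpoint is reachable):

    df.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://db:5432/prod?user=admin&password=hunter2")
      .option("dbtable", "events")
      .save()
    // The SaveIntoDataSourceCommand node now renders its options through redactOptions, so the
    // url value above appears redacted in the plan instead of exposing the raw credentials.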
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index fb632cf2bb70e..09879690e5f98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -326,8 +326,8 @@ private[sql] object JsonInferSchema {
ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
// The case that given `DecimalType` is capable of given `IntegralType` is handled in
- // `findTightestCommonTypeOfTwo`. Both cases below will be executed only when
- // the given `DecimalType` is not capable of the given `IntegralType`.
+ // `findTightestCommonType`. Both cases below will be executed only when the given
+ // `DecimalType` is not capable of the given `IntegralType`.
case (t1: IntegralType, t2: DecimalType) =>
compatibleType(DecimalType.forType(t1), t2)
case (t1: DecimalType, t2: IntegralType) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 30a54d820c162..7d5275a5c3ec6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -538,7 +538,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
def append(key: Long, row: UnsafeRow): Unit = {
val sizeInBytes = row.getSizeInBytes
if (sizeInBytes >= (1 << SIZE_BITS)) {
- sys.error("Does not support row that is larger than 256M")
+ throw new UnsupportedOperationException("Does not support row that is larger than 256M")
}
if (key < minKey) {
@@ -548,19 +548,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
maxKey = key
}
- // There is 8 bytes for the pointer to next value
- if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
- val used = page.length
- if (used >= (1 << 30)) {
- sys.error("Can not build a HashedRelation that is larger than 8G")
- }
- ensureAcquireMemory(used * 8L * 2)
- val newPage = new Array[Long](used * 2)
- Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
- cursor - Platform.LONG_ARRAY_OFFSET)
- page = newPage
- freeMemory(used * 8L)
- }
+ grow(row.getSizeInBytes)
// copy the bytes of UnsafeRow
val offset = cursor
@@ -593,7 +581,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
growArray()
} else if (numKeys > array.length / 2 * 0.75) {
// The fill ratio should be less than 0.75
- sys.error("Cannot build HashedRelation with more than 1/3 billions unique keys")
+ throw new UnsupportedOperationException(
+ "Cannot build HashedRelation with more than 1/3 billions unique keys")
}
}
} else {
@@ -604,6 +593,25 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}
+ private def grow(inputRowSize: Int): Unit = {
+ // There is 8 bytes for the pointer to next value
+ val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
+ if (neededNumWords > page.length) {
+ if (neededNumWords > (1 << 30)) {
+ throw new UnsupportedOperationException(
+ "Can not build a HashedRelation that is larger than 8G")
+ }
+ val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
+ ensureAcquireMemory(newNumWords * 8L)
+ val newPage = new Array[Long](newNumWords.toInt)
+ Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
+ cursor - Platform.LONG_ARRAY_OFFSET)
+ val used = page.length
+ page = newPage
+ freeMemory(used * 8L)
+ }
+ }
+
private def growArray(): Unit = {
var old_array = array
val n = array.length
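A worked example of the new sizing arithmetic in `grow` (numbers are illustrative):

    // Suppose the write cursor sits 800 bytes past LONG_ARRAY_OFFSET and the next row is 100 bytes.
    // 8 bytes are reserved for the pointer to the next value, and the sum is rounded up to whole longs:
    val neededNumWords = (800 + 8 + 100 + 7) / 8   // = 114 longs, i.e. 912 bytes for the 908 needed
    // The page is then resized to max(neededNumWords, min(2 * page.length, 1 << 30)) longs rather
    // than blindly doubled, and the 1 << 30 word (8 GB) cap is still enforced.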
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 6cf18de0cc768..6c222a08bfe1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -69,6 +69,7 @@ class UnsafeFixedWidthAggregationMapSuite
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
+ stageAttemptNumber = 0,
partitionId = 0,
taskAttemptId = Random.nextInt(10000),
attemptNumber = 0,
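These one-line test fixes (here and in the suites below) all track the same signature change: the backported TaskContextImpl takes a stage attempt number immediately after the stage id, so positional call sites need an extra 0 in the second slot. Below is a hedged sketch of the constructor shape as read from these hunks; the names of the trailing parameters are assumptions based on the arguments the tests pass, and the mirror class is hypothetical, not the real private[spark] class.

import java.util.Properties

// Hypothetical mirror of the call shape, for annotation purposes only.
case class TaskContextShape(
    stageId: Int,
    stageAttemptNumber: Int,  // the newly added second slot
    partitionId: Int,
    taskAttemptId: Long,
    attemptNumber: Int,
    taskMemoryManager: AnyRef,
    localProperties: Properties,
    metricsSystem: AnyRef)

// Positional form as it appears in the suites below: the second 0 is the stage attempt.
val ctx = TaskContextShape(0, 0, 0, 0, 0, null, new Properties, null)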
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 57ef468c39abe..3a81fbaa6e104 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -117,6 +117,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
+ stageAttemptNumber = 0,
partitionId = 0,
taskAttemptId = 98456,
attemptNumber = 0,
@@ -230,7 +231,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
// which has duplicated keys and the number of entries exceeds its capacity.
try {
- TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, null))
+ TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
new UnsafeKVExternalSorter(
schema,
schema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 53105e0b24959..c3ecf5208d59e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -114,7 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
(i, converter(Row(i)))
}
val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null)
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties, null)
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
taskContext,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
index bc9cb6ec2e771..3a9b34d7533b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
@@ -35,7 +35,8 @@ class SortBasedAggregationStoreSuite extends SparkFunSuite with LocalSparkConte
val conf = new SparkConf()
sc = new SparkContext("local[2, 4]", "test", conf)
val taskManager = new TaskMemoryManager(new TestMemoryManager(conf), 0)
- TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, taskManager, new Properties, null))
+ TaskContext.setTaskContext(
+ new TaskContextImpl(0, 0, 0, 0, 0, taskManager, new Properties, null))
}
override def afterAll(): Unit = TaskContext.unset()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommandSuite.scala
index 6b9ddb1b481c7..cf340d0ab4a76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommandSuite.scala
@@ -23,9 +23,6 @@ import org.apache.spark.sql.test.SharedSQLContext
class SaveIntoDataSourceCommandSuite extends SharedSQLContext {
- override protected def sparkConf: SparkConf = super.sparkConf
- .set("spark.redaction.regex", "(?i)password|url")
-
test("simpleString is redacted") {
val URL = "connection.url"
val PASS = "123"
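Dropping the override means this suite now exercises redaction with whatever pattern the active configuration supplies rather than a pinned one. As a minimal illustration of the kind of key-based masking involved (my own helper, not Spark's implementation; only the "(?i)password|url" pattern is taken from the removed line):

// Mask the value of any key whose name matches the redaction pattern.
def redact(pattern: scala.util.matching.Regex,
           kvs: Seq[(String, String)]): Seq[(String, String)] =
  kvs.map { case (k, v) =>
    if (pattern.findFirstIn(k).isDefined) (k, "*********(redacted)") else (k, v)
  }

val pattern = "(?i)password|url".r  // the pattern the removed override pinned
val props = Seq("connection.url" -> "jdbc:h2:mem:db", "password" -> "123", "driver" -> "h2")
// "connection.url" and "password" are masked; "driver" is left untouched.
assert(redact(pattern, props).count(_._2 == "*********(redacted)") == 2)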
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ede63fea9606f..f0288c84eec38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.CompactBuffer
@@ -253,6 +253,30 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
map.free()
}
+ test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
+ val taskMemoryManager = new TaskMemoryManager(
+ new StaticMemoryManager(
+ new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ Long.MaxValue,
+ Long.MaxValue,
+ 1),
+ 0)
+ val unsafeProj = UnsafeProjection.create(Array[DataType](StringType))
+ val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+
+ val key = 0L
+ // the page array is initialized with length 1 << 17 (1M bytes),
+ // so here we need a value larger than 1 << 18 (2M bytes), to trigger the bug
+ val bigStr = UTF8String.fromString("x" * (1 << 19))
+
+ map.append(key, unsafeProj(InternalRow(bigStr)))
+ map.optimize()
+
+ val resultRow = new UnsafeRow(1)
+ assert(map.getValue(key, resultRow).getUTF8String(0) === bigStr)
+ map.free()
+ }
+
test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 2858513d4c226..edea2cc07f913 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -21,7 +21,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
- <version>2.2.1-csd-14-SNAPSHOT</version>
+ <version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 8a17f6377cb05..df62260235796 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -21,7 +21,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
- <version>2.2.1-csd-14-SNAPSHOT</version>
+ <version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 2b37047612dfe..92c27e833673e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -160,7 +160,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
object PROCESS_TABLES extends QueryTest with SQLTestUtils {
// Tests the latest version of every release line.
- val testingVersions = Seq("2.0.2", "2.1.2", "2.2.0")
+ val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1")
protected var spark: SparkSession = _
diff --git a/streaming/pom.xml b/streaming/pom.xml
index cef68db1ae8ad..f35c2d9c508aa 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,7 +21,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
- <version>2.2.1-csd-14-SNAPSHOT</version>
+ <version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/tools/pom.xml b/tools/pom.xml
index 21b24f1b13e34..84f30353cb663 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,7 +20,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
- <version>2.2.1-csd-14-SNAPSHOT</version>
+ <version>2.2.2-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>