
Commit 8ba8e72

merging in master
2 parents 386b8db + d9b7f3e

File tree

88 files changed (+1496, -1090 lines)


core/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -331,6 +331,16 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
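These two test-scoped dependencies let JUnit-style suites run alongside ScalaTest via the junit-interface runner. A minimal hedged sketch of the kind of test this enables (class name and assertion are illustrative, not part of this commit):

import org.junit.Assert.assertEquals
import org.junit.Test

// Hypothetical test class; relies only on the junit and junit-interface
// test dependencies added above.
class SimpleJUnitSuite {
  @Test
  def additionWorks(): Unit = {
    assertEquals(4, 2 + 2)
  }
}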

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 2 deletions
@@ -36,6 +36,7 @@ import scala.util.control.NonFatal
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.mapred.{AvroJob, AvroWrapper, AvroInputFormat}
+import org.apache.commons.lang.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -350,8 +351,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new InheritableThreadLocal[Properties] {
-    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+  protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = {
+      // Note: make a clone such that changes in the parent properties aren't reflected in
+      // the those of the children threads, which has confusing semantics (SPARK-10563).
+      SerializationUtils.clone(parent).asInstanceOf[Properties]
+    }
     override protected def initialValue(): Properties = new Properties()
   }
 
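A hedged sketch of the semantics this clone is meant to provide (illustrative only; assumes an existing SparkContext named sc and a made-up property key): a child thread inherits a snapshot of the parent's local properties at thread-construction time, so later changes in the parent no longer leak into the child (SPARK-10563).

sc.setLocalProperty("job.group", "parent")

// InheritableThreadLocal values are captured when the Thread object is
// constructed, so create the child before mutating the property again.
val child = new Thread {
  override def run(): Unit = {
    // Prints "parent": the child holds a cloned snapshot of the properties.
    println(sc.getLocalProperty("job.group"))
  }
}

sc.setLocalProperty("job.group", "changed-in-parent")
child.start()
child.join()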

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 2 deletions
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
   }
 
   def commit() {
-    SparkHadoopMapRedUtil.commitTask(
-      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
   }
 
   def commitJob() {

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 5 additions & 2 deletions
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
  * Task requested the driver to commit, but was denied.
  */
 @DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+    jobID: Int,
+    partitionID: Int,
+    attemptNumber: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
-    s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
   /**
    * If a task failed because its attempt to commit was denied, do not count this failure
    * towards failing the stage. This is intended to prevent spurious stage failures in cases
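For reference, a hedged sketch of how the renamed field shows up in the rendered error string (values are made up):

// Illustrative only: construct the reason and render its error string.
val reason = TaskCommitDenied(jobID = 2, partitionID = 7, attemptNumber = 1)
// "TaskCommitDenied (Driver denied task commit) for job: 2, partition: 7, attemptNumber: 1"
println(reason.toErrorString)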

core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala

Lines changed: 2 additions & 2 deletions
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
     msg: String,
     jobID: Int,
     splitID: Int,
-    attemptID: Int)
+    attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 4 additions & 16 deletions
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int,
-      attemptId: Int): Unit = {
+      splitId: Int): Unit = {
 
     val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
     if (shouldCoordinateWithDriver) {
      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+      val taskAttemptNumber = TaskContext.get().attemptNumber()
+      val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
 
       if (canCommit) {
         performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
         logInfo(message)
         // We need to abort the task so that the driver can reschedule new attempts, if necessary
         committer.abortTask(mrTaskContext)
-        throw new CommitDeniedException(message, jobId, splitId, attemptId)
+        throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
       }
     } else {
       // Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
     }
   }
-
-  def commitTask(
-      committer: MapReduceOutputCommitter,
-      mrTaskContext: MapReduceTaskAttemptContext,
-      sparkTaskContext: TaskContext): Unit = {
-    commitTask(
-      committer,
-      mrTaskContext,
-      sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
-  }
 }
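With the TaskContext-based overload removed, a hedged migration sketch for a caller that previously passed a sparkTaskContext (the names committer, mrTaskContext, and ctx are illustrative):

// Illustrative only: pass the MapReduce committer/context plus job and split
// IDs; the task attempt number is read from TaskContext.get() inside commitTask.
SparkHadoopMapRedUtil.commitTask(
  committer,                    // org.apache.hadoop.mapreduce.OutputCommitter
  mrTaskContext,                // org.apache.hadoop.mapreduce.TaskAttemptContext
  jobId = ctx.stageId(),        // ctx: org.apache.spark.TaskContext
  splitId = ctx.partitionId())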

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 31 additions & 37 deletions
@@ -469,50 +469,44 @@ abstract class RDD[T: ClassTag](
    * @param seed seed for the random number generator
    * @return sample of specified size in an array
    */
-  // TODO: rewrite this without return statements so we can wrap it in a scope
   def takeSample(
       withReplacement: Boolean,
       num: Int,
-      seed: Long = Utils.random.nextLong): Array[T] = {
+      seed: Long = Utils.random.nextLong): Array[T] = withScope {
     val numStDev = 10.0
 
-    if (num < 0) {
-      throw new IllegalArgumentException("Negative number of elements requested")
-    } else if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val initialCount = this.count()
-    if (initialCount == 0) {
-      return new Array[T](0)
-    }
-
-    val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
-    if (num > maxSampleSize) {
-      throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
-        s"$numStDev * math.sqrt(Int.MaxValue)")
-    }
-
-    val rand = new Random(seed)
-    if (!withReplacement && num >= initialCount) {
-      return Utils.randomizeInPlace(this.collect(), rand)
-    }
-
-    val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
-      withReplacement)
-
-    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+    require(num >= 0, "Negative number of elements requested")
+    require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
+      "Cannot support a sample size > Int.MaxValue - " +
+      s"$numStDev * math.sqrt(Int.MaxValue)")
 
-    // If the first sample didn't turn out large enough, keep trying to take samples;
-    // this shouldn't happen often because we use a big multiplier for the initial size
-    var numIters = 0
-    while (samples.length < num) {
-      logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
-      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-      numIters += 1
+    if (num == 0) {
+      new Array[T](0)
+    } else {
+      val initialCount = this.count()
+      if (initialCount == 0) {
+        new Array[T](0)
+      } else {
+        val rand = new Random(seed)
+        if (!withReplacement && num >= initialCount) {
+          Utils.randomizeInPlace(this.collect(), rand)
+        } else {
+          val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
+            withReplacement)
+          var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+
+          // If the first sample didn't turn out large enough, keep trying to take samples;
+          // this shouldn't happen often because we use a big multiplier for the initial size
+          var numIters = 0
+          while (samples.length < num) {
+            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
+            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+            numIters += 1
+          }
+          Utils.randomizeInPlace(samples, rand).take(num)
+        }
+      }
     }
-
-    Utils.randomizeInPlace(samples, rand).take(num)
   }
 
   /**
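The observable behavior of takeSample is unchanged; the rewrite only removes the return statements so the body can be wrapped in withScope. A hedged usage sketch (assumes an existing SparkContext named sc; values are illustrative):

// Illustrative only: sample 5 distinct elements from a small RDD.
val rdd = sc.parallelize(1 to 100)
val sample = rdd.takeSample(withReplacement = false, num = 5, seed = 42L)
// sample.length == 5; num == 0 returns an empty array, and a negative num
// still fails with IllegalArgumentException, now raised via require().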

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 2 deletions
@@ -1128,8 +1128,11 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
 
-    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
-      event.taskInfo.attempt, event.reason)
+    outputCommitCoordinator.taskCompleted(
+      stageId,
+      task.partitionId,
+      event.taskInfo.attemptNumber, // this is a task attempt number
+      event.reason)
 
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 26 additions & 22 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   var coordinatorRef: Option[RpcEndpointRef] = None
 
   private type StageId = Int
-  private type PartitionId = Long
-  private type TaskAttemptId = Long
+  private type PartitionId = Int
+  private type TaskAttemptNumber = Int
 
   /**
    * Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+  private type CommittersByStageMap =
+    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    *
    * @param stage the stage number
    * @param partition the partition number
-   * @param attempt a unique identifier for this task attempt
+   * @param attemptNumber how many times this task has been attempted
+   *                      (see [[TaskContext.attemptNumber()]])
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+      attemptNumber: TaskAttemptNumber): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
 
   // Called by DAGScheduler
   private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
   }
 
   // Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private[scheduler] def taskCompleted(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId,
+      attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
     val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       case Success =>
       // The task output has been committed successfully
      case denied: TaskCommitDenied =>
-        logInfo(
-          s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+        logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+          s"attempt: $attemptNumber")
      case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
-          logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
-            s" clearing lock")
+        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
           authorizedCommitters.remove(partition)
         }
     }
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private[scheduler] def handleAskPermissionToCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = synchronized {
+      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
         authorizedCommitters.get(partition) match {
           case Some(existingCommitter) =>
-            logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
-              s"existingCommitter = $existingCommitter")
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
            false
          case None =>
-            logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
-            authorizedCommitters(partition) = attempt
+            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition")
+            authorizedCommitters(partition) = attemptNumber
            true
        }
      case None =>
-        logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+          s"partition $partition to commit")
        false
     }
   }
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
     }
   }
 }
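For orientation, a hedged sketch of an executor-side caller asking the coordinator with the new integer attempt number (mirrors the SparkHadoopMapRedUtil change above; the stage and partition ids are made up):

// Illustrative only: the attempt number now comes from TaskContext rather
// than being threaded through as a separate Long task-attempt ID.
val coordinator = SparkEnv.get.outputCommitCoordinator
val attemptNumber = TaskContext.get().attemptNumber()
val stageId = 3        // made-up stage id
val partitionId = 12   // made-up partition id
if (coordinator.canCommit(stageId, partitionId, attemptNumber)) {
  // first committer wins: safe to commit this partition's output
} else {
  // another attempt already holds the commit lock for this partition
}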

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 5 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attemptNumber: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
@@ -95,7 +95,10 @@
     }
   }
 
-  def id: String = s"$index.$attempt"
+  @deprecated("Use attemptNumber", "1.6.0")
+  def attempt: Int = attemptNumber
+
+  def id: String = s"$index.$attemptNumber"
 
   def duration: Long = {
     if (!finished) {
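A hedged sketch of reading the new field from a listener (illustrative only; assumes an existing SparkContext named sc). Code that still uses taskInfo.attempt keeps compiling but now gets a deprecation warning:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: log the attempt number of every finished task.
class AttemptLoggingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    println(s"task ${info.taskId} (index ${info.index}) finished " +
      s"attemptNumber=${info.attemptNumber}")
  }
}

sc.addSparkListener(new AttemptLoggingListener)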
