core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (6 additions, 8 deletions)

@@ -88,8 +88,8 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
  *
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
- * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
- * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
+ * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
  *   creates.
  * @param inputFormatClass Storage format of the data to be read.
@@ -123,7 +123,7 @@ class HadoopRDD[K, V](
       sc,
       sc.broadcast(new SerializableConfiguration(conf))
         .asInstanceOf[Broadcast[SerializableConfiguration]],
-      None /* initLocalJobConfFuncOpt */,
+      initLocalJobConfFuncOpt = None,
       inputFormatClass,
       keyClass,
       valueClass,
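
The switch from positional `None /* initLocalJobConfFuncOpt */` to `initLocalJobConfFuncOpt = None` trades a comment for a compiler-checked named argument. A minimal standalone sketch of the difference, using a hypothetical method rather than the HadoopRDD constructor:

object NamedArgSketch {
  // Hypothetical method, for illustration only; not part of this patch.
  def makeRdd(path: String, initFuncOpt: Option[String => Unit] = None, minPartitions: Int = 2): Unit = ()

  // Positional argument plus comment: nothing checks that the comment names the right parameter.
  makeRdd("/data", None /* initFuncOpt */, 4)

  // Named argument: verified by the compiler and readable at the call site.
  makeRdd("/data", initFuncOpt = None, minPartitions = 4)
}
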
@@ -184,8 +184,9 @@ class HadoopRDD[K, V](
   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
     val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
       .asInstanceOf[InputFormat[K, V]]
-    if (newInputFormat.isInstanceOf[Configurable]) {
-      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    newInputFormat match {
+      case c: Configurable => c.setConf(conf)
+      case _ =>
     }
     newInputFormat
   }
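
The pattern match above folds the separate `isInstanceOf` test and `asInstanceOf` cast into one checked match. A self-contained sketch of the same idiom, using a stand-in trait instead of Hadoop's `Configurable` so it compiles on its own:

object ConfigurableMatchSketch {
  // Stand-in for Hadoop's Configurable, for the sketch only.
  trait Configurable { def setConf(conf: Map[String, String]): Unit }

  def configureIfNeeded(instance: AnyRef, conf: Map[String, String]): Unit = {
    instance match {
      case c: Configurable => c.setConf(conf) // one type test, binding c without an explicit cast
      case _ =>                               // not configurable: nothing to do
    }
  }
}
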
@@ -195,9 +196,6 @@
     // add the credentials here as this can be called before SparkContext initialized
     SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
Review comment (Member): Why can this be removed?

Reply (Contributor Author): It's a duplication since getInputFormat does exactly this.
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
-    }
     val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
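
To make the reviewer's point concrete: after this patch, getInputFormat is the single place where a Configurable input format gets configured, so getPartitions can call it and go straight to getSplits. A condensed, self-contained restatement of that call path (simplified signatures; the real methods live on HadoopRDD):

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf}
import org.apache.hadoop.util.ReflectionUtils

object InputFormatDedupSketch {
  // Mirrors the shape of HadoopRDD.getInputFormat after this change.
  def getInputFormat[K, V](inputFormatClass: Class[_ <: InputFormat[K, V]], conf: JobConf): InputFormat[K, V] = {
    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[K, V]]
    newInputFormat match {
      case c: Configurable => c.setConf(conf) // configuration happens exactly once, here
      case _ =>
    }
    newInputFormat
  }

  // getPartitions-style caller: no second Configurable check is needed before getSplits.
  def splitsFor[K, V](inputFormatClass: Class[_ <: InputFormat[K, V]],
                      conf: JobConf,
                      minPartitions: Int): Array[InputSplit] = {
    getInputFormat(inputFormatClass, conf).getSplits(conf, minPartitions)
  }
}
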
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -541,8 +541,7 @@ class DAGScheduler(
   }

   /**
-   * Submit an action job to the scheduler and get a JobWaiter object back. The JobWaiter object
-   * can be used to block until the the job finishes executing or can be used to cancel the job.
+   * Submit an action job to the scheduler.
    *
    * @param rdd target RDD to run tasks on
    * @param func a function to run on each partition of the RDD
@@ -551,6 +550,11 @@
    * @param callSite where in the user program this job was called
    * @param resultHandler callback to pass each result to
    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+   *
+   * @return a JobWaiter object that can be used to block until the job finishes executing
+   *         or can be used to cancel the job.
+   *
+   * @throws IllegalArgumentException when partitions ids are illegal
    */
   def submitJob[T, U](
       rdd: RDD[T],
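
For callers, the JobWaiter described in the new @return tag surfaces through the public SparkContext.submitJob wrapper (which delegates to DAGScheduler.submitJob) as a FutureAction that can be awaited or cancelled. A hedged usage sketch; the local master, the toy RDD, and the object name are assumptions for illustration only:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("submitJob-sketch"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // submitJob returns immediately; the job runs asynchronously on the DAGScheduler.
    val partialSums = Array.fill(4)(0)
    val future = sc.submitJob(
      rdd,
      (it: Iterator[Int]) => it.sum,                       // func: run on each partition
      Seq(0, 1, 2, 3),                                     // partitions (illegal ids are rejected, per the new doc)
      (index: Int, sum: Int) => partialSums(index) = sum,  // resultHandler callback
      partialSums.sum)                                     // overall result once every partition has finished

    // Block until the job finishes (the JobWaiter's role), or call future.cancel() to abort it.
    val total = Await.result(future, Duration.Inf)
    println(s"sum of 1..100 = $total")
    sc.stop()
  }
}
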
@@ -584,7 +588,7 @@ class DAGScheduler(

   /**
    * Run an action job on the given RDD and pass all the results to the resultHandler function as
-   * they arrive. Throws an exception if the job fials, or returns normally if successful.
+   * they arrive.
    *
    * @param rdd target RDD to run tasks on
    * @param func a function to run on each partition of the RDD
@@ -593,6 +597,8 @@
    * @param callSite where in the user program this job was called
    * @param resultHandler callback to pass each result to
    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+   *
+   * @throws Exception when the job fails
    */
   def runJob[T, U](
       rdd: RDD[T],
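
The @throws Exception note likewise describes behaviour visible from the public runJob entry points: when a task fails and the job aborts, the failure is rethrown to the caller, typically as a SparkException wrapping the task's exception. A hedged sketch; local mode and the deliberately failing task are assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object RunJobFailureSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runJob-sketch"))
    val rdd = sc.parallelize(1 to 10, numSlices = 2)

    try {
      // runJob blocks until the job completes; if a task throws, the job is
      // aborted and the failure is rethrown here instead of results being returned.
      sc.runJob(rdd, (it: Iterator[Int]) =>
        it.map(i => if (i == 7) throw new IllegalStateException("boom") else i).sum)
    } catch {
      case e: SparkException => println(s"job failed as documented: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
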
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

@@ -27,11 +27,11 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.ShuffleWriter

 /**
- * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
- * specified in the ShuffleDependency).
- *
- * See [[org.apache.spark.scheduler.Task]] for more information.
- *
+ * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
+ * specified in the ShuffleDependency).
+ *
+ * See [[org.apache.spark.scheduler.Task]] for more information.
+ *
  * @param stageId id of the stage this task belongs to
  * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
  *   the type should be (RDD[_], ShuffleDependency[_, _, _]).

(The removed and added scaladoc lines are identical in visible text; the difference is whitespace only.)
core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala

@@ -29,7 +29,7 @@ private[spark] class TaskSet(
     val stageAttemptId: Int,
     val priority: Int,
     val properties: Properties) {
-  val id: String = stageId + "." + stageAttemptId
+  val id: String = stageId + "." + stageAttemptId

   override def toString: String = "TaskSet " + id
 }

(As above, the removed and added lines match in visible text; this is another whitespace-only cleanup.)