From c3258d2b2b109b2d8fd3f2a8eeb80137fc2e8e34 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 7 Mar 2014 19:15:39 -0500 Subject: [PATCH 1/8] set hadoop task properties while using InputFormat we should keep jobId consistent --- .../org/apache/spark/SparkHadoopWriter.scala | 2 +- .../org/apache/spark/rdd/HadoopRDD.scala | 28 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index d404459a8eb7e..12df6cf8fe7c6 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -181,7 +181,7 @@ private[apache] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) + val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 100ddb360732a..98282fc9adac0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import java.text.SimpleDateFormat +import java.util.Date import java.io.EOFException import scala.collection.immutable.Map @@ -27,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter +import org.apache.hadoop.mapred.JobID +import org.apache.hadoop.mapred.TaskAttemptID +import org.apache.hadoop.mapred.TaskID import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ @@ -34,6 +39,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator + /** * A Spark split class that wraps around a Hadoop InputSplit. */ @@ -111,6 +117,9 @@ class HadoopRDD[K, V]( protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) + //used to build JT ID + protected val createTime = new Date() + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. 
protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value @@ -165,12 +174,29 @@ class HadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext) = { val iter = new NextIterator[(K, V)] { + + private def localizeConfiguration(conf: JobConf) { + //generate job id + val stageId = context.stageId + val dummyJobTrackerID = new SimpleDateFormat("yyyyMMddHHmm").format(createTime) + val jobId = new JobID(dummyJobTrackerID, stageId) + val splitID = theSplit.index + val attemptId = (context.attemptId % Int.MaxValue).toInt + val taId = new TaskAttemptID(new TaskID(jobId, true, splitID), attemptId) + + conf.set("mapred.tip.id", taId.getTaskID.toString) + conf.set("mapred.task.id", taId.toString) + conf.setBoolean("mapred.task.is.map", true) + conf.setInt("mapred.task.partition", splitID) + conf.set("mapred.job.id", jobId.toString) + } + val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null - val jobConf = getJobConf() val inputFormat = getInputFormat(jobConf) + localizeConfiguration(jobConf) reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. From a3153a89d7ee5e0d1571162a0d235f2be4720ae2 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Sat, 8 Mar 2014 09:09:12 -0500 Subject: [PATCH 2/8] style fix --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 98282fc9adac0..666b2a701123e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -117,7 +117,7 @@ class HadoopRDD[K, V]( protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) - //used to build JT ID + // used to build JT ID protected val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. From b7bdfa512dcdf20d26ade2ced60ad8d32b3ed6eb Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Sat, 8 Mar 2014 09:09:43 -0500 Subject: [PATCH 3/8] style fix --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 666b2a701123e..d498f0c28577e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -118,7 +118,7 @@ class HadoopRDD[K, V]( protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) // used to build JT ID - protected val createTime = new Date() + private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. 
protected def getJobConf(): JobConf = { From 9bd1fe31057d6ddfd33374ac3c349453374119a0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 12 Mar 2014 22:09:15 -0400 Subject: [PATCH 4/8] move configuration for jobConf to HadoopRDD --- .../org/apache/spark/SparkHadoopWriter.scala | 20 +++------ .../org/apache/spark/rdd/HadoopRDD.scala | 45 +++++++++++-------- .../apache/spark/rdd/PairRDDFunctions.scala | 7 ++- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 12df6cf8fe7c6..fcfda18811c9b 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -15,18 +15,19 @@ * limitations under the License. */ -package org.apache.hadoop.mapred +package org.apache.spark import java.io.IOException import java.text.NumberFormat import java.text.SimpleDateFormat import java.util.Date +import org.apache.hadoop.mapred._ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path -import org.apache.spark.Logging -import org.apache.spark.SerializableWritable +import org.apache.spark.rdd.HadoopRDD + /** * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public @@ -59,7 +60,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def preSetup() { setIDs(0, 0, 0) - setConfParams() + HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value) val jCtxt = getJobContext() getOutputCommitter().setupJob(jCtxt) @@ -68,7 +69,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def setup(jobid: Int, splitid: Int, attemptid: Int) { setIDs(jobid, splitid, attemptid) - setConfParams() + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now), + jobid, splitID, attemptID, conf.value) } def open() { @@ -167,14 +169,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) } - - private def setConfParams() { - conf.value.set("mapred.job.id", jID.value.toString) - conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) - conf.value.set("mapred.task.id", taID.value.toString) - conf.value.setBoolean("mapred.task.is.map", true) - conf.value.setInt("mapred.task.partition", splitID) - } } private[apache] diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index d498f0c28577e..40c88e7308b6d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -39,7 +39,6 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator - /** * A Spark split class that wraps around a Hadoop InputSplit. */ @@ -117,7 +116,7 @@ class HadoopRDD[K, V]( protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) - // used to build JT ID + // used to build JobTracker ID private val createTime = new Date() // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. 
@@ -175,28 +174,13 @@ class HadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext) = { val iter = new NextIterator[(K, V)] { - private def localizeConfiguration(conf: JobConf) { - //generate job id - val stageId = context.stageId - val dummyJobTrackerID = new SimpleDateFormat("yyyyMMddHHmm").format(createTime) - val jobId = new JobID(dummyJobTrackerID, stageId) - val splitID = theSplit.index - val attemptId = (context.attemptId % Int.MaxValue).toInt - val taId = new TaskAttemptID(new TaskID(jobId, true, splitID), attemptId) - - conf.set("mapred.tip.id", taId.getTaskID.toString) - conf.set("mapred.task.id", taId.toString) - conf.setBoolean("mapred.task.is.map", true) - conf.setInt("mapred.task.partition", splitID) - conf.set("mapred.job.id", jobId.toString) - } - val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null val jobConf = getJobConf() val inputFormat = getInputFormat(jobConf) - localizeConfiguration(jobConf) + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), + context.stageId, theSplit.index, context.attemptId.toInt, jobConf) reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. @@ -248,4 +232,27 @@ private[spark] object HadoopRDD { def putCachedMetadata(key: String, value: Any) = SparkEnv.get.hadoopJobMetadata.put(key, value) + + /** + * + * @param jtId + * @param jobId + * @param splitId + * @param attemptId + * @param conf + */ + def addLocalConfiguration(jtId: String, jobId: Int, splitId: Int, attemptId: Int, + conf: JobConf) { + // generate job id + //val stageId = context.stageId + val jobID = new JobID(jtId, jobId) + //val attemptId = (attemptId % Int.MaxValue).toInt + val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId) + + conf.set("mapred.tip.id", taId.getTaskID.toString) + conf.set("mapred.task.id", taId.toString) + conf.setBoolean("mapred.task.is.map", true) + conf.setInt("mapred.task.partition", splitId) + conf.set("mapred.job.id", jobID.toString) + } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 75fc02acd1bce..14386ff5b9127 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, +RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. 
-import org.apache.hadoop.mapred.SparkHadoopWriter - import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.SparkHadoopWriter import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} From af889390ba3d0ec0bbc09c813099c887dc5a059e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 13 Mar 2014 10:07:32 -0400 Subject: [PATCH 5/8] update the comments and permission of SparkHadoopWriter --- .../scala/org/apache/spark/SparkHadoopWriter.scala | 9 +++------ .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 12 ++---------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index fcfda18811c9b..3cb07357a8030 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -28,16 +28,13 @@ import org.apache.hadoop.fs.Path import org.apache.spark.rdd.HadoopRDD - /** - * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public - * because we need to access this class from the `spark` package to use some package-private Hadoop - * functions, but this class should not be used directly by users. + * Internal helper class that saves an RDD using a Hadoop OutputFormat. * * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. */ -private[apache] +private[spark] class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil @@ -171,7 +168,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } -private[apache] +private[spark] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 40c88e7308b6d..a138e5d1dd8b1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -233,19 +233,11 @@ private[spark] object HadoopRDD { def putCachedMetadata(key: String, value: Any) = SparkEnv.get.hadoopJobMetadata.put(key, value) - /** - * - * @param jtId - * @param jobId - * @param splitId - * @param attemptId - * @param conf - */ - def addLocalConfiguration(jtId: String, jobId: Int, splitId: Int, attemptId: Int, + def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int, conf: JobConf) { // generate job id //val stageId = context.stageId - val jobID = new JobID(jtId, jobId) + val jobID = new JobID(jobTrackerId, jobId) //val attemptId = (attemptId % Int.MaxValue).toInt val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId) From 258f92c2f657f4c6c543815d75fef47a5a347d8d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 13 Mar 2014 13:47:58 -0400 Subject: [PATCH 6/8] code cleanup --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 3cb07357a8030..b92ea01a877f7 100644 --- 
a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.rdd.HadoopRDD /** - * Internal helper class that saves an RDD using a Hadoop OutputFormat. + * Internal helper class that saves an RDD using a Hadoop OutputFormat. * * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a138e5d1dd8b1..932ff5bf369c7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -233,12 +233,10 @@ private[spark] object HadoopRDD { def putCachedMetadata(key: String, value: Any) = SparkEnv.get.hadoopJobMetadata.put(key, value) + /** Add Hadoop configuration specific to a single partition and attempt. */ def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int, conf: JobConf) { - // generate job id - //val stageId = context.stageId val jobID = new JobID(jobTrackerId, jobId) - //val attemptId = (attemptId % Int.MaxValue).toInt val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId) conf.set("mapred.tip.id", taId.getTaskID.toString) From 5b1ad7d51d7d67e2c02c0322271787f26f5a0a45 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 24 Mar 2014 14:35:53 -0400 Subject: [PATCH 7/8] move SparkHiveHadoopWriter to org.apache.spark package --- .../apache/{hadoop/mapred => spark}/SparkHadoopWriter.scala | 6 ++---- .../scala/org/apache/spark/sql/hive/hiveOperators.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) rename sql/hive/src/main/scala/org/apache/{hadoop/mapred => spark}/SparkHadoopWriter.scala (98%) diff --git a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala similarity index 98% rename from sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala rename to sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 0b3873191985b..bc6c9a8ef7925 100644 --- a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hadoop.mapred +package org.apache.spark import java.io.IOException import java.text.NumberFormat @@ -25,11 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc +import org.apache.hadoop.mapred._ import org.apache.hadoop.io.Writable -import org.apache.spark.Logging -import org.apache.spark.SerializableWritable - /** * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. 
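Patches 7 and 8 together relocate the Hive-specific writer out of org.apache.hadoop.mapred and into Spark's own namespace. As a reading aid, the sketch below condenses the declaration that results once both patches are applied; the class body and any extends clause are elided, and only names that appear in the surrounding hunks are used, so treat it as an illustration rather than the actual file. The matching caller-side import change in hiveOperators.scala follows immediately after this sketch.

package org.apache.spark

import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.mapred._

// Net result of patches 7 and 8: the writer lives in the Spark package, reaches the
// old-API Hadoop classes through a wildcard import instead of being declared inside
// org.apache.hadoop.mapred, and its visibility is scoped to Spark itself.
protected[spark]
class SparkHiveHadoopWriter(
    @transient jobConf: JobConf,
    fileSinkConf: FileSinkDesc) {
  // constructor parameters as shown in the rename diff above; body unchanged by the move
}
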
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala index 9aa9e173a8367..78f69e7ff5731 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala @@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ -import org.apache.spark.{TaskContext, SparkException} +import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException} /* Implicits */ import scala.collection.JavaConversions._ From ed0980ff523a7fbb267989db4540ba4d45ad3c45 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 24 Mar 2014 21:52:57 -0400 Subject: [PATCH 8/8] make SparkHiveHadoopWriter belongs to spark package --- .../src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index bc6c9a8ef7925..d96c2f70e0c74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. */ -protected[apache] +protected[spark] class SparkHiveHadoopWriter( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc)
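Taken together, the series makes every task that reads through an old-API InputFormat see the same mapred.* identity properties that a real Hadoop MapReduce task would see, and it keeps the jobtracker id consistent by formatting a single createTime (or the time passed to createJobID) instead of calling new Date() per invocation. The standalone sketch below mirrors the HadoopRDD.addLocalConfiguration helper from patch 6 and shows how a downstream InputFormat or RecordReader could read the properties back; the object name LocalTaskConfDemo and the demo values are illustrative only, not part of the patched Spark code.

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred.{JobConf, JobID, TaskAttemptID, TaskID}

object LocalTaskConfDemo {

  // Mirrors HadoopRDD.addLocalConfiguration after patch 6: build a JobID from a
  // date-based "job tracker" string plus a numeric job (stage) id, derive the
  // task and attempt ids from the partition index and attempt number, and publish
  // them under the property names old-API InputFormats expect.
  def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int,
      attemptId: Int, conf: JobConf): Unit = {
    val jobID = new JobID(jobTrackerId, jobId)
    val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)

    conf.set("mapred.tip.id", taId.getTaskID.toString)
    conf.set("mapred.task.id", taId.toString)
    conf.setBoolean("mapred.task.is.map", true)
    conf.setInt("mapred.task.partition", splitId)
    conf.set("mapred.job.id", jobID.toString)
  }

  def main(args: Array[String]): Unit = {
    val conf = new JobConf()
    // The same formatted timestamp is reused for every partition, so all tasks of
    // one stage report a consistent jobtracker id -- the point of patch 1's fix to
    // createJobID and of capturing createTime once in HadoopRDD.
    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    addLocalConfiguration(jobTrackerId, jobId = 0, splitId = 3, attemptId = 0, conf = conf)

    // A RecordReader handed this JobConf can now recover its task identity:
    println(conf.get("mapred.task.id"))               // e.g. attempt_201403071915_0000_m_000003_0
    println(conf.getInt("mapred.task.partition", -1)) // 3
  }
}

A usage note on the design: setting these properties per task (rather than once per job, as SparkHadoopWriter previously did only on the write path) matters for InputFormats that consult the task identity, for example to name scratch directories or report progress, which is why the helper was hoisted into the HadoopRDD companion object and shared between the read and write paths.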