29 changes: 10 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
* limitations under the License.
*/

package org.apache.hadoop.mapred
package org.apache.spark
Contributor:
hm... interesting. I wonder if this compiles in all versions of Hadoop? It was originally in this package to access package-private methods, but it seems that in Hadoop 1.0.4, at least, any such methods have been made public. Would be cool if we could move it to our stuff. We could also then change the class to be private[spark] and get rid of the private[apache] stuff.

Contributor:
I successfully compiled against 0.23.7, 2.2.0, and 1.0.4, so I think we're good to go on this change (especially since it's intended to be an internal helper). Would you mind changing the private[apache]s to private[spark]s and updating the class comment? That'd be sweet.
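To make the suggestion concrete, here is a minimal Scala sketch (file and names are hypothetical, not part of this change) of what the package-qualified modifiers mean: private[spark] keeps a member visible anywhere under org.apache.spark, whereas the old private[apache] exposed it to everything under org.apache, including the Hadoop packages this class used to live beside.

```scala
// Hypothetical illustration only.
package org.apache.spark.example

private[spark] class InternalHelper {        // usable from any org.apache.spark.* code
  private[example] def detail(): Int = 42    // usable only within org.apache.spark.example
}

// With private[apache], InternalHelper would also have been reachable from
// org.apache.hadoop.* -- exactly the wider exposure this change removes.
```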


import java.io.IOException
import java.text.NumberFormat
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.SerializableWritable
import org.apache.spark.rdd.HadoopRDD

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
* because we need to access this class from the `spark` package to use some package-private Hadoop
* functions, but this class should not be used directly by users.
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
*
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[apache]
private[spark]
class SparkHadoopWriter(@transient jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def preSetup() {
setIDs(0, 0, 0)
setConfParams()
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)

val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def setup(jobid: Int, splitid: Int, attemptid: Int) {
setIDs(jobid, splitid, attemptid)
setConfParams()
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
jobid, splitID, attemptID, conf.value)
}

def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}

private def setConfParams() {
conf.value.set("mapred.job.id", jID.value.toString)
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
conf.value.set("mapred.task.id", taID.value.toString)
conf.value.setBoolean("mapred.task.is.map", true)
conf.value.setInt("mapred.task.partition", splitID)
}
}

private[apache]
private[spark]
object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}

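A quick sketch of the behaviour after the fix above (a standalone copy of the method for illustration; the real one lives on the SparkHadoopWriter object): createJobID now formats the Date passed in by the caller, so two calls with the same time and id produce the same JobID, instead of depending on the wall clock at call time.

```scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.JobID

// Standalone copy of the fixed logic, for illustration only.
def createJobID(time: Date, id: Int): JobID = {
  val formatter = new SimpleDateFormat("yyyyMMddHHmm")
  val jobtrackerID = formatter.format(time)   // was: formatter.format(new Date())
  new JobID(jobtrackerID, id)
}

val t = new Date(0L)                          // any fixed instant
assert(createJobID(t, 1).toString == createJobID(t, 1).toString)  // deterministic now
```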
25 changes: 24 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,6 +17,8 @@

package org.apache.spark.rdd

import java.text.SimpleDateFormat
import java.util.Date
import java.io.EOFException
import scala.collection.immutable.Map

@@ -27,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
@@ -111,6 +116,9 @@ class HadoopRDD[K, V](

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)

// used to build JobTracker ID
private val createTime = new Date()

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -165,12 +173,14 @@

override def compute(theSplit: Partition, context: TaskContext) = {
val iter = new NextIterator[(K, V)] {

val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null

val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
@@ -222,4 +232,17 @@ private[spark] object HadoopRDD {

def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoopJobMetadata.put(key, value)

/** Add Hadoop configuration specific to a single partition and attempt. */
def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
conf: JobConf) {
val jobID = new JobID(jobTrackerId, jobId)
val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)

conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
conf.setBoolean("mapred.task.is.map", true)
conf.setInt("mapred.task.partition", splitId)
conf.set("mapred.job.id", jobID.toString)
}
}
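For context, a small sketch of what the new helper does to a JobConf (illustrative values; since the HadoopRDD object is private[spark], real callers sit inside the org.apache.spark package, as the writer and compute() above do):

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.HadoopRDD

val conf = new JobConf()
// Same call shape as compute() uses for, say, partition 3, attempt 0 of job 0.
HadoopRDD.addLocalConfiguration("202401011200", 0, 3, 0, conf)

// The JobConf now carries the per-task identity that old-API Hadoop
// input/output formats read from the classic mapred.* properties:
println(conf.get("mapred.job.id"))                 // e.g. job_202401011200_0000
println(conf.get("mapred.task.id"))                // e.g. attempt_202401011200_0000_m_000003_0
println(conf.getInt("mapred.task.partition", -1))  // 3
```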
@@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
import org.apache.hadoop.mapred.SparkHadoopWriter

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.hadoop.mapred
package org.apache.spark

import java.io.IOException
import java.text.NumberFormat
@@ -25,16 +25,14 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.Writable

import org.apache.spark.Logging
import org.apache.spark.SerializableWritable

/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
* It is based on [[SparkHadoopWriter]].
*/
protected[apache]
protected[spark]
class SparkHiveHadoopWriter(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc)
@@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.{TaskContext, SparkException}
import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException}

/* Implicits */
import scala.collection.JavaConversions._