25 changes: 2 additions & 23 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -23,11 +23,11 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType

import org.apache.spark.internal.Logging
Contributor: I'd move this file into spark/internal/io as well, to be closer to SparkNewHadoopWriter.scala.

Contributor: We can do it in a separate PR when you do the mapred committer.

Contributor Author: Yes, I'll do it later.

import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf
@@ -153,29 +153,8 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
splitID = splitid
attemptID = attemptid

jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}

private[spark]
object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}

def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.SparkHadoopWriter
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil

@@ -69,7 +68,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
val jobId = SparkHadoopWriter.createJobID(new Date, 0)
val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
val taskId = new TaskID(jobId, TaskType.MAP, 0)
val taskAttemptId = new TaskAttemptID(taskId, 0)

@@ -108,4 +107,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def abortTask(taskContext: TaskAttemptContext): Unit = {
committer.abortTask(taskContext)
}

/** Whether we are using a direct output committer */
def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
}
@@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.reflect.ClassTag
import scala.util.DynamicVariable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, JobID}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.{SparkConf, SparkException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* A helper object that saves an RDD using a Hadoop OutputFormat
* (from the newer mapreduce API, not the old mapred API).
*/
private[spark]
object SparkHadoopMapReduceWriter extends Logging {

/**
* The basic workflow of this command is:
* 1. Driver-side setup: prepare the data source and Hadoop configuration for the write job to
*    be issued.
* 2. Issue a write job consisting of one or more executor-side tasks, each of which writes all
*    rows within an RDD partition.
* 3. If no exception is thrown in a task, commit that task; otherwise abort it. If any
*    exception is thrown during the task commit, also abort that task.
* 4. If all tasks are committed, commit the job; otherwise abort the job. If any exception is
*    thrown during the job commit, also abort the job.
*/
def write[K, V: ClassTag](
rdd: RDD[(K, V)],
hadoopConf: Configuration): Unit = {
// Extract context and configuration from RDD.
val sparkContext = rdd.context
val stageId = rdd.id
Contributor: Is this accurate? It seems weird that stageId is set equal to rdd.id. What is the commit protocol here?

Contributor Author: It follows the previous behavior; what's your concern here?

Contributor: We had a test failure using Spark 2.2 that seems to happen after this commit. The failure didn't occur before, with Spark 2.0. We thought there could be a problem since the jobContext passed to committer.setupJob() (line 84) has a different JobID than the task context passed to committer.setupTask() (line 126): in the former it comes from rdd.id, and in the latter it comes from stage.id. Just wanted to check: what is the protocol here that requires such a difference? Thanks.

Contributor: I investigated the problem a little more and filed a JIRA and a fix; see SPARK-22162. Thank you in advance.
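To make the mismatch discussed in this thread concrete, here is a minimal REPL-style sketch; the constructor calls mirror the ones in `write` (above) and `executeTask` (below) in this file, while the concrete ID values are invented:

```scala
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

// Made-up values, only to make the comparison concrete.
val jobTrackerId = "20161108120000"
val rddId = 7        // what `write` uses, via `val stageId = rdd.id`
val realStageId = 12 // what `executeTask` receives as `context.stageId`

// Driver side: the ID behind the jobContext passed to committer.setupJob().
val jobAttemptId = new TaskAttemptID(jobTrackerId, rddId, TaskType.MAP, 0, 0)

// Executor side: the ID behind the taskContext passed to committer.setupTask().
val taskAttemptId = new TaskAttemptID(jobTrackerId, realStageId, TaskType.REDUCE, 0, 0)

// The two JobIDs disagree whenever rdd.id != context.stageId, which is the
// inconsistency later tracked as SPARK-22162.
assert(jobAttemptId.getJobID != taskAttemptId.getJobID)
```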

val sparkConf = rdd.conf
val conf = new SerializableConfiguration(hadoopConf)

// Set up a job.
val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
val format = jobContext.getOutputFormatClass

if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
// FileOutputFormat ignores the filesystem parameter
val jobFormat = format.newInstance
Contributor: If it is Configurable, we should invoke setConf on it; the earlier code was doing this. The tests for that seem to have been removed as well. Any reason why?

Contributor Author (@jiangxb1987, Nov 8, 2016): The logic is to create an OutputCommitter from the given Hadoop conf, which is now handled by HadoopMapReduceCommitProtocol, so this code and the corresponding tests are no longer needed. I've searched the history of the code and failed to figure out why we were doing this...

Contributor: I think I added the comment to the wrong location (though it is relevant here, it is probably less serious?). HadoopMapReduceCommitProtocol.setupCommitter() should be doing a setConf if the OutputFormat is Configurable. This needs to be fixed to ensure custom OutputFormats work. I see that the PR has already been committed; can you please file a bug and fix it? The test will also need to be re-added.

Contributor Author: Oh... I see the problem now. Will add that in a follow-up, thank you for clarifying.
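For reference, a minimal sketch of the check the reviewers are discussing, using Hadoop's standard `Configurable` interface; the helper name `configureIfNeeded` is invented, and whether the call belongs here or in `HadoopMapReduceCommitProtocol.setupCommitter()` is exactly what the thread above concludes should be fixed in a follow-up:

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.OutputFormat

// If the freshly instantiated OutputFormat is Configurable, hand it the Hadoop
// configuration before using it (e.g. before checkOutputSpecs or getOutputCommitter),
// so custom formats that rely on setConf keep working.
def configureIfNeeded(format: OutputFormat[_, _], conf: Configuration): Unit = {
  format match {
    case c: Configurable => c.setConf(conf)
    case _ => // non-Configurable formats need no extra setup
  }
}
```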

jobFormat.checkOutputSpecs(jobContext)
}

val committer = FileCommitProtocol.instantiate(
className = classOf[HadoopMapReduceCommitProtocol].getName,
jobId = stageId.toString,
outputPath = conf.value.get("mapred.output.dir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) {
val warningMessage =
s"$committer may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use an output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
executeTask(
context = context,
jobTrackerId = jobTrackerId,
sparkStageId = context.stageId,
sparkPartitionId = context.partitionId,
sparkAttemptNumber = context.attemptNumber,
committer = committer,
hadoopConf = conf.value,
outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],
iterator = iter)
})

committer.commitJob(jobContext, ret)
logInfo(s"Job ${jobContext.getJobID} committed.")
} catch {
case cause: Throwable =>
logError(s"Aborting job ${jobContext.getJobID}.", cause)
committer.abortJob(jobContext)
throw new SparkException("Job aborted.", cause)
}
}

/** Write an RDD partition out in a single Spark task. */
private def executeTask[K, V: ClassTag](
context: TaskContext,
jobTrackerId: String,
sparkStageId: Int,
sparkPartitionId: Int,
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
hadoopConf: Configuration,
outputFormat: Class[_ <: OutputFormat[K, V]],
iterator: Iterator[(K, V)]): TaskCommitMessage = {
// Set up a task.
val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
sparkPartitionId, sparkAttemptNumber)
val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
committer.setupTask(taskContext)

val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

// Initiate the writer.
val taskFormat = outputFormat.newInstance
val writer = taskFormat.getRecordWriter(taskContext)
.asInstanceOf[RecordWriter[K, V]]
require(writer != null, "Unable to obtain RecordWriter")
var recordsWritten = 0L

// Write all rows in RDD partition.
try {
val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
while (iterator.hasNext) {
val pair = iterator.next()
writer.write(pair._1, pair._2)

// Update bytes written metric every few records
SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}

committer.commitTask(taskContext)
}(catchBlock = {
committer.abortTask(taskContext)
logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
}, finallyBlock = writer.close(taskContext))

outputMetricsAndBytesWrittenCallback.foreach {
case (om, callback) =>
om.setBytesWritten(callback())
om.setRecordsWritten(recordsWritten)
}

ret
} catch {
case t: Throwable =>
throw new SparkException("Task failed while writing rows", t)
}
}
}

private[spark]
object SparkHadoopWriterUtils {

private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256

def createJobID(time: Date, id: Int): JobID = {
val jobtrackerID = createJobTrackerID(time)
new JobID(jobtrackerID, id)
}

def createJobTrackerID(time: Date): String = {
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
}

def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}

// Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
// setting can take effect:
def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
val validationDisabled = disableOutputSpecValidation.value
val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
enabledInConf && !validationDisabled
}

def isSpeculationEnabled(conf: SparkConf): Boolean = {
conf.getBoolean("spark.speculation", false)
}

// TODO: these don't seem like the right abstractions.
// We should abstract the duplicate code in a less awkward way.

// return type: (output metrics, bytes written callback), defined only if the latter is defined
def initHadoopOutputMetrics(
context: TaskContext): Option[(OutputMetrics, () => Long)] = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
bytesWrittenCallback.map { b =>
(context.taskMetrics().outputMetrics, b)
}
}

def maybeUpdateOutputMetrics(
outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
recordsWritten: Long): Unit = {
if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
outputMetricsAndBytesWrittenCallback.foreach {
case (om, callback) =>
om.setBytesWritten(callback())
om.setRecordsWritten(recordsWritten)
}
}
}

/**
* Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
* basis; see SPARK-4835 for more details.
*/
val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}
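For illustration, a usage sketch of the `DynamicVariable` flag above, assuming calling code inside the `org.apache.spark` package (the object is `private[spark]`); `saveSomehow` is an invented placeholder for whatever write action is being issued:

```scala
import org.apache.spark.internal.io.SparkHadoopWriterUtils

// Placeholder for the actual write action (hypothetical, not a real API).
def saveSomehow(): Unit = ()

// Within withValue, isOutputSpecValidationEnabled(conf) returns false on this thread
// regardless of spark.hadoop.validateOutputSpecs; the previous value is restored on exit.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
  saveSomehow()
}
```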