From a0426c8d5fdac5bb59faff3681c1180b24cb314e Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Fri, 4 Nov 2016 22:27:17 +0800 Subject: [PATCH 01/11] port RDD API to use commit protocol. --- .../org/apache/spark/SparkHadoopWriter.scala | 7 +- .../apache/spark/SparkNewHadoopWriter.scala | 132 ++++++++++++++++++ .../apache/spark/rdd/PairRDDFunctions.scala | 43 +++--- .../spark/rdd/PairRDDFunctionsSuite.scala | 15 -- 4 files changed, 160 insertions(+), 37 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 7f75a393bf8ff..39b49dc97e6d3 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -162,11 +162,14 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { private[spark] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { - val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) - val jobtrackerID = formatter.format(time) + val jobtrackerID = createJobTrackerID(time) new JobID(jobtrackerID, id) } + def createJobTrackerID(time: Date): String = { + new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time) + } + def createPathFromString(path: String, conf: JobConf): Path = { if (path == null) { throw new IllegalArgumentException("Output path is null") diff --git a/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala new file mode 100644 index 0000000000000..1e409b2db45fa --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.util.SerializableConfiguration + +/** + * Internal helper class that saves an RDD using a Hadoop OutputFormat + * (from the newer mapreduce API, not the old mapred API). + * + * Saves the RDD using a JobConf, which should contain an output key class, an output value class, + * a filename to write to, etc, exactly like in a Hadoop MapReduce job. + * + * Use a [[HadoopMapReduceCommitProtocol]] to handle output commit, which, unlike Hadoop's + * OutputCommitter, is serializable. 
+ */ +private[spark] +class SparkNewHadoopWriter( + jobConf: Configuration, + committer: HadoopMapReduceCommitProtocol) extends Logging with Serializable { + + private val now = new Date() + private val conf = new SerializableConfiguration(jobConf) + + private val jobtrackerID = SparkHadoopWriter.createJobTrackerID(new Date()) + private var jobId = 0 + private var splitId = 0 + private var attemptId = 0 + + @transient private var writer: RecordWriter[AnyRef, AnyRef] = null + @transient private var jobContext: JobContext = null + @transient private var taskContext: TaskAttemptContext = null + + def setupJob(): Unit = { + // Committer setup a job + committer.setupJob(getJobContext) + } + + def setupTask(context: TaskContext): Unit = { + // Set jobID/taskID + jobId = context.stageId + splitId = context.partitionId + attemptId = (context.taskAttemptId % Int.MaxValue).toInt + // Committer setup a task + committer.setupTask(getTaskContext(context)) + } + + def write(context: TaskContext, key: AnyRef, value: AnyRef): Unit = { + getWriter(context).write(key, value) + } + + def abortTask(context: TaskContext): Unit = { + // Close writer + getWriter(context).close(getTaskContext(context)) + // Committer abort a task + committer.abortTask(getTaskContext(context)) + } + + def commitTask(context: TaskContext): Unit = { + // Close writer + getWriter(context).close(getTaskContext(context)) + // Committer commit a task + committer.commitTask(getTaskContext(context)) + } + + def abortJob(): Unit = { + committer.abortJob(getJobContext) + } + + def commitJob() { + committer.commitJob(getJobContext, Seq.empty) + } + + // ********* Private Functions ********* + + /* + * Generate jobContext. Since jobContext is transient, it may be null after serialization. + */ + private def getJobContext(): JobContext = { + if (jobContext == null) { + val jobAttemptId = new TaskAttemptID(jobtrackerID, jobId, TaskType.MAP, 0, 0) + jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) + } + jobContext + } + + /* + * Generate taskContext. Since taskContext is transient, it may be null after serialization. + */ + private def getTaskContext(context: TaskContext): TaskAttemptContext = { + if (taskContext == null) { + val attemptId = new TaskAttemptID(jobtrackerID, jobId, TaskType.REDUCE, splitId, + context.attemptNumber) + taskContext = new TaskAttemptContextImpl(conf.value, attemptId) + } + taskContext + } + + /* + * Generate writer. Since writer is transient, it may be null after serialization. 
+ */ + private def getWriter(context: TaskContext): RecordWriter[AnyRef, AnyRef] = { + if (writer == null) { + val format = getJobContext.getOutputFormatClass.newInstance + writer = format.getRecordWriter(getTaskContext(context)) + .asInstanceOf[RecordWriter[AnyRef, AnyRef]] + } + writer + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 67baad1c51bca..e6cfc8efb82b6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -21,6 +21,8 @@ import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util.{Date, HashMap => JHashMap, Locale} +import org.apache.spark.internal.io.{HadoopMapReduceCommitProtocol, FileCommitProtocol} + import scala.collection.{mutable, Map} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -28,12 +30,12 @@ import scala.reflect.ClassTag import scala.util.DynamicVariable import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus -import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, TaskAttemptID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ @@ -1092,37 +1094,38 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) jobFormat.checkOutputSpecs(job) } + // Instantiate writer + val committer = FileCommitProtocol.instantiate( + className = classOf[HadoopMapReduceCommitProtocol].getName, + jobId = stageId.toString, + outputPath = jobConfiguration.get("mapred.output.dir"), + isAppend = false + ).asInstanceOf[HadoopMapReduceCommitProtocol] + val writer = new SparkNewHadoopWriter(hadoopConf, committer) + val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => { - val config = wrappedConf.value - /* "reduce task" */ - val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, - context.attemptNumber) - val hadoopContext = new TaskAttemptContextImpl(config, attemptId) - val format = outfmt.newInstance - format match { - case c: Configurable => c.setConf(config) - case _ => () - } - val committer = format.getOutputCommitter(hadoopContext) - committer.setupTask(hadoopContext) + writer.setupTask(context) val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = initHadoopOutputMetrics(context) - val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]] require(writer != null, "Unable to obtain RecordWriter") var recordsWritten = 0L Utils.tryWithSafeFinallyAndFailureCallbacks { while (iter.hasNext) { val pair = iter.next() - writer.write(pair._1, pair._2) + writer.write(context, pair._1.asInstanceOf[AnyRef], pair._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } - }(finallyBlock = writer.close(hadoopContext)) - 
committer.commitTask(hadoopContext) + + writer.commitTask(context) + }(catchBlock = { + writer.abortTask(context) + writer.abortJob() + }) outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => om.setBytesWritten(callback()) om.setRecordsWritten(recordsWritten) @@ -1147,9 +1150,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logWarning(warningMessage) } - jobCommitter.setupJob(jobTaskContext) + writer.setupJob() self.context.runJob(self, writeShard) - jobCommitter.commitJob(jobTaskContext) + writer.commitJob() } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index b0d69de6e2ef4..24d9884a27184 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -509,21 +509,6 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { (2, ArrayBuffer(1)))) } - test("saveNewAPIHadoopFile should call setConf if format is configurable") { - val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) - - // No error, non-configurable formats still work - pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored") - - /* - Check that configurable formats get configured: - ConfigTestFormat throws an exception if we try to write - to it when setConf hasn't been called first. - Assertion is in ConfigTestFormat.getRecordWriter. - */ - pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored") - } - test("saveAsHadoopFile should respect configured output committers") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) val conf = new JobConf() From e017e1e501429f55ebdbbf98f8aaab8f53902a40 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Fri, 4 Nov 2016 22:36:28 +0800 Subject: [PATCH 02/11] update comment. --- .../scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 24d9884a27184..3acff934d9d70 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -710,8 +710,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { } /* - These classes are fakes for testing - "saveNewAPIHadoopFile should call setConf if format is configurable". + These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile. Unfortunately, they have to be top level classes, and not defined in the test method, because otherwise Scala won't generate no-args constructors and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile From 5e1285051c6ecc3020ee52b1f134f8c148bf2d65 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Fri, 4 Nov 2016 23:05:08 +0800 Subject: [PATCH 03/11] fix scala style failure. 
--- .../src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala | 3 ++- .../src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala index 1e409b2db45fa..657e73ac1c594 100644 --- a/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala @@ -22,8 +22,9 @@ import java.util.Date import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.internal.Logging + import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.internal.Logging import org.apache.spark.util.SerializableConfiguration /** diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index e6cfc8efb82b6..a7ba7606128c5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -21,8 +21,6 @@ import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util.{Date, HashMap => JHashMap, Locale} -import org.apache.spark.internal.io.{HadoopMapReduceCommitProtocol, FileCommitProtocol} - import scala.collection.{mutable, Map} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -43,6 +41,7 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer From 4e72745da6e691d5f006184e85998e7519fab50b Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sat, 5 Nov 2016 08:34:21 +0800 Subject: [PATCH 04/11] move SparkNewHadoopWriter to internal/io --- .../apache/spark/{ => internal/io}/SparkNewHadoopWriter.scala | 4 ++-- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename core/src/main/scala/org/apache/spark/{ => internal/io}/SparkNewHadoopWriter.scala (97%) diff --git a/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala rename to core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala index 657e73ac1c594..fdeb1767604a2 100644 --- a/core/src/main/scala/org/apache/spark/SparkNewHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark +package org.apache.spark.internal.io import java.util.Date @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.{SparkHadoopWriter, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.util.SerializableConfiguration diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index a7ba7606128c5..dc4ca1d1981e3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -41,7 +41,7 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer From eb74e59d076b51cabed062f84dd50dc59cfa7452 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sun, 6 Nov 2016 01:16:19 +0800 Subject: [PATCH 05/11] move logic in saveAsNewAPIHadoopDataset into SparkNewHadoopWriter --- .../org/apache/spark/SparkHadoopWriter.scala | 28 +- .../io/HadoopMapReduceCommitProtocol.scala | 5 +- .../internal/io/SparkNewHadoopWriter.scala | 268 +++++++++++++----- .../apache/spark/rdd/PairRDDFunctions.scala | 129 +-------- .../spark/rdd/PairRDDFunctionsSuite.scala | 2 +- .../datasources/FileFormatWriter.scala | 4 +- 6 files changed, 214 insertions(+), 222 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 39b49dc97e6d3..e863aca46c5b1 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -23,11 +23,11 @@ import java.text.SimpleDateFormat import java.util.{Date, Locale} import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.SparkNewHadoopWriterUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.HadoopRDD import org.apache.spark.util.SerializableJobConf @@ -153,32 +153,8 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { splitID = splitid attemptID = attemptid - jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid)) + jID = new SerializableWritable[JobID](SparkNewHadoopWriterUtils.createJobID(now, jobid)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) } } - -private[spark] -object SparkHadoopWriter { - def createJobID(time: Date, id: Int): JobID = { - val jobtrackerID = createJobTrackerID(time) - new JobID(jobtrackerID, id) - } - - def createJobTrackerID(time: Date): String = { - new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time) - } - - def createPathFromString(path: String, conf: JobConf): Path 
= { - if (path == null) { - throw new IllegalArgumentException("Output path is null") - } - val outputPath = new Path(path) - val fs = outputPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException("Incorrectly formatted output path") - } - outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } -} diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 66ccb6d437708..31d21f930ba3b 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -69,7 +69,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) override def setupJob(jobContext: JobContext): Unit = { // Setup IDs - val jobId = SparkHadoopWriter.createJobID(new Date, 0) + val jobId = SparkNewHadoopWriterUtils.createJobID(new Date, 0) val taskId = new TaskID(jobId, TaskType.MAP, 0) val taskAttemptId = new TaskAttemptID(taskId, 0) @@ -108,4 +108,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) override def abortTask(taskContext: TaskAttemptContext): Unit = { committer.abortTask(taskContext) } + + /** Whether we are using a direct output committer */ + def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct") } diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala index fdeb1767604a2..c8421504807d2 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala @@ -17,117 +17,231 @@ package org.apache.spark.internal.io -import java.util.Date +import java.text.SimpleDateFormat +import java.util.{Locale, Date} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.{JobConf, JobID} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.OutputMetrics +import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkHadoopWriter, TaskContext} +import org.apache.spark.{SparkConf, SparkException, SparkContext, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{Utils, SerializableConfiguration} + +import scala.reflect.ClassTag +import scala.util.DynamicVariable /** - * Internal helper class that saves an RDD using a Hadoop OutputFormat + * A helper object that saves an RDD using a Hadoop OutputFormat * (from the newer mapreduce API, not the old mapred API). - * - * Saves the RDD using a JobConf, which should contain an output key class, an output value class, - * a filename to write to, etc, exactly like in a Hadoop MapReduce job. - * - * Use a [[HadoopMapReduceCommitProtocol]] to handle output commit, which, unlike Hadoop's - * OutputCommitter, is serializable. */ private[spark] -class SparkNewHadoopWriter( - jobConf: Configuration, - committer: HadoopMapReduceCommitProtocol) extends Logging with Serializable { - - private val now = new Date() - private val conf = new SerializableConfiguration(jobConf) +object SparkNewHadoopWriter extends Logging { + + /** A shared job description for all the write tasks. 
*/ + private class WriteJobDescription[K, V]( + val jobTrackerId: String, + val serializableHadoopConf: SerializableConfiguration, + val outputFormat: Class[_ <: OutputFormat[K, V]]) + extends Serializable { + } - private val jobtrackerID = SparkHadoopWriter.createJobTrackerID(new Date()) - private var jobId = 0 - private var splitId = 0 - private var attemptId = 0 + /** + * Basic work flow of this command is: + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ + def write[K, V: ClassTag]( + sparkContext: SparkContext, + rdd: RDD[(K, V)], + committer: HadoopMapReduceCommitProtocol, + stageId: Int, + hadoopConf: Configuration): Unit = { + val conf = new SerializableConfiguration(hadoopConf) + + // Set up a job. + val jobTrackerId = SparkNewHadoopWriterUtils.createJobTrackerID(new Date()) + val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0) + val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) + val format = jobContext.getOutputFormatClass + committer.setupJob(jobContext) + + if (SparkNewHadoopWriterUtils.isOutputSpecValidationEnabled(rdd.conf)) { + // FileOutputFormat ignores the filesystem parameter + val jobFormat = format.newInstance + jobFormat.checkOutputSpecs(jobContext) + } - @transient private var writer: RecordWriter[AnyRef, AnyRef] = null - @transient private var jobContext: JobContext = null - @transient private var taskContext: TaskAttemptContext = null + // When speculation is on and output committer class name contains "Direct", we should warn + // users that they may loss data if they are using a direct output committer. + if (SparkNewHadoopWriterUtils.isSpeculationEnabled(rdd.conf) && committer.isDirectOutput) { + val warningMessage = + s"$committer may be an output committer that writes data directly to " + + "the final location. Because speculation is enabled, this output committer may " + + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + + "committer that does not have this behavior (e.g. FileOutputCommitter)." + logWarning(warningMessage) + } - def setupJob(): Unit = { - // Committer setup a job - committer.setupJob(getJobContext) + // Generate shared job description. + val description = new WriteJobDescription[K, V]( + jobTrackerId = jobTrackerId, + serializableHadoopConf = conf, + outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]]) + + // Try to write all RDD partitions as a Hadoop OutputFormat. 
+ try { + sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + executeTask( + context = context, + description = description, + sparkStageId = context.stageId, + sparkPartitionId = context.partitionId, + sparkAttemptNumber = context.attemptNumber, + committer = committer, + iterator = iter) + }) + + committer.commitJob(jobContext, Seq.empty) + logInfo(s"Job ${jobContext.getJobID} committed.") + } catch { case cause: Throwable => + logError(s"Aborting job ${jobContext.getJobID}.", cause) + committer.abortJob(jobContext) + throw new SparkException("Job aborted.", cause) + } } - def setupTask(context: TaskContext): Unit = { - // Set jobID/taskID - jobId = context.stageId - splitId = context.partitionId - attemptId = (context.taskAttemptId % Int.MaxValue).toInt - // Committer setup a task - committer.setupTask(getTaskContext(context)) + /** Write a RDD partition out in a single Spark task. */ + private def executeTask[K, V: ClassTag]( + context: TaskContext, + description: WriteJobDescription[K, V], + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[(K, V)]): Unit = { + val conf = description.serializableHadoopConf.value + + // Set up a task. + val attemptId = new TaskAttemptID(description.jobTrackerId, sparkStageId, TaskType.REDUCE, + sparkPartitionId, sparkAttemptNumber) + val taskContext = new TaskAttemptContextImpl(conf, attemptId) + committer.setupTask(taskContext) + + val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = + SparkNewHadoopWriterUtils.initHadoopOutputMetrics(context) + + // Initiate the writer. + val taskFormat = description.outputFormat.newInstance + val writer = taskFormat.getRecordWriter(taskContext) + .asInstanceOf[RecordWriter[K, V]] + require(writer != null, "Unable to obtain RecordWriter") + var recordsWritten = 0L + + // Write all rows in RDD partition. 
+ try { + Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { + val pair = iterator.next() + writer.write(pair._1, pair._2) + + // Update bytes written metric every few records + SparkNewHadoopWriterUtils.maybeUpdateOutputMetrics( + outputMetricsAndBytesWrittenCallback, recordsWritten) + recordsWritten += 1 + } + + committer.commitTask(taskContext) + }(catchBlock = { + committer.abortTask(taskContext) + logError(s"Task ${taskContext.getTaskAttemptID} aborted.") + }, finallyBlock = writer.close(taskContext)) + } catch { + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) + } + + outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } } +} - def write(context: TaskContext, key: AnyRef, value: AnyRef): Unit = { - getWriter(context).write(key, value) +private[spark] +object SparkNewHadoopWriterUtils { + def createJobID(time: Date, id: Int): JobID = { + val jobtrackerID = createJobTrackerID(time) + new JobID(jobtrackerID, id) } - def abortTask(context: TaskContext): Unit = { - // Close writer - getWriter(context).close(getTaskContext(context)) - // Committer abort a task - committer.abortTask(getTaskContext(context)) + def createJobTrackerID(time: Date): String = { + new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time) } - def commitTask(context: TaskContext): Unit = { - // Close writer - getWriter(context).close(getTaskContext(context)) - // Committer commit a task - committer.commitTask(getTaskContext(context)) + def createPathFromString(path: String, conf: JobConf): Path = { + if (path == null) { + throw new IllegalArgumentException("Output path is null") + } + val outputPath = new Path(path) + val fs = outputPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException("Incorrectly formatted output path") + } + outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) } - def abortJob(): Unit = { - committer.abortJob(getJobContext) + // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation + // setting can take effect: + def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = { + val validationDisabled = disableOutputSpecValidation.value + val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true) + enabledInConf && !validationDisabled } - def commitJob() { - committer.commitJob(getJobContext, Seq.empty) + def isSpeculationEnabled(conf: SparkConf): Boolean = { + conf.getBoolean("spark.speculation", false) } - // ********* Private Functions ********* + // TODO: these don't seem like the right abstractions. + // We should abstract the duplicate code in a less awkward way. - /* - * Generate jobContext. Since jobContext is transient, it may be null after serialization. - */ - private def getJobContext(): JobContext = { - if (jobContext == null) { - val jobAttemptId = new TaskAttemptID(jobtrackerID, jobId, TaskType.MAP, 0, 0) - jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) + // return type: (output metrics, bytes written callback), defined only if the latter is defined + def initHadoopOutputMetrics( + context: TaskContext): Option[(OutputMetrics, () => Long)] = { + val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() + bytesWrittenCallback.map { b => + (context.taskMetrics().outputMetrics, b) } - jobContext } - /* - * Generate taskContext. Since taskContext is transient, it may be null after serialization. 
- */ - private def getTaskContext(context: TaskContext): TaskAttemptContext = { - if (taskContext == null) { - val attemptId = new TaskAttemptID(jobtrackerID, jobId, TaskType.REDUCE, splitId, - context.attemptNumber) - taskContext = new TaskAttemptContextImpl(conf.value, attemptId) + def maybeUpdateOutputMetrics( + outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], + recordsWritten: Long): Unit = { + if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { + outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } } - taskContext } - /* - * Generate writer. Since writer is transient, it may be null after serialization. + val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 + + /** + * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case + * basis; see SPARK-4835 for more details. */ - private def getWriter(context: TaskContext): RecordWriter[AnyRef, AnyRef] = { - if (writer == null) { - val format = getJobContext.getOutputFormatClass.newInstance - writer = format.getRecordWriter(getTaskContext(context)) - .asInstanceOf[RecordWriter[AnyRef, AnyRef]] - } - writer - } + val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index dc4ca1d1981e3..69ae0190ea85c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -41,7 +41,7 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter} +import org.apache.spark.internal.io.{SparkNewHadoopWriterUtils, FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer @@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } FileOutputFormat.setOutputPath(hadoopConf, - SparkHadoopWriter.createPathFromString(path, hadoopConf)) + SparkNewHadoopWriterUtils.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) } @@ -1077,81 +1077,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * result of using direct output committer with speculation enabled. */ def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { - // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). 
- val hadoopConf = conf - val job = NewAPIHadoopJob.getInstance(hadoopConf) - val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) - val jobtrackerID = formatter.format(new Date()) val stageId = self.id - val jobConfiguration = job.getConfiguration - val wrappedConf = new SerializableConfiguration(jobConfiguration) - val outfmt = job.getOutputFormatClass - val jobFormat = outfmt.newInstance - - if (isOutputSpecValidationEnabled) { - // FileOutputFormat ignores the filesystem parameter - jobFormat.checkOutputSpecs(job) - } - // Instantiate writer val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, - outputPath = jobConfiguration.get("mapred.output.dir"), - isAppend = false - ).asInstanceOf[HadoopMapReduceCommitProtocol] - val writer = new SparkNewHadoopWriter(hadoopConf, committer) - - val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => { - writer.setupTask(context) - - val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - initHadoopOutputMetrics(context) - - require(writer != null, "Unable to obtain RecordWriter") - var recordsWritten = 0L - Utils.tryWithSafeFinallyAndFailureCallbacks { - while (iter.hasNext) { - val pair = iter.next() - writer.write(context, pair._1.asInstanceOf[AnyRef], pair._2.asInstanceOf[AnyRef]) + outputPath = conf.get("mapred.output.dir"), + isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] - // Update bytes written metric every few records - maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten) - recordsWritten += 1 - } - - writer.commitTask(context) - }(catchBlock = { - writer.abortTask(context) - writer.abortJob() - }) - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } - 1 - } : Int - - val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0) - val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId) - val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) - - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = self.conf.getBoolean("spark.speculation", false) - val outputCommitterClass = jobCommitter.getClass.getSimpleName - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use an output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." 
- logWarning(warningMessage) - } - - writer.setupJob() - self.context.runJob(self, writeShard) - writer.commitJob() + SparkNewHadoopWriter.write( + sparkContext = self.context, + rdd = self, + committer = committer, + stageId = stageId, + hadoopConf = conf) } /** @@ -1180,7 +1119,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (isOutputSpecValidationEnabled) { + if (SparkNewHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(hadoopConf) hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) @@ -1195,7 +1134,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - initHadoopOutputMetrics(context) + SparkNewHadoopWriterUtils.initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() @@ -1207,7 +1146,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records - maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten) + SparkNewHadoopWriterUtils.maybeUpdateOutputMetrics( + outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } }(finallyBlock = writer.close()) @@ -1222,29 +1162,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.commitJob() } - // TODO: these don't seem like the right abstractions. - // We should abstract the duplicate code in a less awkward way. - - // return type: (output metrics, bytes written callback), defined only if the latter is defined - private def initHadoopOutputMetrics( - context: TaskContext): Option[(OutputMetrics, () => Long)] = { - val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() - bytesWrittenCallback.map { b => - (context.taskMetrics().outputMetrics, b) - } - } - - private def maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], - recordsWritten: Long): Unit = { - if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } - } - } - /** * Return an RDD with the keys of each tuple. */ @@ -1260,22 +1177,4 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private[spark] def valueClass: Class[_] = vt.runtimeClass private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord) - - // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation - // setting can take effect: - private def isOutputSpecValidationEnabled: Boolean = { - val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value - val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) - enabledInConf && !validationDisabled - } -} - -private[spark] object PairRDDFunctions { - val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 - - /** - * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case - * basis; see SPARK-4835 for more details. 
- */ - val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 3acff934d9d70..fe547d4d9163e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -529,7 +529,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val e = intercept[SparkException] { pairs.saveAsNewAPIHadoopFile[NewFakeFormatWithCallback]("ignored") } - assert(e.getMessage contains "failed to write") + assert(e.getCause.getMessage contains "failed to write") assert(FakeWriterWithCallback.calledBy === "write,callback,close") assert(FakeWriterWithCallback.exception != null, "exception should be captured") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index e404dcd5452b9..9f28c0d138b48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{SparkNewHadoopWriterUtils, FileCommitProtocol} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -153,7 +153,7 @@ object FileFormatWriter extends Logging { committer: FileCommitProtocol, iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = { - val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId) + val jobId = SparkNewHadoopWriterUtils.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) From 09d5ed906b5a178632e98f079fed65f5c3891706 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sun, 6 Nov 2016 02:20:42 +0800 Subject: [PATCH 06/11] bugfix --- .../internal/io/SparkNewHadoopWriter.scala | 19 ++++++++++--------- .../apache/spark/rdd/PairRDDFunctions.scala | 11 ++++------- .../datasources/FileFormatWriter.scala | 2 +- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala index c8421504807d2..cc4baa9cd641f 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala @@ -18,23 +18,23 @@ package org.apache.spark.internal.io import java.text.SimpleDateFormat -import java.util.{Locale, Date} +import java.util.{Date, Locale} + +import scala.reflect.ClassTag +import scala.util.DynamicVariable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.{JobConf, JobID} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark.{SparkConf, SparkContext, SparkException, TaskContext} import 
org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics -import org.apache.spark.rdd.RDD - -import org.apache.spark.{SparkConf, SparkException, SparkContext, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.util.{Utils, SerializableConfiguration} - -import scala.reflect.ClassTag -import scala.util.DynamicVariable +import org.apache.spark.rdd.RDD +import org.apache.spark.util.{SerializableConfiguration, Utils} /** * A helper object that saves an RDD using a Hadoop OutputFormat @@ -75,7 +75,6 @@ object SparkNewHadoopWriter extends Logging { val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0) val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) val format = jobContext.getOutputFormatClass - committer.setupJob(jobContext) if (SparkNewHadoopWriterUtils.isOutputSpecValidationEnabled(rdd.conf)) { // FileOutputFormat ignores the filesystem parameter @@ -83,6 +82,8 @@ object SparkNewHadoopWriter extends Logging { jobFormat.checkOutputSpecs(jobContext) } + committer.setupJob(jobContext) + // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. if (SparkNewHadoopWriterUtils.isSpeculationEnabled(rdd.conf) && committer.isDirectOutput) { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 69ae0190ea85c..73afcceec3f42 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -18,14 +18,12 @@ package org.apache.spark.rdd import java.nio.ByteBuffer -import java.text.SimpleDateFormat -import java.util.{Date, HashMap => JHashMap, Locale} +import java.util.{HashMap => JHashMap} import scala.collection.{mutable, Map} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import scala.util.DynamicVariable import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.Configuration @@ -33,19 +31,18 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, TaskAttemptID, TaskType} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat} import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{SparkNewHadoopWriterUtils, FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter} +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter, SparkNewHadoopWriterUtils} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.Utils import org.apache.spark.util.collection.CompactBuffer import 
org.apache.spark.util.random.StratifiedSamplingUtils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 9f28c0d138b48..3e24fb3fdddf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{SparkNewHadoopWriterUtils, FileCommitProtocol} +import org.apache.spark.internal.io.{FileCommitProtocol, SparkNewHadoopWriterUtils} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec From e5a60ff66461045eb2a1754f41fd1412af5d76b0 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sun, 6 Nov 2016 02:41:53 +0800 Subject: [PATCH 07/11] bugfix --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 1 - .../scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 31d21f930ba3b..6420ee4fc49b8 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.SparkHadoopWriter import org.apache.spark.internal.Logging import org.apache.spark.mapred.SparkHadoopMapRedUtil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index e53c3e4d4833b..22a687e22dcf9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskType import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.SparkNewHadoopWriterUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -142,7 +143,7 @@ private[hive] class SparkHiveWriterContainer( splitID = splitId attemptID = attemptId - jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId)) + jID = new SerializableWritable[JobID](SparkNewHadoopWriterUtils.createJobID(now, jobId)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) } From 86f195199e336bbbadd9809dfb3923a9a948c195 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sun, 6 Nov 2016 03:03:57 +0800 Subject: [PATCH 08/11] bugfix --- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 5 +++-- .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index fa15a0bf65ab9..d63cefc72ff90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -27,7 +27,8 @@ import scala.util.matching.Regex import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope} +import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName @@ -337,7 +338,7 @@ abstract class DStream[T: ClassTag] ( // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. We need to have this call here because // compute() might cause Spark jobs to be launched. - PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + SparkNewHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { compute(time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 98e099354a7db..a884212cf9259 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -26,7 +26,8 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark.ExecutorAllocationClient import org.apache.spark.internal.Logging -import org.apache.spark.rdd.{PairRDDFunctions, RDD} +import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ import org.apache.spark.streaming.api.python.PythonDStream import org.apache.spark.streaming.ui.UIUtils @@ -250,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. - PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + SparkNewHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { job.run() } _eventLoop = eventLoop From cfcd823225b3c1b0464b4b571c78ecf969a88f25 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Sun, 6 Nov 2016 09:53:59 +0800 Subject: [PATCH 09/11] refactor to improve readability. 
--- .../org/apache/spark/SparkHadoopWriter.scala | 4 +- .../io/HadoopMapReduceCommitProtocol.scala | 2 +- ...scala => SparkHadoopMapReduceWriter.scala} | 60 +++++++++---------- .../apache/spark/rdd/PairRDDFunctions.scala | 23 ++----- .../datasources/FileFormatWriter.scala | 4 +- .../spark/sql/hive/hiveWriterContainers.scala | 4 +- .../spark/streaming/dstream/DStream.scala | 4 +- .../streaming/scheduler/JobScheduler.scala | 4 +- 8 files changed, 44 insertions(+), 61 deletions(-) rename core/src/main/scala/org/apache/spark/internal/io/{SparkNewHadoopWriter.scala => SparkHadoopMapReduceWriter.scala} (83%) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index e863aca46c5b1..46e22b215b8ee 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.HadoopRDD import org.apache.spark.util.SerializableJobConf @@ -153,7 +153,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { splitID = splitid attemptID = attemptid - jID = new SerializableWritable[JobID](SparkNewHadoopWriterUtils.createJobID(now, jobid)) + jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 6420ee4fc49b8..d643a32af0314 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -68,7 +68,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) override def setupJob(jobContext: JobContext): Unit = { // Setup IDs - val jobId = SparkNewHadoopWriterUtils.createJobID(new Date, 0) + val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0) val taskId = new TaskID(jobId, TaskType.MAP, 0) val taskAttemptId = new TaskAttemptID(taskId, 0) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala similarity index 83% rename from core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala rename to core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index cc4baa9cd641f..51d3c576f93d4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkNewHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -41,20 +41,12 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} * (from the newer mapreduce API, not the old mapred API). */ private[spark] -object SparkNewHadoopWriter extends Logging { - - /** A shared job description for all the write tasks. 
*/ - private class WriteJobDescription[K, V]( - val jobTrackerId: String, - val serializableHadoopConf: SerializableConfiguration, - val outputFormat: Class[_ <: OutputFormat[K, V]]) - extends Serializable { - } +object SparkHadoopMapReduceWriter extends Logging { /** * Basic work flow of this command is: - * 1. Driver side setup, including output committer initialization and data source specific - * preparation work for the write job to be issued. + * 1. Driver side setup, prepare the data source and hadoop configuration for the write job to + * be issued. * 2. Issues a write job consists of one or more executor side tasks, each of which writes all * rows within an RDD partition. * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any @@ -63,30 +55,36 @@ object SparkNewHadoopWriter extends Logging { * thrown during job commitment, also aborts the job. */ def write[K, V: ClassTag]( - sparkContext: SparkContext, rdd: RDD[(K, V)], - committer: HadoopMapReduceCommitProtocol, - stageId: Int, hadoopConf: Configuration): Unit = { + // Extract context and configuration from RDD. + val sparkContext = rdd.context + val stageId = rdd.id + val sparkConf = rdd.conf val conf = new SerializableConfiguration(hadoopConf) // Set up a job. - val jobTrackerId = SparkNewHadoopWriterUtils.createJobTrackerID(new Date()) + val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date()) val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0) val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) val format = jobContext.getOutputFormatClass - if (SparkNewHadoopWriterUtils.isOutputSpecValidationEnabled(rdd.conf)) { + if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) { // FileOutputFormat ignores the filesystem parameter val jobFormat = format.newInstance jobFormat.checkOutputSpecs(jobContext) } + val committer = FileCommitProtocol.instantiate( + className = classOf[HadoopMapReduceCommitProtocol].getName, + jobId = stageId.toString, + outputPath = conf.value.get("mapred.output.dir"), + isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. - if (SparkNewHadoopWriterUtils.isSpeculationEnabled(rdd.conf) && committer.isDirectOutput) { + if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) { val warningMessage = s"$committer may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + @@ -95,22 +93,18 @@ object SparkNewHadoopWriter extends Logging { logWarning(warningMessage) } - // Generate shared job description. - val description = new WriteJobDescription[K, V]( - jobTrackerId = jobTrackerId, - serializableHadoopConf = conf, - outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]]) - // Try to write all RDD partitions as a Hadoop OutputFormat. 
try { sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { executeTask( context = context, - description = description, + jobTrackerId = jobTrackerId, sparkStageId = context.stageId, sparkPartitionId = context.partitionId, sparkAttemptNumber = context.attemptNumber, committer = committer, + hadoopConf = conf.value, + outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]], iterator = iter) }) @@ -126,25 +120,25 @@ object SparkNewHadoopWriter extends Logging { /** Write a RDD partition out in a single Spark task. */ private def executeTask[K, V: ClassTag]( context: TaskContext, - description: WriteJobDescription[K, V], + jobTrackerId: String, sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, committer: FileCommitProtocol, + hadoopConf: Configuration, + outputFormat: Class[_ <: OutputFormat[K, V]], iterator: Iterator[(K, V)]): Unit = { - val conf = description.serializableHadoopConf.value - // Set up a task. - val attemptId = new TaskAttemptID(description.jobTrackerId, sparkStageId, TaskType.REDUCE, + val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE, sparkPartitionId, sparkAttemptNumber) - val taskContext = new TaskAttemptContextImpl(conf, attemptId) + val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId) committer.setupTask(taskContext) val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkNewHadoopWriterUtils.initHadoopOutputMetrics(context) + SparkHadoopWriterUtils.initHadoopOutputMetrics(context) // Initiate the writer. - val taskFormat = description.outputFormat.newInstance + val taskFormat = outputFormat.newInstance val writer = taskFormat.getRecordWriter(taskContext) .asInstanceOf[RecordWriter[K, V]] require(writer != null, "Unable to obtain RecordWriter") @@ -158,7 +152,7 @@ object SparkNewHadoopWriter extends Logging { writer.write(pair._1, pair._2) // Update bytes written metric every few records - SparkNewHadoopWriterUtils.maybeUpdateOutputMetrics( + SparkHadoopWriterUtils.maybeUpdateOutputMetrics( outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } @@ -181,7 +175,7 @@ object SparkNewHadoopWriter extends Logging { } private[spark] -object SparkNewHadoopWriterUtils { +object SparkHadoopWriterUtils { def createJobID(time: Date, id: Int): JobID = { val jobtrackerID = createJobTrackerID(time) new JobID(jobtrackerID, id) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 73afcceec3f42..f9b9631d9e7ca 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -38,7 +38,7 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkNewHadoopWriter, SparkNewHadoopWriterUtils} +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils} import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer @@ -1058,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } FileOutputFormat.setOutputPath(hadoopConf, - SparkNewHadoopWriterUtils.createPathFromString(path, 
hadoopConf)) + SparkHadoopWriterUtils.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) } @@ -1074,19 +1074,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * result of using direct output committer with speculation enabled. */ def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { - val stageId = self.id - - val committer = FileCommitProtocol.instantiate( - className = classOf[HadoopMapReduceCommitProtocol].getName, - jobId = stageId.toString, - outputPath = conf.get("mapred.output.dir"), - isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] - - SparkNewHadoopWriter.write( - sparkContext = self.context, + SparkHadoopMapReduceWriter.write( rdd = self, - committer = committer, - stageId = stageId, hadoopConf = conf) } @@ -1116,7 +1105,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (SparkNewHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) { + if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(hadoopConf) hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) @@ -1131,7 +1120,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkNewHadoopWriterUtils.initHadoopOutputMetrics(context) + SparkHadoopWriterUtils.initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() @@ -1143,7 +1132,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records - SparkNewHadoopWriterUtils.maybeUpdateOutputMetrics( + SparkHadoopWriterUtils.maybeUpdateOutputMetrics( outputMetricsAndBytesWrittenCallback, recordsWritten) recordsWritten += 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 3e24fb3fdddf3..fa7fe143daeba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, SparkNewHadoopWriterUtils} +import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -153,7 +153,7 @@ object FileFormatWriter extends Logging { committer: FileCommitProtocol, iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = { - val jobId = SparkNewHadoopWriterUtils.createJobID(new Date, sparkStageId) + val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 22a687e22dcf9..a34e2e76f5838 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.TaskType import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -143,7 +143,7 @@ private[hive] class SparkHiveWriterContainer( splitID = splitId attemptID = attemptId - jID = new SerializableWritable[JobID](SparkNewHadoopWriterUtils.createJobID(now, jobId)) + jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d63cefc72ff90..7e0a2ca609c86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -27,7 +27,7 @@ import scala.util.matching.Regex import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ @@ -338,7 +338,7 @@ abstract class DStream[T: ClassTag] ( // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. We need to have this call here because // compute() might cause Spark jobs to be launched. - SparkNewHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { + SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { compute(time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index a884212cf9259..b7d114bc16d48 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -26,7 +26,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark.ExecutorAllocationClient import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkNewHadoopWriterUtils +import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ import org.apache.spark.streaming.api.python.PythonDStream @@ -251,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. 
- SparkNewHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { + SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) { job.run() } _eventLoop = eventLoop From 243b8ba106dd3a604a9053612d9d38482792a3db Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Mon, 7 Nov 2016 11:50:43 +0800 Subject: [PATCH 10/11] pass the result from commit protocol to commitJob. --- .../io/SparkHadoopMapReduceWriter.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 51d3c576f93d4..eda39b3eae65b 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -29,10 +29,11 @@ import org.apache.hadoop.mapred.{JobConf, JobID} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{SparkConf, SparkContext, SparkException, TaskContext} +import org.apache.spark.{SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.OutputMetrics import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.RDD import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -95,7 +96,7 @@ object SparkHadoopMapReduceWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { - sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { executeTask( context = context, jobTrackerId = jobTrackerId, @@ -108,7 +109,7 @@ object SparkHadoopMapReduceWriter extends Logging { iterator = iter) }) - committer.commitJob(jobContext, Seq.empty) + committer.commitJob(jobContext, ret) logInfo(s"Job ${jobContext.getJobID} committed.") } catch { case cause: Throwable => logError(s"Aborting job ${jobContext.getJobID}.", cause) @@ -127,7 +128,7 @@ object SparkHadoopMapReduceWriter extends Logging { committer: FileCommitProtocol, hadoopConf: Configuration, outputFormat: Class[_ <: OutputFormat[K, V]], - iterator: Iterator[(K, V)]): Unit = { + iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE, sparkPartitionId, sparkAttemptNumber) @@ -165,11 +166,11 @@ object SparkHadoopMapReduceWriter extends Logging { } catch { case t: Throwable => throw new SparkException("Task failed while writing rows", t) - } - - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) + } finally { + outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } } } } From 9380f91281587867d8a630199c01c4263bc2e197 Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Tue, 8 Nov 2016 15:54:10 +0800 Subject: [PATCH 11/11] refactor. 
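Patch 10 above makes each write task hand the TaskCommitMessage produced by committer.commitTask back to the driver, which collects them through runJob and passes them to commitJob; this final patch keeps that wiring while tidying error handling and metrics updates. A stripped-down sketch of that round trip follows; it assumes access to the internal FileCommitProtocol API, the helper names and the no-op write loop are illustrative only, and task-attempt-context construction is abstracted behind a function parameter rather than spelled out as in executeTask.

import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD

object CommitRoundTripSketch {

  def write[T](
      sc: SparkContext,
      rdd: RDD[T],
      committer: FileCommitProtocol,
      jobContext: JobContext,
      newTaskAttemptContext: TaskContext => TaskAttemptContext): Unit = {
    // Each task sets up, "writes" its partition, and returns the message from commitTask.
    val messages = sc.runJob(rdd, (ctx: TaskContext, iter: Iterator[T]) => {
      val taskContext = newTaskAttemptContext(ctx)
      committer.setupTask(taskContext)
      iter.foreach(_ => ())               // a real task writes records through a RecordWriter here
      committer.commitTask(taskContext)   // yields the TaskCommitMessage for this task
    })
    // The driver now commits the job with every task's message, instead of Seq.empty.
    committer.commitJob(jobContext, messages.toSeq)
  }
}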
--- .../io/SparkHadoopMapReduceWriter.scala | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index eda39b3eae65b..a405c44e1093d 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -111,10 +111,11 @@ object SparkHadoopMapReduceWriter extends Logging { committer.commitJob(jobContext, ret) logInfo(s"Job ${jobContext.getJobID} committed.") - } catch { case cause: Throwable => - logError(s"Aborting job ${jobContext.getJobID}.", cause) - committer.abortJob(jobContext) - throw new SparkException("Job aborted.", cause) + } catch { + case cause: Throwable => + logError(s"Aborting job ${jobContext.getJobID}.", cause) + committer.abortJob(jobContext) + throw new SparkException("Job aborted.", cause) } } @@ -147,7 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging { // Write all rows in RDD partition. try { - Utils.tryWithSafeFinallyAndFailureCallbacks { + val ret = Utils.tryWithSafeFinallyAndFailureCallbacks { while (iterator.hasNext) { val pair = iterator.next() writer.write(pair._1, pair._2) @@ -163,20 +164,26 @@ object SparkHadoopMapReduceWriter extends Logging { committer.abortTask(taskContext) logError(s"Task ${taskContext.getTaskAttemptID} aborted.") }, finallyBlock = writer.close(taskContext)) + + outputMetricsAndBytesWrittenCallback.foreach { + case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } + + ret } catch { case t: Throwable => throw new SparkException("Task failed while writing rows", t) - } finally { - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } } } } private[spark] object SparkHadoopWriterUtils { + + private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 + def createJobID(time: Date, id: Int): JobID = { val jobtrackerID = createJobTrackerID(time) new JobID(jobtrackerID, id) @@ -226,15 +233,14 @@ object SparkHadoopWriterUtils { outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], recordsWritten: Long): Unit = { if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) + outputMetricsAndBytesWrittenCallback.foreach { + case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) } } } - val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 - /** * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case * basis; see SPARK-4835 for more details.
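Taken together, the series leaves the public save path unchanged while routing it through the new writer and commit protocol. Below is a small end-to-end example of that public path, which after these patches flows through saveAsNewAPIHadoopDataset into SparkHadoopMapReduceWriter.write; the app name and output directory are placeholders, not values used anywhere in the patches.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.spark.{SparkConf, SparkContext}

object SaveWithNewApiExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("save-with-new-api").setMaster("local[2]"))

    // Build a pair RDD of Hadoop Writables to save.
    val counts = sc.parallelize(Seq("spark" -> 3, "hadoop" -> 2))
      .map { case (word, count) => (new Text(word), new IntWritable(count)) }

    // saveAsNewAPIHadoopFile sets the output path and format on the Hadoop configuration,
    // then calls saveAsNewAPIHadoopDataset, which after this series simply delegates to
    // SparkHadoopMapReduceWriter.write(rdd, hadoopConf).
    counts.saveAsNewAPIHadoopFile(
      "/tmp/save-with-new-api",                       // placeholder output directory
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])

    sc.stop()
  }
}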