From 984626fb2451934738d702becf5c6c59f5c51082 Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 8 Aug 2015 16:09:01 +0800 Subject: [PATCH 1/5] hive side: fallback on sorting when writing many dynamic partitions --- .../hive/execution/InsertIntoHiveTable.scala | 72 ++---- .../spark/sql/hive/hiveWriterContainers.scala | 241 +++++++++++++++--- 2 files changed, 231 insertions(+), 82 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b02ace786c66c..0d69f867f3cd7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -24,20 +24,16 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.{Context, ErrorMsg} -import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} -import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.types.DataType +import org.apache.spark.SparkException import org.apache.spark.util.SerializableJobConf private[hive] @@ -46,20 +42,13 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryNode with HiveInspectors { + ifNotExists: Boolean) extends UnaryNode { @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] - @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @transient private lazy val catalog = sc.catalog - private def newSerializer(tableDesc: TableDesc): Serializer = { - val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) - serializer - } - - def output: Seq[Attribute] = Seq.empty + def output: Seq[Attribute] = child.output private def saveAsHiveFile( rdd: RDD[InternalRow], @@ -78,44 +67,10 @@ case class InsertIntoHiveTable( conf.value, SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - writerContainer.driverSideSetup() - sc.sparkContext.runJob(rdd, writeToFile _) + sc.sparkContext.runJob(rdd, writerContainer.writeToFile _) writerContainer.commitJob() - // Note that this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - 
fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.asScala - .map(_.getFieldObjectInspector).toArray - val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray - val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} - val outputData = new Array[Any](fieldOIs.length) - - writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - val proj = FromUnsafeProjection(child.schema) - iterator.foreach { row => - var i = 0 - val safeRow = proj(row) - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i))) - i += 1 - } - - writerContainer - .getLocalFileWriter(safeRow, table.schema) - .write(serializer.serialize(outputData, standardOI)) - } - - writerContainer.close() - } } /** @@ -194,11 +149,22 @@ case class InsertIntoHiveTable( val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) - new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) + new SparkHiveDynamicPartitionWriterContainer( + jobConf, + fileSinkConf, + dynamicPartColNames, + child.output, + table, + sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES)) } else { - new SparkHiveWriterContainer(jobConf, fileSinkConf) + new SparkHiveWriterContainer( + jobConf, + fileSinkConf, + child.output, + table) } + @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 22182ba00986f..a6b24e9eaa4e4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.hive import java.text.NumberFormat import java.util.Date -import scala.collection.mutable - +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorUtils} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -32,22 +33,31 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType -import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} +import org.apache.spark.{Logging, SerializableWritable} import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.UnsafeKVExternalSorter +import org.apache.spark._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableJobConf +import scala.collection.JavaConversions._ + /** * Internal helper class 
that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. */ private[hive] class SparkHiveWriterContainer( - jobConf: JobConf, - fileSinkConf: FileSinkDesc) - extends Logging with Serializable { + @transient jobConf: JobConf, + fileSinkConf: FileSinkDesc, + inputSchema: Seq[Attribute], + table: MetastoreRelation) + extends Logging + with HiveInspectors + with Serializable { private val now = new Date() private val tableDesc: TableDesc = fileSinkConf.getTableInfo @@ -93,14 +103,12 @@ private[hive] class SparkHiveWriterContainer( "part-" + numberFormat.format(splitID) + extension } - def getLocalFileWriter(row: InternalRow, schema: StructType): FileSinkOperator.RecordWriter = { - writer - } - def close() { // Seems the boolean value passed into close does not matter. - writer.close(false) - commit() + if (writer != null) { + writer.close(false) + commit() + } } def commitJob() { @@ -123,6 +131,13 @@ private[hive] class SparkHiveWriterContainer( SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID) } + def abortTask(): Unit = { + if (committer != null) { + committer.abortTask(taskContext) + } + logError(s"Task attempt $taskContext aborted.") + } + private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { jobID = jobId splitID = splitId @@ -140,6 +155,39 @@ private[hive] class SparkHiveWriterContainer( conf.value.setBoolean("mapred.task.is.map", true) conf.value.setInt("mapred.task.partition", splitID) } + + def newSerializer(tableDesc: TableDesc): Serializer = { + val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) + serializer + } + + // this function is executed on executor side + def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val dataTypes = inputSchema.map(_.dataType) + val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } + val outputData = new Array[Any](fieldOIs.length) + executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) + i += 1 + } + writer.write(serializer.serialize(outputData, standardOI)) + } + + close() + } } private[hive] object SparkHiveWriterContainer { @@ -163,25 +211,23 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer { private[spark] class SparkHiveDynamicPartitionWriterContainer( jobConf: JobConf, fileSinkConf: FileSinkDesc, - dynamicPartColNames: Array[String]) - extends SparkHiveWriterContainer(jobConf, fileSinkConf) { + dynamicPartColNames: Array[String], + inputSchema: Seq[Attribute], + table: MetastoreRelation, + maxOpenFiles: Int) + extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) { import SparkHiveDynamicPartitionWriterContainer._ private val defaultPartName = jobConf.get( ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal) - @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ - override protected def 
initWriters(): Unit = { - // NOTE: This method is executed at the executor side. - // Actual writers are created for each dynamic partition on the fly. - writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] + // do nothing } override def close(): Unit = { - writers.values.foreach(_.close(false)) - commit() + // do nothing } override def commitJob(): Unit = { @@ -198,8 +244,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } - override def getLocalFileWriter(row: InternalRow, schema: StructType) - : FileSinkOperator.RecordWriter = { + private def getPartitionString(row: InternalRow, schema: StructType): String = { def convertToHiveRawString(col: String, value: Any): String = { val raw = String.valueOf(value) schema(col).dataType match { @@ -210,7 +255,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( } val nonDynamicPartLen = row.numFields - dynamicPartColNames.length - val dynamicPartPath = dynamicPartColNames.zipWithIndex.map { case (colName, i) => + dynamicPartColNames.zipWithIndex.map { case (colName, i) => val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType) val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal) val colString = @@ -221,10 +266,137 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( } s"/$colName=$colString" }.mkString + } - def newWriter(): FileSinkOperator.RecordWriter = { + // this function is executed on executor side + override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { + val outputWriters = new java.util.HashMap[InternalRow, FileSinkOperator.RecordWriter] + + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val dataTypes = inputSchema.map(_.dataType) + val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } + val outputData = new Array[Any](fieldOIs.length) + executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length) + val dataOutput = inputSchema.dropRight(dynamicPartColNames.length) + // Returns the partition key given an input row + val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema) + // Returns the data columns to be written given an input row + val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema) + + // If anything below fails, we should abort the task. + try { + // This will be filled in if we have to fall back on sorting. 
+ var sorter: UnsafeKVExternalSorter = null + while (iterator.hasNext && sorter == null) { + val inputRow = iterator.next() + val currentKey = getPartitionKey(inputRow) + var currentWriter = outputWriters.get(currentKey) + + if (currentWriter == null) { + if (outputWriters.size < maxOpenFiles) { + currentWriter = newOutputWriter(currentKey) + outputWriters.put(currentKey.copy(), currentWriter) + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (inputRow.isNullAt(i)) { + null + } else { + wrappers(i)(inputRow.get(i, dataTypes(i))) + } + i += 1 + } + currentWriter.write(serializer.serialize(outputData, standardOI)) + } else { + logInfo(s"Maximum partitions reached, falling back on sorting.") + sorter = new UnsafeKVExternalSorter( + StructType.fromAttributes(partitionOutput), + StructType.fromAttributes(dataOutput), + SparkEnv.get.blockManager, + TaskContext.get().taskMemoryManager().pageSizeBytes) + sorter.insertKV(currentKey, getOutputRow(inputRow)) + } + } else { + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (inputRow.isNullAt(i)) { + null + } else { + wrappers(i)(inputRow.get(i, dataTypes(i))) + } + i += 1 + } + currentWriter.write(serializer.serialize(outputData, standardOI)) + } + } + + // If the sorter is not null that means that we reached the maxFiles above and need to finish + // using external sort. + if (sorter != null) { + while (iterator.hasNext) { + val currentRow = iterator.next() + sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow)) + } + + logInfo(s"Sorting complete. Writing out partition files one at a time.") + + val sortedIterator = sorter.sortedIterator() + var currentKey: InternalRow = null + var currentWriter: FileSinkOperator.RecordWriter = null + try { + while (sortedIterator.next()) { + if (currentKey != sortedIterator.getKey) { + if (currentWriter != null) { + currentWriter.close(false) + } + currentKey = sortedIterator.getKey.copy() + logDebug(s"Writing partition: $currentKey") + + // Either use an existing file from before, or open a new one. + currentWriter = outputWriters.remove(currentKey) + if (currentWriter == null) { + currentWriter = newOutputWriter(currentKey) + } + } + + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (sortedIterator.getValue.isNullAt(i)) { + null + } else { + wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i))) + } + i += 1 + } + currentWriter.write(serializer.serialize(outputData, standardOI)) + } + } finally { + if (currentWriter != null) { + currentWriter.close(false) + } + } + } + clearOutputWriters + commit() + } catch { + case cause: Throwable => + logError("Aborting task.", cause) + abortTask() + throw new SparkException("Task failed while writing rows.", cause) + } + /** Open and returns a new OutputWriter given a partition key. 
*/ + def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = { + val partitionPath = getPartitionString(key, table.schema) val newFileSinkDesc = new FileSinkDesc( - fileSinkConf.getDirName + dynamicPartPath, + fileSinkConf.getDirName + partitionPath, fileSinkConf.getTableInfo, fileSinkConf.getCompressed) newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) @@ -234,7 +406,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // to avoid write to the same file when `spark.speculation=true` val path = FileOutputFormat.getTaskOutputPath( conf.value, - dynamicPartPath.stripPrefix("/") + "/" + getOutputName) + partitionPath.stripPrefix("/") + "/" + getOutputName) HiveFileFormatUtils.getHiveRecordWriter( conf.value, @@ -245,6 +417,17 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( Reporter.NULL) } - writers.getOrElseUpdate(dynamicPartPath, newWriter()) + def clearOutputWriters(): Unit = { + outputWriters.values.foreach(_.close(false)) + outputWriters.clear() + } + + def abortTask(): Unit = { + try { + clearOutputWriters() + } finally { + super.abortTask() + } + } } } From 3d6338f641942e720e1b29c256c043d1221890cf Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 8 Aug 2015 21:28:11 +0800 Subject: [PATCH 2/5] fix spark-3810 test failure --- .../spark/sql/hive/hiveWriterContainers.scala | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index a6b24e9eaa4e4..862f437632dd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -244,30 +244,6 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } - private def getPartitionString(row: InternalRow, schema: StructType): String = { - def convertToHiveRawString(col: String, value: Any): String = { - val raw = String.valueOf(value) - schema(col).dataType match { - case DateType => DateTimeUtils.dateToString(raw.toInt) - case _: DecimalType => BigDecimal(raw).toString() - case _ => raw - } - } - - val nonDynamicPartLen = row.numFields - dynamicPartColNames.length - dynamicPartColNames.zipWithIndex.map { case (colName, i) => - val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType) - val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal) - val colString = - if (string == null || string.isEmpty) { - defaultPartName - } else { - FileUtils.escapePathName(string, defaultPartName) - } - s"/$colName=$colString" - }.mkString - } - // this function is executed on executor side override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { val outputWriters = new java.util.HashMap[InternalRow, FileSinkOperator.RecordWriter] @@ -286,12 +262,26 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length) - val dataOutput = inputSchema.dropRight(dynamicPartColNames.length) + val dataOutput = inputSchema.take(fieldOIs.length) // Returns the partition key given an input row val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema) // Returns the data columns to be written given an input row val 
getOutputRow = UnsafeProjection.create(dataOutput, inputSchema) + val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName) + // Expressions that given a partition key build a string like: col1=val/col2=val/... + val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) => + val escaped = + ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType)) + val str = If(IsNull(c), Literal(defaultPartName), escaped) + val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil + if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName + } + + // Returns the partition path given a partition key. + val getPartitionString = + UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput) + // If anything below fails, we should abort the task. try { // This will be filled in if we have to fall back on sorting. @@ -394,7 +384,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( } /** Open and returns a new OutputWriter given a partition key. */ def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = { - val partitionPath = getPartitionString(key, table.schema) + val partitionPath = getPartitionString(key).getString(0) val newFileSinkDesc = new FileSinkDesc( fileSinkConf.getDirName + partitionPath, fileSinkConf.getTableInfo, From 7766247e9df33455dc166df14d37ab08de1c5852 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sun, 10 Jan 2016 23:02:02 +0800 Subject: [PATCH 3/5] always do sort based dynamic patition writing --- .../hive/execution/InsertIntoHiveTable.scala | 5 +- .../spark/sql/hive/hiveWriterContainers.scala | 130 +++++------------- 2 files changed, 34 insertions(+), 101 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 0d69f867f3cd7..feb133d44898a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -48,7 +48,7 @@ case class InsertIntoHiveTable( @transient private lazy val hiveContext = new Context(sc.hiveconf) @transient private lazy val catalog = sc.catalog - def output: Seq[Attribute] = child.output + def output: Seq[Attribute] = Seq.empty private def saveAsHiveFile( rdd: RDD[InternalRow], @@ -154,8 +154,7 @@ case class InsertIntoHiveTable( fileSinkConf, dynamicPartColNames, child.output, - table, - sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES)) + table) } else { new SparkHiveWriterContainer( jobConf, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 862f437632dd9..7017d79adfa37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -33,18 +33,16 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType -import org.apache.spark.{Logging, SerializableWritable} +import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.UnsafeKVExternalSorter -import org.apache.spark._ import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableJobConf -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * Internal helper class that saves an RDD using a Hive OutputFormat. @@ -171,7 +169,7 @@ private[hive] class SparkHiveWriterContainer( ObjectInspectorCopyOption.JAVA) .asInstanceOf[StructObjectInspector] - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray val dataTypes = inputSchema.map(_.dataType) val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } val outputData = new Array[Any](fieldOIs.length) @@ -213,8 +211,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( fileSinkConf: FileSinkDesc, dynamicPartColNames: Array[String], inputSchema: Seq[Attribute], - table: MetastoreRelation, - maxOpenFiles: Int) + table: MetastoreRelation) extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) { import SparkHiveDynamicPartitionWriterContainer._ @@ -246,8 +243,6 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // this function is executed on executor side override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val outputWriters = new java.util.HashMap[InternalRow, FileSinkOperator.RecordWriter] - val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils .getStandardObjectInspector( @@ -255,7 +250,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( ObjectInspectorCopyOption.JAVA) .asInstanceOf[StructObjectInspector] - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray val dataTypes = inputSchema.map(_.dataType) val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } val outputData = new Array[Any](fieldOIs.length) @@ -284,97 +279,49 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // If anything below fails, we should abort the task. try { - // This will be filled in if we have to fall back on sorting. - var sorter: UnsafeKVExternalSorter = null - while (iterator.hasNext && sorter == null) { + val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter( + StructType.fromAttributes(partitionOutput), + StructType.fromAttributes(dataOutput), + SparkEnv.get.blockManager, + TaskContext.get().taskMemoryManager().pageSizeBytes) + + while (iterator.hasNext) { val inputRow = iterator.next() val currentKey = getPartitionKey(inputRow) - var currentWriter = outputWriters.get(currentKey) + sorter.insertKV(currentKey, getOutputRow(inputRow)) + } - if (currentWriter == null) { - if (outputWriters.size < maxOpenFiles) { - currentWriter = newOutputWriter(currentKey) - outputWriters.put(currentKey.copy(), currentWriter) - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (inputRow.isNullAt(i)) { - null - } else { - wrappers(i)(inputRow.get(i, dataTypes(i))) - } - i += 1 + logInfo(s"Sorting complete. 
Writing out partition files one at a time.") + val sortedIterator = sorter.sortedIterator() + var currentKey: InternalRow = null + var currentWriter: FileSinkOperator.RecordWriter = null + try { + while (sortedIterator.next()) { + if (currentKey != sortedIterator.getKey) { + if (currentWriter != null) { + currentWriter.close(false) } - currentWriter.write(serializer.serialize(outputData, standardOI)) - } else { - logInfo(s"Maximum partitions reached, falling back on sorting.") - sorter = new UnsafeKVExternalSorter( - StructType.fromAttributes(partitionOutput), - StructType.fromAttributes(dataOutput), - SparkEnv.get.blockManager, - TaskContext.get().taskMemoryManager().pageSizeBytes) - sorter.insertKV(currentKey, getOutputRow(inputRow)) + currentKey = sortedIterator.getKey.copy() + logDebug(s"Writing partition: $currentKey") + currentWriter = newOutputWriter(currentKey) } - } else { + var i = 0 while (i < fieldOIs.length) { - outputData(i) = if (inputRow.isNullAt(i)) { + outputData(i) = if (sortedIterator.getValue.isNullAt(i)) { null } else { - wrappers(i)(inputRow.get(i, dataTypes(i))) + wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i))) } i += 1 } currentWriter.write(serializer.serialize(outputData, standardOI)) } - } - - // If the sorter is not null that means that we reached the maxFiles above and need to finish - // using external sort. - if (sorter != null) { - while (iterator.hasNext) { - val currentRow = iterator.next() - sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow)) - } - - logInfo(s"Sorting complete. Writing out partition files one at a time.") - - val sortedIterator = sorter.sortedIterator() - var currentKey: InternalRow = null - var currentWriter: FileSinkOperator.RecordWriter = null - try { - while (sortedIterator.next()) { - if (currentKey != sortedIterator.getKey) { - if (currentWriter != null) { - currentWriter.close(false) - } - currentKey = sortedIterator.getKey.copy() - logDebug(s"Writing partition: $currentKey") - - // Either use an existing file from before, or open a new one. 
- currentWriter = outputWriters.remove(currentKey) - if (currentWriter == null) { - currentWriter = newOutputWriter(currentKey) - } - } - - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (sortedIterator.getValue.isNullAt(i)) { - null - } else { - wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i))) - } - i += 1 - } - currentWriter.write(serializer.serialize(outputData, standardOI)) - } - } finally { - if (currentWriter != null) { - currentWriter.close(false) - } + } finally { + if (currentWriter != null) { + currentWriter.close(false) } } - clearOutputWriters commit() } catch { case cause: Throwable => @@ -406,18 +353,5 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( path, Reporter.NULL) } - - def clearOutputWriters(): Unit = { - outputWriters.values.foreach(_.close(false)) - outputWriters.clear() - } - - def abortTask(): Unit = { - try { - clearOutputWriters() - } finally { - super.abortTask() - } - } } } From 5da7fdcaa376bf5b4925427414f059909d7d5beb Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 20 Jan 2016 23:20:32 +0800 Subject: [PATCH 4/5] extract a common method --- .../spark/sql/hive/hiveWriterContainers.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 7017d79adfa37..a13054534dde4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -160,8 +160,7 @@ private[hive] class SparkHiveWriterContainer( serializer } - // this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { + protected def executorSidePrepare() = { val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils .getStandardObjectInspector( @@ -173,6 +172,12 @@ private[hive] class SparkHiveWriterContainer( val dataTypes = inputSchema.map(_.dataType) val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } val outputData = new Array[Any](fieldOIs.length) + (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) + } + + // this function is executed on executor side + def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { + val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = executorSidePrepare() executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) iterator.foreach { row => @@ -243,17 +248,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // this function is executed on executor side override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray - val dataTypes = inputSchema.map(_.dataType) - val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } - val outputData = new Array[Any](fieldOIs.length) + val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = executorSidePrepare() 
executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length) From 7adbcca02758c71f60148cfb3826d61e7db1c9b7 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 20 Jan 2016 23:31:38 +0800 Subject: [PATCH 5/5] style fix --- .../spark/sql/hive/hiveWriterContainers.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index a13054534dde4..e9e08dbf8386a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -20,30 +20,30 @@ package org.apache.spark.sql.hive import java.text.NumberFormat import java.util.Date -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorUtils} -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.execution.UnsafeKVExternalSorter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.UnsafeKVExternalSorter import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableJobConf -import scala.collection.JavaConverters._ - /** * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. 
@@ -160,7 +160,7 @@ private[hive] class SparkHiveWriterContainer( serializer } - protected def executorSidePrepare() = { + protected def prepareForWrite() = { val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils .getStandardObjectInspector( @@ -177,7 +177,7 @@ private[hive] class SparkHiveWriterContainer( // this function is executed on executor side def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = executorSidePrepare() + val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) iterator.foreach { row => @@ -248,7 +248,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // this function is executed on executor side override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = executorSidePrepare() + val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length)