diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
index d551f386eee6..f57d1da6314a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.datasources
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConversions.asScalaIterator
+import scala.collection.mutable.HashSet
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
 
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
@@ -35,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
@@ -203,7 +203,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
    */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
-      writerContainer: BaseWriterContainer,
+      writerContainer: DynamicPartitionWriterContainer,
       df: DataFrame,
       partitionColumns: Array[String]): Unit = {
     // Uses a local val for serialization
@@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation(
     }
 
     def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      // Track which rows have been output to disk so that if a data sort is necessary mid-write,
+      // we don't end up outputting the same data twice
+      val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow]
+
+      // Flag to track whether data has been sorted in which case it's safe to close previously
+      // used outputWriters
+      var sorted: Boolean = false
+
       // If anything below fails, we should abort the task.
       try {
         writerContainer.executorSideSetup(taskContext)
-        // Projects all partition columns and casts them to strings to build partition directories.
-        val partitionCasts = partitionOutput.map(Cast(_, StringType))
-        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
-        val dataProj = newProjection(codegenEnabled, dataOutput, output)
+        // Sort the data by partition so that it's possible to use a single outputWriter at a
+        // time to process the incoming data
+        def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+          // Sort by the same key used to look up the outputWriter to allow us to recycle the writer
+          iterator.toArray.sortBy(writerContainer.computePartitionPath).toIterator
+        }
 
-        if (needsConversion) {
-          val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
-            .asInstanceOf[InternalRow => Row]
-          while (iterator.hasNext) {
-            val internalRow = iterator.next()
-            val partitionPart = partitionProj(internalRow)
-            val dataPart = converter(dataProj(internalRow))
-            writerContainer.outputWriterForRow(partitionPart).write(dataPart)
+        // When outputting rows, we may need to interrupt the file write to sort the underlying data
+        // (SPARK-8890) to avoid running out of memory due to creating too many outputWriters. Thus,
+        // we extract this functionality into its own function that can be called with updated
+        // underlying data.
+        def writeRowsSafe(iterator: Iterator[InternalRow]): Unit = {
+          var converter: Option[InternalRow => Row] = None
+          if (needsConversion) {
+            converter = Some(CatalystTypeConverters.createToScalaConverter(dataSchema)
+              .asInstanceOf[InternalRow => Row])
           }
-        } else {
+          while (iterator.hasNext) {
             val internalRow = iterator.next()
-            val partitionPart = partitionProj(internalRow)
-            val dataPart = dataProj(internalRow)
-            writerContainer.outputWriterForRow(partitionPart)
-              .asInstanceOf[OutputWriterInternal].writeInternal(dataPart)
+
+            // Only output rows that we haven't already output; this code can be called after a sort
+            // mid-traversal.
+            if (!writtenRows.contains(internalRow) &&
+              writerContainer.canGetOutputWriter(internalRow)) {
+
+              converter match {
+                case Some(converter) =>
+                  writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
+                case None =>
+                  writerContainer.outputWriterForRow(internalRow)
+                    .asInstanceOf[OutputWriterInternal].writeInternal(internalRow)
+              }
+
+              writtenRows += internalRow
+            } else if (!writtenRows.contains(internalRow)) {
+              // If there are no more available output writers, sort the data, and set the sorted
+              // flag to true. This will then cause subsequent output writers to be cleared after
+              // use, minimizing the memory footprint.
+              val sortedRows: Iterator[InternalRow] = sortRows(iterator)
+              sorted = true
+              writeRowsSafe(sortedRows)
+            }
           }
         }
+        writeRowsSafe(iterator)
         writerContainer.commitTask()
       } catch { case cause: Throwable =>
         logError("Aborting task.", cause)
@@ -527,13 +558,17 @@ private[sql] class DynamicPartitionWriterContainer(
   // All output writers are created on executor side.
   @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
+  protected var maxOutputWriters = 50
+
   override protected def initWriters(): Unit = {
     outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
-  // The `row` argument is supposed to only contain partition column values which have been casted
-  // to strings.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+  /**
+   * Extract the functionality to create a partitionPath, a grouping of columns in a row, which
+   * serves as a key variable when allocating new outputWriters.
+   */
+  def computePartitionPath(row: InternalRow): String = {
     val partitionPath = {
       val partitionPathBuilder = new StringBuilder
       var i = 0
@@ -559,6 +594,37 @@ private[sql] class DynamicPartitionWriterContainer(
       partitionPathBuilder.toString()
     }
+    partitionPath
+  }
+
+  /**
+   * Returns true if it's possible to create a new outputWriter for a given row or to use an
+   * existing writer without triggering a sort operation on the incoming data to avoid memory
+   * problems.
+   *
+   * During {{ InsertIntoHadoopFsRelation }} new outputWriters are created for every partition.
+   * Creating too many outputWriters can cause us to run out of memory (SPARK-8890). Therefore,
+   * only create up to a certain number of outputWriters. If the number of allowed writers is
+   * exceeded, the existing outputWriters will be closed and a sort operation will be triggered on
+   * the incoming data, ensuring that it's sorted by key such that a single outputWriter may be
+   * used at a time. E.g. process all key1, close the writer, process key2, etc.
+   */
+  def canGetOutputWriter(row: InternalRow): Boolean = {
+    (outputWriters.size() < (maxOutputWriters - 1)) || {
+      // Only compute this when we're near the max allowed number of outputWriters
+      val partitionPath = computePartitionPath(row)
+      outputWriters.containsKey(partitionPath)
+    }
+  }
+
+  /**
+   * Create the outputWriter to output a given row to disk.
+   *
+   * @param row The `row` argument is supposed to only contain partition column values
+   *            which have been casted to strings.
+   */
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath: String = computePartitionPath(row)
 
     val writer = outputWriters.get(partitionPath)
 
     if (writer.eq(null)) {
@@ -573,6 +639,22 @@ private[sql] class DynamicPartitionWriterContainer(
     }
   }
 
+  /**
+   * Create the outputWriter to output a given row to disk. If dealing with sorted data, we
+   * can close previously used writers since they will no longer be necessary.
+   *
+   * @param row The `row` argument is supposed to only contain partition column values
+   *            which have been casted to strings.
+   * @param shouldCloseWriters If true, close all existing writers before creating new writers
+   */
+  def outputWriterForRow(row: InternalRow, shouldCloseWriters: Boolean): OutputWriter = {
+    if (shouldCloseWriters) {
+      clearOutputWriters()
+    }
+
+    outputWriterForRow(row)
+  }
+
   private def clearOutputWriters(): Unit = {
     if (!outputWriters.isEmpty) {
       asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
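
Note for reviewers (not part of the patch): the standalone Scala sketch below illustrates the fallback strategy the patch describes once the outputWriter cap is reached, namely sorting the remaining rows by their partition key and then streaming them through one open writer at a time, closing each writer when the key changes. The names Record, PartitionWriter, and writeSorted are hypothetical stand-ins for InternalRow, OutputWriter, and the patched writeRows path; the sketch writes plain text files purely for illustration.

import java.io.{File, PrintWriter}

object SortedPartitionWriteSketch {

  // Hypothetical stand-in for an InternalRow: one partition column plus a payload.
  final case class Record(partition: String, value: String)

  // Hypothetical stand-in for OutputWriter: one file per partition value.
  final class PartitionWriter(dir: File, partition: String) {
    private val out = new PrintWriter(new File(dir, s"part=$partition.txt"))
    def write(record: Record): Unit = out.println(record.value)
    def close(): Unit = out.close()
  }

  // Sorts the records by partition key, then keeps only one writer open at a time:
  // all records for key1 are written and that writer closed before key2's writer opens.
  // This is the degenerate (single-writer) form of the sorted fallback in the patch.
  def writeSorted(records: Iterator[Record], dir: File): Unit = {
    val sorted = records.toArray.sortBy(_.partition).iterator
    var current: Option[(String, PartitionWriter)] = None
    try {
      for (record <- sorted) {
        current match {
          case Some((key, writer)) if key == record.partition =>
            // Same partition as the previous record: reuse the open writer.
            writer.write(record)
          case previous =>
            // Partition changed (or first record): close the old writer, open a new one.
            previous.foreach(_._2.close())
            val writer = new PartitionWriter(dir, record.partition)
            writer.write(record)
            current = Some((record.partition, writer))
        }
      }
    } finally {
      current.foreach(_._2.close())
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = new File(System.getProperty("java.io.tmpdir"), "sorted-write-sketch")
    dir.mkdirs()
    writeSorted(Iterator(Record("b", "2"), Record("a", "1"), Record("b", "3")), dir)
  }
}

The patch generalizes this single-writer loop by keeping up to maxOutputWriters writers open for unsorted input and only falling back to the sorted, one-writer-at-a-time mode when that limit is hit.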