@@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.datasources
import java.util.{Date, UUID}

import scala.collection.JavaConversions.asScalaIterator
import scala.collection.mutable.HashSet

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
@@ -35,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration


@@ -203,7 +203,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
*/
private def insertWithDynamicPartitions(
sqlContext: SQLContext,
writerContainer: BaseWriterContainer,
writerContainer: DynamicPartitionWriterContainer,
df: DataFrame,
partitionColumns: Array[String]): Unit = {
// Uses a local val for serialization
@@ -244,34 +244,65 @@
}

def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
// Track which rows have been output to disk so that if a data sort is necessary mid-write,
// we don't end up outputting the same data twice
val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow]
Contributor: This is expensive, we should avoid that.

Author: Is there a preferred way to do this? I could have the HashSet be created once to avoid creating it every time and clear it between calls?

Contributor: Once we make sure we only visit the items once, the rows will not be output twice.

Author: The point is that after a sort, everything is reorganized so we may end up traversing some elements that have already been processed, no?

Contributor: Because the iterator can only be consumed once, we only sort the items that have not been visited.

Author: Got it, so just use an `ExternalSorter` based off that iterator to do the sort to avoid potential memory problems.
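For reference, the pattern this thread converges on (sort rows by partition key, then keep a single writer open at a time) could look roughly like the sketch below. `RowWriter`, `partitionKeyOf` and `openWriter` are illustrative placeholders, not the PR's actual types or APIs.

```scala
// Minimal sketch of the "sort, then one writer at a time" idea.
trait RowWriter {
  def write(row: Map[String, Any]): Unit
  def close(): Unit
}

def writeSorted(
    sortedRows: Iterator[Map[String, Any]],
    partitionKeyOf: Map[String, Any] => String,
    openWriter: String => RowWriter): Unit = {
  var currentKey: String = null
  var currentWriter: RowWriter = null
  for (row <- sortedRows) {
    val key = partitionKeyOf(row)
    if (key != currentKey) {
      // The input is sorted by key, so the previous partition is finished
      // and its writer can be closed before opening the next one.
      if (currentWriter != null) currentWriter.close()
      currentWriter = openWriter(key)
      currentKey = key
    }
    currentWriter.write(row)
  }
  if (currentWriter != null) currentWriter.close()
}
```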


// Flag to track whether the data has been sorted, in which case it's safe to close previously
// used outputWriters
var sorted: Boolean = false

// If anything below fails, we should abort the task.
try {
writerContainer.executorSideSetup(taskContext)

// Projects all partition columns and casts them to strings to build partition directories.
val partitionCasts = partitionOutput.map(Cast(_, StringType))
val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)
// Sort the data by partition so that it's possible to use a single outputWriter at a
// time to process the incoming data
def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
// Sort by the same key used to look up the outputWriter to allow us to recycle the writer
iterator.toArray.sortBy(writerContainer.computePartitionPath).toIterator
Contributor: There could be many records, which cannot be held in memory, so we should use an external sort.

Author: Isn't the iterator object already in memory?

Contributor: I think the items will be loaded into memory lazily by the iterator; that's the advantage of Iterator over Array.
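As a side note on the laziness point, a REPL-style illustration with plain Scala collections (unrelated to the PR's actual code):

```scala
// An Iterator computes elements on demand; toArray forces everything at once.
val it: Iterator[Int] = Iterator.tabulate(1000000)(identity) // nothing materialized yet
val doubled = it.map(_ * 2)                                  // still lazy
val firstTen = doubled.take(10).toList                       // only 10 elements are realized
// By contrast, sorting via iterator.toArray.sortBy(...) (as in the diff above) materializes
// every remaining element, which is why the reviewer suggests an external (spilling) sort.
```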

}

if (needsConversion) {
val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
.asInstanceOf[InternalRow => Row]
while (iterator.hasNext) {
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = converter(dataProj(internalRow))
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
// When outputting rows, we may need to interrupt the file write to sort the underlying data
// (SPARK-8890) to avoid running out of memory due to creating too many outputWriters. Thus,
// we extract this functionality into its own function that can be called with updated
// underlying data.
def writeRowsSafe(iterator: Iterator[InternalRow]): Unit = {
var converter: Option[InternalRow => Row] = None
if (needsConversion) {
converter = Some(CatalystTypeConverters.createToScalaConverter(dataSchema)
.asInstanceOf[InternalRow => Row])
}
} else {

while (iterator.hasNext) {
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = dataProj(internalRow)
writerContainer.outputWriterForRow(partitionPart)
.asInstanceOf[OutputWriterInternal].writeInternal(dataPart)

// Only output rows that we haven't already output; this code can be called after a sort
// mid-traversal.
if (!writtenRows.contains(internalRow) &&
writerContainer.canGetOutputWriter(internalRow)) {

converter match {
case Some(converter) =>
writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
case None =>
writerContainer.outputWriterForRow(internalRow)
.asInstanceOf[OutputWriterInternal].writeInternal(internalRow)
}

writtenRows += internalRow
} else if (!writtenRows.contains(internalRow)) {
// If there are no more available output writers, sort the data, and set the sorted
// flag to true. This will then cause subsequent output writers to be cleared after
// use, minimizing the memory footprint.
val sortedRows: Iterator[InternalRow] = sortRows(iterator)
sorted = true
writeRowsSafe(sortedRows)
}
}
}

writeRowsSafe(iterator)
writerContainer.commitTask()
} catch { case cause: Throwable =>
logError("Aborting task.", cause)
@@ -527,13 +558,17 @@ private[sql] class DynamicPartitionWriterContainer(
// All output writers are created on executor side.
@transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _

protected var maxOutputWriters = 50

override protected def initWriters(): Unit = {
outputWriters = new java.util.HashMap[String, OutputWriter]
}

// The `row` argument is supposed to only contain partition column values which have been casted
// to strings.
override def outputWriterForRow(row: InternalRow): OutputWriter = {
/**
* Computes the partition path for a row (the string built from its partition column values),
* which serves as the key when looking up or allocating outputWriters.
*/
def computePartitionPath(row: InternalRow): String = {
val partitionPath = {
val partitionPathBuilder = new StringBuilder
var i = 0
@@ -559,6 +594,37 @@

partitionPathBuilder.toString()
}
partitionPath
}

/**
* Returns true if it's possible to create a new outputWriter for a given row or to use an
* existing writer without triggering a sort operation on the incoming data to avoid memory
* problems.
*
* During [[InsertIntoHadoopFsRelation]], new outputWriters are created for every partition.
* Creating too many outputWriters can cause us to run out of memory (SPARK-8890). Therefore, only
* create up to a certain number of outputWriters. If the number of allowed writers is exceeded,
* the existing outputWriters will be closed and a sort operation will be triggered on the
* incoming data, ensuring that it's sorted by key such that a single outputWriter may be used
* at a time. E.g. process all key1, close the writer, process key2, etc.
*/
def canGetOutputWriter(row: InternalRow): Boolean = {
(outputWriters.size() < (maxOutputWriters - 1)) || {
// Only compute this when we're near the max allowed number of outputWriters
val partitionPath = computePartitionPath(row)
outputWriters.containsKey(partitionPath)
}
}
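To make the writer-cap policy above concrete, here is a standalone illustration with assumed values; `maxOutputWriters = 3` and the sample partition paths are made up for this sketch and mirror, but are not, the class's actual fields.

```scala
// Standalone illustration of the "cap the number of open writers" check.
object WriterCapExample extends App {
  val maxOutputWriters = 3
  val outputWriters = new java.util.HashMap[String, AnyRef]()
  outputWriters.put("year=2015/month=1", new Object)
  outputWriters.put("year=2015/month=2", new Object)

  def canGetOutputWriter(partitionPath: String): Boolean =
    outputWriters.size() < (maxOutputWriters - 1) || outputWriters.containsKey(partitionPath)

  println(canGetOutputWriter("year=2015/month=1")) // true: a writer for this partition already exists
  println(canGetOutputWriter("year=2015/month=3")) // false: a new writer would exceed the cap,
                                                   // so the caller falls back to sorting
}
```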

/**
* Gets or creates the outputWriter used to write a given row to disk.
*
* @param row The `row` argument is supposed to only contain partition column values
* which have been cast to strings.
*/
override def outputWriterForRow(row: InternalRow): OutputWriter = {
val partitionPath: String = computePartitionPath(row)

val writer = outputWriters.get(partitionPath)
if (writer.eq(null)) {
@@ -573,6 +639,22 @@
}
}

/**
* Gets or creates the outputWriter used to write a given row to disk. If dealing with sorted data, we
* can close previously used writers since they will no longer be necessary.
*
* @param row The `row` argument is supposed to only contain partition column values
* which have been cast to strings.
* @param shouldCloseWriters If true, close all existing writers before creating new writers
*/
def outputWriterForRow(row: InternalRow, shouldCloseWriters: Boolean): OutputWriter = {
if (shouldCloseWriters) {
clearOutputWriters()
}

outputWriterForRow(row)
}

private def clearOutputWriters(): Unit = {
if (!outputWriters.isEmpty) {
asScalaIterator(outputWriters.values().iterator()).foreach(_.close())