Commit f1479e5

Author: Ilya Ganelin

Added method to check whether it's safe to create a new outputWriter

1 parent: 59d820a

File tree

1 file changed (+37, -4 lines)

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 37 additions & 4 deletions
@@ -38,6 +38,8 @@ import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
 
+import scala.collection.mutable
+
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
@@ -513,15 +515,19 @@ private[sql] class DynamicPartitionWriterContainer(
   // All output writers are created on executor side.
   @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
+  protected var maxOutputWriters = 50;
+
   override protected def initWriters(): Unit = {
     outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
-  // The `row` argument is supposed to only contain partition column values which have been casted
-  // to strings.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+  /**
+   * Extract the functionality to create a partitionPath, a grouping of columns in a row, which
+   * serves as a key variable when allocating new outputWriters.
+   */
+  def computePartitionPath(row: InternalRow): String = {
     val partitionPath = {
-      val partitionPathBuilder = new StringBuilder
+      val partitionPathBuilder = new mutable.StringBuilder
       var i = 0
 
       while (i < partitionColumns.length) {
@@ -541,6 +547,33 @@ private[sql] class DynamicPartitionWriterContainer(
 
       partitionPathBuilder.toString()
     }
+    partitionPath
+  }
+
+  /**
+   * Returns true if it's possible to create a new outputWriter for a given row or to use an
+   * existing writer without triggering a sort operation on the incoming data to avoid memory
+   * problems.
+   *
+   * During {{ InsertIntoHadoopFsRelation }} new outputWriters are created for every partition.
+   * Creating too many outputWriters can cause us to run out of memory (SPARK-8890). Therefore, only
+   * create up to a certain number of outputWriters. If the number of allowed writers is exceeded,
+   * the existing outputWriters will be closed and a sort operation will be triggered on the
+   * incoming data, ensuring that it's sorted by key such that a single outputWriter may be used
+   * at a time. E.g. process all key1, close the writer, process key2, etc.
+   */
+  def canGetOutputWriter(row: InternalRow): Boolean = {
+    (outputWriters.size() < (maxOutputWriters - 1)) || {
+      // Only compute this when we're near the max allowed number of outputWriters
+      val partitionPath = computePartitionPath(row)
+      outputWriters.containsKey(partitionPath)
+    }
+  }
+
+  // The `row` argument is supposed to only contain partition column values which have been casted
+  // to strings.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath: String = computePartitionPath(row)
 
     val writer = outputWriters.get(partitionPath)
     if (writer.eq(null)) {
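
The scaladoc above describes the intended caller-side policy rather than showing it: hand out at most maxOutputWriters writers keyed by partition path, and once the cap is reached either reuse an existing writer for an already-seen path or fall back to closing all writers and sorting the input by key so that one writer can be processed at a time. The standalone Scala sketch below models that policy under those assumptions; WriterPool, canGetWriter, and the demo data are illustrative names only and are not part of this commit or of Spark's API.

// Minimal, self-contained sketch of the writer-cap policy (not Spark code).
import scala.collection.mutable

final class WriterPool(maxWriters: Int) {
  // Stand-in for the real OutputWriter map; a StringBuilder plays the writer role here.
  private val writers = mutable.HashMap.empty[String, StringBuilder]

  // Mirrors the shape of canGetOutputWriter: true while a new writer still fits under the
  // cap, or when a writer for this partition path already exists and can be reused.
  def canGetWriter(partitionPath: String): Boolean =
    writers.size < maxWriters - 1 || writers.contains(partitionPath)

  // Returns the writer for `partitionPath`, creating it lazily while under the cap.
  def writerFor(partitionPath: String): StringBuilder =
    writers.getOrElseUpdate(partitionPath, new StringBuilder)

  def closeAll(): Unit = writers.clear()
}

object WriterPoolDemo extends App {
  val pool = new WriterPool(maxWriters = 3)
  val paths = Seq("year=2015/month=6", "year=2015/month=7", "year=2015/month=6", "year=2015/month=8")

  paths.foreach { path =>
    if (pool.canGetWriter(path)) {
      pool.writerFor(path).append("row\n")
    } else {
      // In the real container this is where existing writers would be closed and a sort on
      // the partition key would be triggered (see SPARK-8890).
      println(s"cap reached; would close writers and sort before handling $path")
      pool.closeAll()
    }
  }
}

With a cap of 3, the demo reuses the writer for the repeated path and takes the fallback branch only for the fourth, previously unseen path, which is the behaviour the doc comment on canGetOutputWriter describes.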
