From f1479e529b8ac525748a1dfee2ace7020810caaf Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Sun, 19 Jul 2015 09:23:29 -0700 Subject: [PATCH 1/7] Added method to check whether it's safe to create a new outputWriter --- .../apache/spark/sql/sources/commands.scala | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 5c6ef2dc90c73..34f4f615ddd43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -38,6 +38,8 @@ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types.StringType import org.apache.spark.util.SerializableConfiguration +import scala.collection.mutable + private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation, query: LogicalPlan, @@ -513,15 +515,19 @@ private[sql] class DynamicPartitionWriterContainer( // All output writers are created on executor side. @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _ + protected var maxOutputWriters = 50; + override protected def initWriters(): Unit = { outputWriters = new java.util.HashMap[String, OutputWriter] } - // The `row` argument is supposed to only contain partition column values which have been casted - // to strings. - override def outputWriterForRow(row: InternalRow): OutputWriter = { + /** + * Extract the functionality to create a partitionPath, a grouping of columns in a row, which + * serves as a key variable when allocating new outputWriters. + */ + def computePartitionPath(row: InternalRow): String = { val partitionPath = { - val partitionPathBuilder = new StringBuilder + val partitionPathBuilder = new mutable.StringBuilder var i = 0 while (i < partitionColumns.length) { @@ -541,6 +547,33 @@ private[sql] class DynamicPartitionWriterContainer( partitionPathBuilder.toString() } + partitionPath + } + + /** + * Returns true if it's possible to create a new outputWriter for a given row or to use an + * existing writer without triggering a sort operation on the incoming data to avoid memory + * problems. + * + * During {{ InsertIntoHadoopFsRelation }} new outputWriters are created for every partition. + * Creating too many outputWriters can cause us to run out of memory (SPARK-8890). Therefore, only + * create up to a certain number of outputWriters. If the number of allowed writers is exceeded, + * the existing outputWriters will be closed and a sort operation will be triggered on the + * incoming data, ensuring that it's sorted by key such that a single outputWriter may be used + * at a time. E.g. process all key1, close the writer, process key2, etc. + */ + def canGetOutputWriter(row: InternalRow): Boolean = { + (outputWriters.size() < (maxOutputWriters - 1)) || { + // Only compute this when we're near the max allowed number of outputWriters + val partitionPath = computePartitionPath(row) + outputWriters.containsKey(partitionPath) + } + } + + // The `row` argument is supposed to only contain partition column values which have been casted + // to strings. 
+ override def outputWriterForRow(row: InternalRow): OutputWriter = { + val partitionPath: String = computePartitionPath(row) val writer = outputWriters.get(partitionPath) if (writer.eq(null)) { From 7e355c4059462cfa3a146a8dca3eb7ff31a73bf2 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Sun, 19 Jul 2015 09:54:00 -0700 Subject: [PATCH 2/7] Added logic to handle operational flow for sorting data mid-write --- .../apache/spark/sql/sources/commands.scala | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 34f4f615ddd43..f65eb4f268ccd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -19,7 +19,11 @@ package org.apache.spark.sql.sources import java.util.{Date, UUID} +import org.apache.commons.lang.NotImplementedException + import scala.collection.JavaConversions.asScalaIterator +import scala.collection.mutable +import scala.collection.mutable.HashSet import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -38,7 +42,6 @@ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types.StringType import org.apache.spark.util.SerializableConfiguration -import scala.collection.mutable private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation, @@ -197,7 +200,7 @@ private[sql] case class InsertIntoHadoopFsRelation( */ private def insertWithDynamicPartitions( sqlContext: SQLContext, - writerContainer: BaseWriterContainer, + writerContainer: DynamicPartitionWriterContainer, df: DataFrame, partitionColumns: Array[String]): Unit = { // Uses a local val for serialization @@ -238,6 +241,10 @@ private[sql] case class InsertIntoHadoopFsRelation( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // Track which rows have been output to disk so that if a data sort is necessary mid-write, + // we don't end up outputting the same data twice + val writtenRows: mutable.HashSet[InternalRow] = new HashSet[InternalRow] + // If anything below fails, we should abort the task. try { writerContainer.executorSideSetup(taskContext) @@ -253,13 +260,38 @@ private[sql] case class InsertIntoHadoopFsRelation( r: InternalRow => r.asInstanceOf[Row] } - while (iterator.hasNext) { - val internalRow = iterator.next() - val partitionPart = partitionProj(internalRow) - val dataPart = dataConverter(dataProj(internalRow)) - writerContainer.outputWriterForRow(partitionPart).write(dataPart) + // Sort the data by partition so that it's possible to use a single outputWriter at a + // time to process the incoming data + def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = { + throw new NotImplementedException() + } + + // When outputting rows, we may need to interrupt the file write to sort the underlying data + // (SPARK-8890) to avoid running out of memory due to creating too many outputWriters. Thus, + // we extract this functionality into its own function that can be called with updated + // underlying data. + // TODO Add tracking of whether data has been sorted in outputWriterForRow, so that + // previously used outputWriters may be closed when complete. 
+ def writeRowsSafe(iterator: Iterator[InternalRow]): Unit ={ + while (iterator.hasNext) { + val internalRow = iterator.next() + + if (writerContainer.canGetOutputWriter(internalRow) && + !writtenRows.contains(internalRow)) { + val partitionPart = partitionProj(internalRow) + val dataPart = dataConverter(dataProj(internalRow)) + + writerContainer.outputWriterForRow(partitionPart).write(dataPart) + writtenRows += internalRow + } else { + // TODO Sort the data by partition key + val sortedRows: Iterator[InternalRow] = sortRows(iterator) + writeRowsSafe(sortedRows) + } + } } + writeRowsSafe(iterator) writerContainer.commitTask() } catch { case cause: Throwable => logError("Aborting task.", cause) @@ -527,7 +559,7 @@ private[sql] class DynamicPartitionWriterContainer( */ def computePartitionPath(row: InternalRow): String = { val partitionPath = { - val partitionPathBuilder = new mutable.StringBuilder + val partitionPathBuilder = new StringBuilder var i = 0 while (i < partitionColumns.length) { @@ -551,10 +583,10 @@ private[sql] class DynamicPartitionWriterContainer( } /** - * Returns true if it's possible to create a new outputWriter for a given row or to use an + * Returns true if it's possible to create a new outputWriter for a given row or to use an * existing writer without triggering a sort operation on the incoming data to avoid memory * problems. - * + * * During {{ InsertIntoHadoopFsRelation }} new outputWriters are created for every partition. * Creating too many outputWriters can cause us to run out of memory (SPARK-8890). Therefore, only * create up to a certain number of outputWriters. If the number of allowed writers is exceeded, From 4588f82d249438296d44647c8766308c9bcc3f73 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Sun, 19 Jul 2015 10:03:43 -0700 Subject: [PATCH 3/7] Added logic to close existing outputWriters if data being processed has been sorted --- .../apache/spark/sql/sources/commands.scala | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index f65eb4f268ccd..b69995a09f136 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -245,6 +245,10 @@ private[sql] case class InsertIntoHadoopFsRelation( // we don't end up outputting the same data twice val writtenRows: mutable.HashSet[InternalRow] = new HashSet[InternalRow] + // Flag to track whether data has been sorted in which case it's safe to close previously + // used outputWriters + var sorted: Boolean = false + // If anything below fails, we should abort the task. try { writerContainer.executorSideSetup(taskContext) @@ -263,6 +267,7 @@ private[sql] case class InsertIntoHadoopFsRelation( // Sort the data by partition so that it's possible to use a single outputWriter at a // time to process the incoming data def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = { + // TODO Sort the data by partition key throw new NotImplementedException() } @@ -270,8 +275,6 @@ private[sql] case class InsertIntoHadoopFsRelation( // (SPARK-8890) to avoid running out of memory due to creating too many outputWriters. Thus, // we extract this functionality into its own function that can be called with updated // underlying data. 
- // TODO Add tracking of whether data has been sorted in outputWriterForRow, so that - // previously used outputWriters may be closed when complete. def writeRowsSafe(iterator: Iterator[InternalRow]): Unit ={ while (iterator.hasNext) { val internalRow = iterator.next() @@ -281,11 +284,11 @@ private[sql] case class InsertIntoHadoopFsRelation( val partitionPart = partitionProj(internalRow) val dataPart = dataConverter(dataProj(internalRow)) - writerContainer.outputWriterForRow(partitionPart).write(dataPart) + writerContainer.outputWriterForRow(partitionPart, sorted).write(dataPart) writtenRows += internalRow } else { - // TODO Sort the data by partition key val sortedRows: Iterator[InternalRow] = sortRows(iterator) + sorted = true writeRowsSafe(sortedRows) } } @@ -547,7 +550,7 @@ private[sql] class DynamicPartitionWriterContainer( // All output writers are created on executor side. @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _ - protected var maxOutputWriters = 50; + protected var maxOutputWriters = 50 override protected def initWriters(): Unit = { outputWriters = new java.util.HashMap[String, OutputWriter] @@ -602,8 +605,12 @@ private[sql] class DynamicPartitionWriterContainer( } } - // The `row` argument is supposed to only contain partition column values which have been casted - // to strings. + /** + * Create the outputWriter to output a given row to disk. + * + * @param row The `row` argument is supposed to only contain partition column values + * which have been casted to strings. + */ override def outputWriterForRow(row: InternalRow): OutputWriter = { val partitionPath: String = computePartitionPath(row) @@ -620,6 +627,22 @@ private[sql] class DynamicPartitionWriterContainer( } } + /** + * Create the outputWriter to output a given row to disk. If dealing with sorted data, we + * can close previously used writers since they will no longer be necessary. + * + * @param row The `row` argument is supposed to only contain partition column values + * which have been casted to strings. + * @param shouldCloseWriters If true, close all existing writers before creating new writers + */ + def outputWriterForRow(row: InternalRow, shouldCloseWriters: Boolean): OutputWriter = { + if (shouldCloseWriters) { + clearOutputWriters() + } + + outputWriterForRow(row) + } + private def clearOutputWriters(): Unit = { if (!outputWriters.isEmpty) { asScalaIterator(outputWriters.values().iterator()).foreach(_.close()) From 494b49f514996c7ff93e23d873ce0cc475993889 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Sun, 19 Jul 2015 10:11:16 -0700 Subject: [PATCH 4/7] Style fix --- .../main/scala/org/apache/spark/sql/sources/commands.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index b69995a09f136..860f4101e1a75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -275,12 +275,12 @@ private[sql] case class InsertIntoHadoopFsRelation( // (SPARK-8890) to avoid running out of memory due to creating too many outputWriters. Thus, // we extract this functionality into its own function that can be called with updated // underlying data. 
- def writeRowsSafe(iterator: Iterator[InternalRow]): Unit ={ + def writeRowsSafe(iterator: Iterator[InternalRow]): Unit = { while (iterator.hasNext) { val internalRow = iterator.next() - if (writerContainer.canGetOutputWriter(internalRow) && - !writtenRows.contains(internalRow)) { + if (!writtenRows.contains(internalRow) && + writerContainer.canGetOutputWriter(internalRow)) { val partitionPart = partitionProj(internalRow) val dataPart = dataConverter(dataProj(internalRow)) From 9e15cdc37d12d895d3852e09deab509b5038867d Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Sun, 19 Jul 2015 10:14:04 -0700 Subject: [PATCH 5/7] import ordering --- .../scala/org/apache/spark/sql/sources/commands.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 860f4101e1a75..1a759427406ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -19,12 +19,11 @@ package org.apache.spark.sql.sources import java.util.{Date, UUID} -import org.apache.commons.lang.NotImplementedException - import scala.collection.JavaConversions.asScalaIterator -import scala.collection.mutable import scala.collection.mutable.HashSet +import org.apache.commons.lang.NotImplementedException + import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} @@ -42,7 +41,6 @@ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types.StringType import org.apache.spark.util.SerializableConfiguration - private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation, query: LogicalPlan, @@ -243,7 +241,7 @@ private[sql] case class InsertIntoHadoopFsRelation( def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { // Track which rows have been output to disk so that if a data sort is necessary mid-write, // we don't end up outputting the same data twice - val writtenRows: mutable.HashSet[InternalRow] = new HashSet[InternalRow] + val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow] // Flag to track whether data has been sorted in which case it's safe to close previously // used outputWriters From 035f53739e88726689856b1f288d377544b7bef6 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 20 Jul 2015 08:26:10 -0700 Subject: [PATCH 6/7] Added sorting function. 
Fixed a traversal bug --- .../apache/spark/sql/sources/commands.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 1a759427406ad..76de298ba8416 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -19,15 +19,9 @@ package org.apache.spark.sql.sources import java.util.{Date, UUID} -import scala.collection.JavaConversions.asScalaIterator -import scala.collection.mutable.HashSet - -import org.apache.commons.lang.NotImplementedException - import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} - +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter} import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil @@ -41,6 +35,9 @@ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types.StringType import org.apache.spark.util.SerializableConfiguration +import scala.collection.JavaConversions.asScalaIterator +import scala.collection.mutable.HashSet + private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation, query: LogicalPlan, @@ -265,8 +262,8 @@ private[sql] case class InsertIntoHadoopFsRelation( // Sort the data by partition so that it's possible to use a single outputWriter at a // time to process the incoming data def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = { - // TODO Sort the data by partition key - throw new NotImplementedException() + // Sort by the same key used to look up the outputWriter to allow us to recyle the writer + iterator.toArray.sortBy(writerContainer.computePartitionPath).toIterator } // When outputting rows, we may need to interrupt the file write to sort the underlying data @@ -277,6 +274,8 @@ private[sql] case class InsertIntoHadoopFsRelation( while (iterator.hasNext) { val internalRow = iterator.next() + // Only output rows that we haven't already output, this code can be called after a sort + // mid-traversal. if (!writtenRows.contains(internalRow) && writerContainer.canGetOutputWriter(internalRow)) { val partitionPart = partitionProj(internalRow) @@ -284,7 +283,10 @@ private[sql] case class InsertIntoHadoopFsRelation( writerContainer.outputWriterForRow(partitionPart, sorted).write(dataPart) writtenRows += internalRow - } else { + } else if (!writtenRows.contains(internalRow)) { + // If there are no more available output writers, sort the data, and set the sorted + // flag to true. This will then cause subsequent output writers to be cleared after + // use, minimizing the memory footprint. 
val sortedRows: Iterator[InternalRow] = sortRows(iterator) sorted = true writeRowsSafe(sortedRows) From 9d5a70e4808f911dd985e63836c6cc30a0c804c8 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 31 Jul 2015 09:55:08 -0700 Subject: [PATCH 7/7] import ordering --- .../apache/spark/sql/execution/datasources/commands.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala index d46b8f1933f65..f57d1da6314a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.execution.datasources import java.util.{Date, UUID} +import scala.collection.JavaConversions.asScalaIterator +import scala.collection.mutable.HashSet + import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter} @@ -30,14 +33,11 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StringType import org.apache.spark.util.SerializableConfiguration -import scala.collection.JavaConversions.asScalaIterator -import scala.collection.mutable.HashSet private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation,
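Taken together, the patches above cap the number of concurrently open outputWriters (maxOutputWriters), check canGetOutputWriter before writing each row, and fall back to sorting the remaining rows by partition path once the cap is hit, so that a single writer at a time suffices and previously used writers can be closed. The Scala sketch below is only an illustration of that control flow under simplified assumptions, not code from the patch or from Spark: Writer, CappedWriterPool, and the string partition keys are hypothetical stand-ins for OutputWriter, DynamicPartitionWriterContainer, and computePartitionPath(row).

import scala.collection.mutable

trait Writer {
  def write(value: String): Unit
  def close(): Unit
}

// A pool that never holds more than `maxWriters` open writers at once.
final class CappedWriterPool(maxWriters: Int, newWriter: String => Writer) {
  private val writers = mutable.HashMap.empty[String, Writer]

  // True if this key can be served without opening more than maxWriters writers.
  def canWrite(key: String): Boolean =
    writers.contains(key) || writers.size < maxWriters

  // Returns the writer for `key`. When `inputIsSorted` is true and the key is new,
  // all previously opened writers are closed first -- safe because sorted input
  // guarantees that old keys never reappear.
  def writerFor(key: String, inputIsSorted: Boolean): Writer = {
    if (inputIsSorted && !writers.contains(key)) closeAll()
    writers.getOrElseUpdate(key, newWriter(key))
  }

  def closeAll(): Unit = {
    writers.values.foreach(_.close())
    writers.clear()
  }
}

object CappedWriterPoolDemo {
  def main(args: Array[String]): Unit = {
    val pool = new CappedWriterPool(maxWriters = 2, key => new Writer {
      def write(value: String): Unit = println(s"[$key] wrote $value")
      def close(): Unit = println(s"[$key] closed")
    })

    var sorted = false
    var rows: Iterator[(String, String)] =
      Iterator("b" -> "1", "a" -> "2", "c" -> "3", "a" -> "4")

    while (rows.hasNext) {
      val (key, value) = rows.next()
      if (!sorted && !pool.canWrite(key)) {
        // Writer cap reached for a new partition key: push the current row back and
        // sort the remaining rows by key so one writer at a time suffices from here on.
        rows = (Iterator(key -> value) ++ rows).toArray.sortBy(_._1).iterator
        sorted = true
      } else {
        pool.writerFor(key, inputIsSorted = sorted).write(value)
      }
    }
    pool.closeAll()
  }
}

One deliberate simplification relative to the patch: instead of remembering already-written rows in a HashSet, the sketch pushes the row that triggered the fallback back onto the sorted iterator, so nothing is written twice and the triggering row is not lost.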