From 80184556a48b22793ef2bdf991595cc845a9aded Mon Sep 17 00:00:00 2001 From: Keuntae Park Date: Tue, 25 Aug 2015 10:21:22 +0900 Subject: [PATCH 01/10] do not make empty file when insert overwrite into Hive table --- .../hive/execution/InsertIntoHiveTable.scala | 50 ++++++++++--------- .../sql/hive/InsertIntoHiveTableSuite.scala | 26 ++++++++++ 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 12c667e6e92d..5b617b3ea9d0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -87,33 +87,35 @@ case class InsertIntoHiveTable( // Note that this function is executed on executor side def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray - val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} - val outputData = new Array[Any](fieldOIs.length) - - writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - iterator.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) - i += 1 + if (iterator.hasNext) { + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray + val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} + val outputData = new Array[Any](fieldOIs.length) + + writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) + i += 1 + } + + writerContainer + .getLocalFileWriter(row, table.schema) + .write(serializer.serialize(outputData, standardOI)) } - writerContainer - .getLocalFileWriter(row, table.schema) - .write(serializer.serialize(outputData, standardOI)) + writerContainer.close() } - - writerContainer.close() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index d33e81227db8..12c08f108ebe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -262,4 +262,30 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { sql("DROP TABLE table_with_partition") } + + test("Remove empty file creation") { + 
val testData = TestHive.sparkContext.parallelize( + (1 to 2).map(i => TestData(i, i.toString))).toDF() + testData.registerTempTable("testData") + + val tmpDir = Utils.createTempDir() + sql( + s""" + |CREATE TABLE table1(key int,value string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + sql( + """ + |INSERT OVERWRITE TABLE table1 + |SELECT count(key), value FROM testData GROUP BY value + """.stripMargin) + def listFiles(path: File): List[File] = { + val file = path.listFiles() + file.filter { e => e.isFile && !e.getName.endsWith(".crc")}.toList + } + val fileList = listFiles(tmpDir) + assert(fileList.filter(e => e.length > 0).sortBy(_.getName) === fileList.sortBy(_.getName)) + + sql("DROP TABLE table1") + } } From e2749d7bc78d6b64644cd7fcbf89fbda27e5688b Mon Sep 17 00:00:00 2001 From: Keuntae Park Date: Tue, 25 Aug 2015 13:07:37 +0900 Subject: [PATCH 02/10] change test name to reflect issue number and name --- .../org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 12c08f108ebe..64db73cbd3b9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -263,7 +263,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { sql("DROP TABLE table_with_partition") } - test("Remove empty file creation") { + test("SPARK-10216: Avoid creating empty files during overwrite into Hive table with group by query") { val testData = TestHive.sparkContext.parallelize( (1 to 2).map(i => TestData(i, i.toString))).toDF() testData.registerTempTable("testData") From acdc53778eb5f752d3a9462d7152aaa36df3ac41 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 09:55:26 +0900 Subject: [PATCH 03/10] Stash chagnes --- .../hive/execution/InsertIntoHiveTable.scala | 51 +++++++++---------- .../sql/hive/InsertIntoHiveTableSuite.scala | 27 +--------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 5b617b3ea9d0..c6493f1e546f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -87,35 +87,33 @@ case class InsertIntoHiveTable( // Note that this function is executed on executor side def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - if (iterator.hasNext) { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray - val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} - val outputData = new Array[Any](fieldOIs.length) - - writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - iterator.foreach { row => - var i = 0 - while (i < 
fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) - i += 1 - } - - writerContainer - .getLocalFileWriter(row, table.schema) - .write(serializer.serialize(outputData, standardOI)) + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray + val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)} + val outputData = new Array[Any](fieldOIs.length) + + writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) + i += 1 } - writerContainer.close() + writerContainer + .getLocalFileWriter(row, table.schema) + .write(serializer.serialize(outputData, standardOI)) } + + writerContainer.close() } } @@ -265,3 +263,4 @@ case class InsertIntoHiveTable( sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) } } + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 64db73cbd3b9..5ecb0bcff115 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -262,30 +262,5 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { sql("DROP TABLE table_with_partition") } - - test("SPARK-10216: Avoid creating empty files during overwrite into Hive table with group by query") { - val testData = TestHive.sparkContext.parallelize( - (1 to 2).map(i => TestData(i, i.toString))).toDF() - testData.registerTempTable("testData") - - val tmpDir = Utils.createTempDir() - sql( - s""" - |CREATE TABLE table1(key int,value string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table1 - |SELECT count(key), value FROM testData GROUP BY value - """.stripMargin) - def listFiles(path: File): List[File] = { - val file = path.listFiles() - file.filter { e => e.isFile && !e.getName.endsWith(".crc")}.toList - } - val fileList = listFiles(tmpDir) - assert(fileList.filter(e => e.length > 0).sortBy(_.getName) === fileList.sortBy(_.getName)) - - sql("DROP TABLE table1") - } } + From 57f2eccdcb2b8249142da22509068ceefbef04e1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 10:22:59 +0900 Subject: [PATCH 04/10] Add the function in hiveWriterContainers and polish test codes --- .../hive/execution/InsertIntoHiveTable.scala | 1 - .../spark/sql/hive/hiveWriterContainers.scala | 24 +++++++------ .../sql/hive/InsertIntoHiveTableSuite.scala | 36 ++++++++++++++++--- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index a657550bc675..73ccec2ee0ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -303,4 +303,3 @@ case class InsertIntoHiveTable( sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) } } - diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 794fe264ead5..706fdbc2604f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -178,19 +178,21 @@ private[hive] class SparkHiveWriterContainer( // this function is executed on executor side def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { - val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() - executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) - - iterator.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) - i += 1 + if (iterator.hasNext) { + val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite() + executorSideSetup(context.stageId, context.partitionId, context.attemptNumber) + + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) + i += 1 + } + writer.write(serializer.serialize(outputData, standardOI)) } - writer.write(serializer.serialize(outputData, standardOI)) - } - close() + close() + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 1e8974c3b44c..cce236a1736b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -116,10 +116,10 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql( s""" - |CREATE TABLE table_with_partition(c1 string) - |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) + |CREATE TABLE table_with_partition(c1 string) + |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) sql( """ |INSERT OVERWRITE TABLE table_with_partition @@ -213,5 +213,31 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE hiveTableWithStructValue") } -} + test("SPARK-10216: Avoid empty files during overwrite into Hive table with group by query") { + val testDataset = hiveContext.sparkContext.parallelize( + (1 to 2).map(i => TestData(i, i.toString))).toDF() + testDataset.registerTempTable("testDataset") + + val tmpDir = Utils.createTempDir() + sql( + s""" + |CREATE TABLE table1(key int,value string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + sql( + """ + |INSERT OVERWRITE TABLE table1 + |SELECT count(key), value FROM testDataset GROUP BY value + """.stripMargin) + + val orcFiles = tmpDir.listFiles() + .filter(f => f.isFile && f.getName.endsWith(".crc")) + .sortBy(_.getName) + val orcFilesWithoutEmpty = orcFiles.filter(_.length > 0) + + assert(orcFiles === orcFilesWithoutEmpty) + + sql("DROP TABLE table1") + } +} From 
294b4474477449fdac192320d022fa28960d87ca Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 10:34:27 +0900 Subject: [PATCH 05/10] Edit variable names --- .../apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index cce236a1736b..3d8509cf53a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -231,12 +231,12 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef |SELECT count(key), value FROM testDataset GROUP BY value """.stripMargin) - val orcFiles = tmpDir.listFiles() + val overwrittenFiles = tmpDir.listFiles() .filter(f => f.isFile && f.getName.endsWith(".crc")) .sortBy(_.getName) - val orcFilesWithoutEmpty = orcFiles.filter(_.length > 0) + val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) - assert(orcFiles === orcFilesWithoutEmpty) + assert(overwrittenFiles === overwrittenFilesWithoutEmpty) sql("DROP TABLE table1") } From ab2d0922da40cc4c7377b14cb7175dcd242a8608 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 12:45:01 +0900 Subject: [PATCH 06/10] Appropriate checking in test codes --- .../org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 3d8509cf53a1..5dcd5bf81e1a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -232,7 +232,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef """.stripMargin) val overwrittenFiles = tmpDir.listFiles() - .filter(f => f.isFile && f.getName.endsWith(".crc")) + .filter(f => f.isFile && !f.getName.endsWith(".crc")) .sortBy(_.getName) val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) From dee6a4ebf11a28c958d27162e897641db4ac0aec Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 19:35:53 +0900 Subject: [PATCH 07/10] Do not write empty files for internal datasources as well. 
--- .../datasources/WriterContainer.scala | 221 +++++++++--------- .../sql/sources/HadoopFsRelationTest.scala | 18 ++ 2 files changed, 131 insertions(+), 108 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 3b064a5bc489..7e12bbb2128b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer( extends BaseWriterContainer(relation, job, isAppend) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - executorSideSetup(taskContext) - val configuration = taskAttemptContext.getConfiguration - configuration.set("spark.sql.sources.output.path", outputPath) - var writer = newOutputWriter(getWorkPath) - writer.initConverter(dataSchema) - - // If anything below fails, we should abort the task. - try { - Utils.tryWithSafeFinallyAndFailureCallbacks { - while (iterator.hasNext) { - val internalRow = iterator.next() - writer.writeInternal(internalRow) - } - commitTask() - }(catchBlock = abortTask()) - } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) - } + if (iterator.hasNext) { + executorSideSetup(taskContext) + val configuration = taskAttemptContext.getConfiguration + configuration.set("spark.sql.sources.output.path", outputPath) + var writer = newOutputWriter(getWorkPath) + writer.initConverter(dataSchema) - def commitTask(): Unit = { + // If anything below fails, we should abort the task. try { - if (writer != null) { - writer.close() - writer = null - } - super.commitTask() + Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { + val internalRow = iterator.next() + writer.writeInternal(internalRow) + } + commitTask() + }(catchBlock = abortTask()) } catch { - case cause: Throwable => - // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and - // will cause `abortTask()` to be invoked. - throw new RuntimeException("Failed to commit task", cause) + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) } - } - def abortTask(): Unit = { - try { - if (writer != null) { - writer.close() + def commitTask(): Unit = { + try { + if (writer != null) { + writer.close() + writer = null + } + super.commitTask() + } catch { + case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and + // will cause `abortTask()` to be invoked. + throw new RuntimeException("Failed to commit task", cause) + } + } + + def abortTask(): Unit = { + try { + if (writer != null) { + writer.close() + } + } finally { + super.abortTask() } - } finally { - super.abortTask() } } } @@ -363,84 +365,87 @@ private[sql] class DynamicPartitionWriterContainer( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - executorSideSetup(taskContext) - - // We should first sort by partition columns, then bucket id, and finally sorting columns. 
- val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns - val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) - - val sortingKeySchema = StructType(sortingExpressions.map { - case a: Attribute => StructField(a.name, a.dataType, a.nullable) - // The sorting expressions are all `Attribute` except bucket id. - case _ => StructField("bucketId", IntegerType, nullable = false) - }) - - // Returns the data columns to be written given an input row - val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) - - // Returns the partition path given a partition key. - val getPartitionString = - UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) - - // Sorts the data before write, so that we only need one writer at the same time. - // TODO: inject a local sort operator in planning. - val sorter = new UnsafeKVExternalSorter( - sortingKeySchema, - StructType.fromAttributes(dataColumns), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get().taskMemoryManager().pageSizeBytes) - - while (iterator.hasNext) { - val currentRow = iterator.next() - sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) - } - logInfo(s"Sorting complete. Writing out partition files one at a time.") - - val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { - identity - } else { - UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { - case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) + if (iterator.hasNext) { + executorSideSetup(taskContext) + + // We should first sort by partition columns, then bucket id, and finally sorting columns. + val sortingExpressions: Seq[Expression] = + partitionColumns ++ bucketIdExpression ++ sortColumns + val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) + + val sortingKeySchema = StructType(sortingExpressions.map { + case a: Attribute => StructField(a.name, a.dataType, a.nullable) + // The sorting expressions are all `Attribute` except bucket id. + case _ => StructField("bucketId", IntegerType, nullable = false) }) - } - val sortedIterator = sorter.sortedIterator() + // Returns the data columns to be written given an input row + val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) + + // Returns the partition path given a partition key. + val getPartitionString = + UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) + + // Sorts the data before write, so that we only need one writer at the same time. + // TODO: inject a local sort operator in planning. + val sorter = new UnsafeKVExternalSorter( + sortingKeySchema, + StructType.fromAttributes(dataColumns), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get().taskMemoryManager().pageSizeBytes) + + while (iterator.hasNext) { + val currentRow = iterator.next() + sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) + } + logInfo(s"Sorting complete. Writing out partition files one at a time.") + + val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { + identity + } else { + UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { + case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) + }) + } - // If anything below fails, we should abort the task. 
- var currentWriter: OutputWriter = null - try { - Utils.tryWithSafeFinallyAndFailureCallbacks { - var currentKey: UnsafeRow = null - while (sortedIterator.next()) { - val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] - if (currentKey != nextKey) { - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - currentKey = nextKey.copy() - logDebug(s"Writing partition: $currentKey") + val sortedIterator = sorter.sortedIterator() - currentWriter = newOutputWriter(currentKey, getPartitionString) + // If anything below fails, we should abort the task. + var currentWriter: OutputWriter = null + try { + Utils.tryWithSafeFinallyAndFailureCallbacks { + var currentKey: UnsafeRow = null + while (sortedIterator.next()) { + val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] + if (currentKey != nextKey) { + if (currentWriter != null) { + currentWriter.close() + currentWriter = null + } + currentKey = nextKey.copy() + logDebug(s"Writing partition: $currentKey") + + currentWriter = newOutputWriter(currentKey, getPartitionString) + } + currentWriter.writeInternal(sortedIterator.getValue) + } + if (currentWriter != null) { + currentWriter.close() + currentWriter = null } - currentWriter.writeInternal(sortedIterator.getValue) - } - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - commitTask() - }(catchBlock = { - if (currentWriter != null) { - currentWriter.close() - } - abortTask() - }) - } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) + commitTask() + }(catchBlock = { + if (currentWriter != null) { + currentWriter.close() + } + abortTask() + }) + } catch { + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 67b403a9bd3a..168691ded570 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -695,6 +695,24 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } } + + test("SPARK-10216: Avoid empty files during overwriting table with group by query") { + withTempPath { path => + val df = sqlContext.range(0, 5) + val groupedDF = df.groupBy("id").count() + groupedDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .save(path.getCanonicalPath) + + val overwrittenFiles = path.listFiles() + .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) + .sortBy(_.getName) + val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) + + assert(overwrittenFiles === overwrittenFilesWithoutEmpty) + } + } } // This class is used to test SPARK-8578. 
We should not use any custom output committer when From b595b7f84eb7567f68ed78c42cc2d94f2173fb2c Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 3 May 2016 19:41:20 +0900 Subject: [PATCH 08/10] Rename test in HadoopFsRelationTest --- .../org/apache/spark/sql/sources/HadoopFsRelationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 168691ded570..f3a618e641b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -696,7 +696,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } - test("SPARK-10216: Avoid empty files during overwriting table with group by query") { + test("SPARK-10216: Avoid empty files during overwriting with group by query") { withTempPath { path => val df = sqlContext.range(0, 5) val groupedDF = df.groupBy("id").count() From 24e16b794f61dc629bbeced274cbcab02d044bbc Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 10 May 2016 09:17:09 +0900 Subject: [PATCH 09/10] Remove unused import --- .../org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index b45877925c39..3cc9b05ffeac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.hadoop.hive.conf.HiveConf import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException From 5f780a7ba60b2f0518897a9a369d27455cab47cb Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 17 May 2016 14:35:35 +0900 Subject: [PATCH 10/10] Explicitly set shuffle partition --- .../sql/hive/InsertIntoHiveTableSuite.scala | 45 ++++++++++--------- .../sql/sources/HadoopFsRelationTest.scala | 28 ++++++------ 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index c61f54e20415..883cdac110e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -22,9 +22,10 @@ import java.io.File import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException -import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -216,30 +217,32 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } test("SPARK-10216: Avoid empty files during overwrite into Hive table with group by query") { - val testDataset = hiveContext.sparkContext.parallelize( - (1 to 2).map(i => TestData(i, i.toString))).toDF() - 
testDataset.registerTempTable("testDataset") + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + val testDataset = hiveContext.sparkContext.parallelize( + (1 to 2).map(i => TestData(i, i.toString))).toDF() + testDataset.createOrReplaceTempView("testDataset") - val tmpDir = Utils.createTempDir() - sql( - s""" - |CREATE TABLE table1(key int,value string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table1 - |SELECT count(key), value FROM testDataset GROUP BY value - """.stripMargin) + val tmpDir = Utils.createTempDir() + sql( + s""" + |CREATE TABLE table1(key int,value string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + sql( + """ + |INSERT OVERWRITE TABLE table1 + |SELECT count(key), value FROM testDataset GROUP BY value + """.stripMargin) - val overwrittenFiles = tmpDir.listFiles() - .filter(f => f.isFile && !f.getName.endsWith(".crc")) - .sortBy(_.getName) - val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) + val overwrittenFiles = tmpDir.listFiles() + .filter(f => f.isFile && !f.getName.endsWith(".crc")) + .sortBy(_.getName) + val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) - assert(overwrittenFiles === overwrittenFilesWithoutEmpty) + assert(overwrittenFiles === overwrittenFilesWithoutEmpty) - sql("DROP TABLE table1") + sql("DROP TABLE table1") + } } test("Reject partitioning that does not match table") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 11a30a0996b2..78d2dc28d6b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{FileScanRDD, LocalityTestFileSystem} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -881,20 +881,22 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("SPARK-10216: Avoid empty files during overwriting with group by query") { - withTempPath { path => - val df = sqlContext.range(0, 5) - val groupedDF = df.groupBy("id").count() - groupedDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .save(path.getCanonicalPath) + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + withTempPath { path => + val df = spark.range(0, 5) + val groupedDF = df.groupBy("id").count() + groupedDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .save(path.getCanonicalPath) - val overwrittenFiles = path.listFiles() - .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) - .sortBy(_.getName) - val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) + val overwrittenFiles = path.listFiles() + .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) + .sortBy(_.getName) + val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) - assert(overwrittenFiles === 
overwrittenFilesWithoutEmpty) + assert(overwrittenFiles === overwrittenFilesWithoutEmpty) + } } } }
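
The pattern applied throughout this series is the same in all three writers it touches (SparkHiveWriterContainer.writeToFile, DefaultWriterContainer.writeRows and DynamicPartitionWriterContainer.writeRows): check iterator.hasNext before any output writer is created, so a task that receives an empty shuffle partition (for example a GROUP BY that yields fewer groups than spark.sql.shuffle.partitions, which is why the final commit pins that conf to 10 in the tests) never opens, and therefore never leaves behind, a zero-byte output file. Below is a minimal sketch of that guard in isolation; OutputWriter and newWriter here are hypothetical stand-ins for the real Spark writer classes, not the actual API.

    // Sketch only: skip all writer setup when the partition is empty,
    // so no empty file is created for this task.
    trait OutputWriter {
      def write(row: String): Unit
      def close(): Unit
    }

    def writePartition(iterator: Iterator[String], newWriter: () => OutputWriter): Unit = {
      if (iterator.hasNext) {           // the guard added by this series
        val writer = newWriter()        // only now is an output file opened
        try {
          iterator.foreach(writer.write)
        } finally {
          writer.close()                // close/commit stays inside the guard
        }
      }
    }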