diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 7957224ce48b..f78a2e0d7b0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
+      }
       while (iter.hasNext) {
         if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
           fileCounter += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea62..cf293be0968d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -140,10 +140,10 @@ class ParquetFileFormat
       // initialized.
       private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
 
-    override def newInstance(
-        path: String,
-        dataSchema: StructType,
-        context: TaskAttemptContext): OutputWriter = {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
         new ParquetOutputWriter(path, context)
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index f67444fbc49d..fdc237eda54b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType, TimestampType}
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest {
@@ -172,6 +172,10 @@ class FileStreamSinkSuite extends StreamTest {
           .format("parquet")
           .start(outputDir)
 
+      val userDefinedSchema = new StructType()
+        .add("start", TimestampType, nullable = true)
+        .add("end", TimestampType, nullable = true)
+        .add("count", LongType, nullable = true)
       def addTimestamp(timestampInSecs: Int*): Unit = {
         inputData.addData(timestampInSecs.map(_ * 1L): _*)
         failAfter(streamingTimeout) {
@@ -181,7 +185,7 @@ class FileStreamSinkSuite extends StreamTest {
       }
 
       def check(expectedResult: ((Long, Long), Long)*): Unit = {
-        val outputDf = spark.read.parquet(outputDir)
+        val outputDf = spark.read.schema(userDefinedSchema).parquet(outputDir)
          .selectExpr(
            "CAST(start as BIGINT) AS start",
            "CAST(end as BIGINT) AS end",
@@ -212,7 +216,6 @@ class FileStreamSinkSuite extends StreamTest {
 
   test("Update and Complete output mode not supported") {
     val df = MemoryStream[Int].toDF().groupBy().count()
-
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     withTempDir { dir =>