Skip to content

Commit 86a7d2f

Browse files
committed
Avoid writing empty Parquet files
1 parent 12cd007 commit 86a7d2f

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
292292
override def execute(iter: Iterator[InternalRow]): Set[String] = {
293293
var fileCounter = 0
294294
var recordsInFile: Long = 0L
295-
newOutputWriter(fileCounter)
295+
// Skip empty partitions to avoid creating a large number of empty files.
296+
if (iter.hasNext) {
297+
newOutputWriter(fileCounter)
298+
}
296299
while (iter.hasNext) {
297300
if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
298301
fileCounter += 1

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ class ParquetFileFormat
140140
// initialized.
141141
private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
142142

143-
override def newInstance(
144-
path: String,
145-
dataSchema: StructType,
146-
context: TaskAttemptContext): OutputWriter = {
143+
override def newInstance(
144+
path: String,
145+
dataSchema: StructType,
146+
context: TaskAttemptContext): OutputWriter = {
147147
new ParquetOutputWriter(path, context)
148148
}
149149

0 commit comments

Comments
 (0)