
Commit 81c99a5

xuanyuanking authored and cloud-fan committed
[SPARK-21435][SQL] Empty files should be skipped while write to file
## What changes were proposed in this pull request?

Add `EmptyDirectoryWriteTask` for tasks with empty input while writing files. Fix the empty result for the Parquet format by leaving the first partition to write the file metadata.

## How was this patch tested?

Added a new test in `FileFormatWriterSuite`.

Author: xuanyuanking <[email protected]>

Closes #18654 from xuanyuanking/SPARK-21435.
1 parent 84f1b25 commit 81c99a5
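
For context, the intent of the change is that a task whose input iterator is empty skips writing a part file, except for partition 0, which still writes so that a footer-based format such as Parquet keeps one file carrying the schema. A minimal sketch of the user-visible effect, not part of the commit; the output path is a placeholder and the exact file count depends on the committer and Spark version:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: "/tmp/spark-21435-demo" is a placeholder path, not part of the commit.
val spark = SparkSession.builder()
  .appName("SPARK-21435 sketch")
  .master("local[*]")
  .getOrCreate()

// 10 partitions, but only one of them still holds a row after the filter.
val df = spark.range(100).repartition(10).where("id = 50")

// Before this change every task wrote a part file, so most of the Parquet files were empty.
// With EmptyDirectoryWriteTask, empty non-zero partitions are skipped; partition 0 still
// writes, so the output directory keeps a Parquet footer with the schema.
df.write.mode("overwrite").parquet("/tmp/spark-21435-demo")

// Reading the result back infers the schema from the file written by partition 0.
println(spark.read.parquet("/tmp/spark-21435-demo").schema)
```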

File tree

2 files changed: +51 −1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 18 additions & 1 deletion
@@ -236,7 +236,10 @@ object FileFormatWriter extends Logging {
       committer.setupTask(taskAttemptContext)
 
       val writeTask =
-        if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
+        if (sparkPartitionId != 0 && !iterator.hasNext) {
+          // In case of empty job, leave first partition to save meta for file format like parquet.
+          new EmptyDirectoryWriteTask
+        } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
           new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
         } else {
           new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
@@ -301,6 +304,20 @@ object FileFormatWriter extends Logging {
     }
   }
 
+  /** ExecuteWriteTask for empty partitions */
+  private class EmptyDirectoryWriteTask extends ExecuteWriteTask {
+
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
+      ExecutedWriteSummary(
+        updatedPartitions = Set.empty,
+        numOutputFile = 0,
+        numOutputBytes = 0,
+        numOutputRows = 0)
+    }
+
+    override def releaseResources(): Unit = {}
+  }
+
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
   private class SingleDirectoryWriteTask(
       description: WriteJobDescription,
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+
+  test("empty file should be skipped while write to file") {
+    withTempPath { path =>
+      spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
+      val partFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(partFiles.length === 2)
+    }
+  }
+}
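
The in-code comment "leave first partition to save meta for file format like parquet" also covers the case where the whole result is empty: because partition 0 still writes a row-less file, the output directory remains readable. A hedged follow-up sketch, not part of the commit, assuming a running `spark` session and a writable placeholder path:

```scala
// Sketch only: assumes a SparkSession named `spark`; "/tmp/spark-21435-empty" is a placeholder.
val emptyDf = spark.range(100).where("id < 0") // no rows survive the filter
emptyDf.write.mode("overwrite").parquet("/tmp/spark-21435-empty")

// Partition 0 still produced one row-less Parquet file carrying the footer metadata,
// so the schema can be read back even though the result has no rows.
val readBack = spark.read.parquet("/tmp/spark-21435-empty")
assert(readBack.count() == 0)
println(readBack.schema) // expected to show the id column carried by the metadata-only file
```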
