
Commit 0d3d46d

HeartSaVioR authored and dongjoon-hyun committed
[SPARK-29999][SS] Handle FileStreamSink metadata correctly for empty partition
### What changes were proposed in this pull request?

This patch checks the existence of the output file for each task while committing the task, so that it doesn't throw FileNotFoundException while creating SinkFileStatus. The check is newly required for the DSv2 implementation of FileStreamSink, since it was changed to create the output file lazily (as an improvement). The JSON writer, for example:
https://github.com/apache/spark/blob/9ec2a4e58caa4128e9c690d72239cebd6b732084/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala#L49-L60

### Why are the changes needed?

Without this patch, FileStreamSink throws FileNotFoundException when writing an empty partition.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.

Closes #26639 from HeartSaVioR/SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
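To make the motivation above concrete, here is a minimal sketch of the lazy output-file pattern that the linked JsonOutputWriter follows. This is not Spark's actual writer; the class name, fields, and I/O details are illustrative assumptions. The point is that a task handed an empty partition never calls `write()`, so no physical file is ever created even though its path was already registered with the commit protocol.

```scala
import java.io.{BufferedWriter, FileWriter, Writer}

// Illustrative sketch only: names and structure are assumptions, not Spark's API.
class LazyOutputWriterSketch(path: String) {
  private var writer: Writer = _ // deliberately not created in the constructor

  private def getOrCreate(): Writer = {
    if (writer == null) {
      // The physical file appears only here, on the first record.
      writer = new BufferedWriter(new FileWriter(path))
    }
    writer
  }

  def write(record: String): Unit = {
    getOrCreate().write(record + "\n")
  }

  def close(): Unit = {
    // If write() was never called (empty partition), no file exists at `path`.
    if (writer != null) writer.close()
  }
}
```

Under this pattern, the commit protocol has to tolerate registered paths that were never materialized, which is exactly what the ManifestFileCommitProtocol change below does.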
1 parent cb68e58 commit 0d3d46d

File tree

2 files changed: +67 −6 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala

Lines changed: 17 additions & 6 deletions
```diff
@@ -22,7 +22,7 @@ import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
@@ -89,9 +89,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
       try {
         val fs = path.getFileSystem(jobContext.getConfiguration)
         // this is to make sure the file can be seen from driver as well
-        if (fs.exists(path)) {
-          fs.delete(path, false)
-        }
+        deleteIfExists(fs, path)
       } catch {
         case e: IOException =>
           logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
@@ -139,7 +137,14 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     if (addedFiles.nonEmpty) {
       val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
       val statuses: Seq[SinkFileStatus] =
-        addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f))))
+        addedFiles.flatMap { f =>
+          val path = new Path(f)
+          if (fs.exists(path)) {
+            Some(SinkFileStatus(fs.getFileStatus(path)))
+          } else {
+            None
+          }
+        }
       new TaskCommitMessage(statuses)
     } else {
       new TaskCommitMessage(Seq.empty[SinkFileStatus])
@@ -150,7 +155,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     // best effort cleanup of incomplete files
     if (addedFiles.nonEmpty) {
       val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
-      addedFiles.foreach { file => fs.delete(new Path(file), false) }
+      addedFiles.foreach { file => deleteIfExists(fs, new Path(file)) }
+    }
+  }
+
+  private def deleteIfExists(fs: FileSystem, path: Path, recursive: Boolean = false): Unit = {
+    if (fs.exists(path)) {
+      fs.delete(path, recursive)
     }
   }
 }
```
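The failure mode this diff removes can be reproduced outside Spark with plain Hadoop FileSystem calls. The snippet below is a standalone sketch, not part of the patch: the path is hypothetical and it only needs hadoop-common on the classpath. `getFileStatus` on a path that was never written throws FileNotFoundException, which is what commitTask previously hit for a task with an empty partition, while the guarded version simply skips the missing file.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object EmptyPartitionFileCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical output path of a task that received an empty partition.
    val path = new Path("/tmp/spark-29999-demo/part-00009-never-written.json")
    val fs = path.getFileSystem(new Configuration())

    // Old behaviour: unconditionally asking for the status of a never-created file fails.
    try {
      fs.getFileStatus(path)
    } catch {
      case e: FileNotFoundException =>
        println(s"getFileStatus failed, as the old commitTask did: ${e.getMessage}")
    }

    // New behaviour: guard with exists() and skip the file entirely.
    if (fs.exists(path)) {
      println(fs.getFileStatus(path))
    } else {
      println("no output file for this task; nothing to add to the manifest")
    }
  }
}
```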

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 50 additions & 0 deletions
```diff
@@ -600,11 +600,61 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
 }
 
 class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+  import testImplicits._
+
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
 
+  test("SPARK-29999 Handle FileStreamSink metadata correctly for empty partition") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+          var query: StreamingQuery = null
+          try {
+            // repartition to more than the input to leave empty partitions
+            query =
+              df.repartition(10)
+                .writeStream
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .format(format)
+                .start(outputDir.getCanonicalPath)
+
+            inputData.addData("1", "2", "3")
+            inputData.addData("4", "5")
+
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+          } finally {
+            if (query != null) {
+              query.stop()
+            }
+          }
+
+          val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
+            spark.sessionState.newHadoopConf())
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            outputDir.getCanonicalPath)
+
+          val allFiles = sinkLog.allFiles()
+          // only files from non-empty partition should be logged
+          assert(allFiles.length < 10)
+          assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+
+          // the query should be able to read all rows correctly with metadata log
+          val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
+            .selectExpr("CAST(value AS INT)").as[Int]
+          checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
+        }
+      }
+    }
+  }
+
   override def checkQueryExecution(df: DataFrame): Unit = {
     // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
     // been inferred
```
