
Commit ee12c81

ulysses-you authored and cloud-fan committed
[SPARK-43281][SQL] Fix concurrent writer does not update file metrics
### What changes were proposed in this pull request?

`DynamicPartitionDataConcurrentWriter` uses the temp file path to get the file status after the commit task. However, the temp file has already been moved to a new path during the commit task, so the lookup finds nothing. This PR calls `closeFile` before the commit task instead.

### Why are the changes needed?

Fix a bug: the concurrent writer did not update file metrics.

### Does this PR introduce _any_ user-facing change?

Yes, after this PR the metrics are correct.

### How was this patch tested?

Added tests.

Closes #40952 from ulysses-you/SPARK-43281.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 592e922)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent cacaed8 commit ee12c81
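To make the bug concrete, here is a minimal, self-contained Scala sketch of the tracker lifecycle. `SketchStatsTracker` and `Demo` are illustrative names, not Spark internals; only `newFile`/`closeFile`/`getFinalStats` correspond to the real `BasicWriteTaskStatsTracker` API touched by this change. The point is that a file's size can only be observed while the file still exists at its temporary path:

```scala
import java.io.File
import scala.collection.mutable

// A minimal stand-in for Spark's BasicWriteTaskStatsTracker; the real tracker
// asks the file system for each file's status, here java.io.File is enough.
class SketchStatsTracker {
  private val submittedFiles = mutable.Set[String]()
  private var numFiles = 0
  private var numBytes = 0L

  def newFile(path: String): Unit = submittedFiles += path

  // Records the file's size. This only works while `path` still exists,
  // i.e. before the commit task renames the temp file away.
  def closeFile(path: String): Unit = {
    submittedFiles -= path
    val f = new File(path)
    if (f.exists()) {
      numFiles += 1
      numBytes += f.length()
    }
  }

  def finalStats: (Int, Long) = (numFiles, numBytes)
}

object Demo extends App {
  val tmp = File.createTempFile("part-", ".parquet")
  val tracker = new SketchStatsTracker
  tracker.newFile(tmp.getPath)

  // The fix: collect stats before the commit moves the file.
  tracker.closeFile(tmp.getPath)

  // Commit task: the temp file is renamed to its final location. Before the
  // fix, stats were collected only after this point (in getFinalStats), so
  // the exists() check failed and numFiles/numBytes stayed at zero.
  val committed = new File(tmp.getParentFile, "committed-" + tmp.getName)
  tmp.renameTo(committed)

  println(tracker.finalStats) // (1, 0): the empty temp file was still counted
  committed.delete()
}
```

In the real code path, the commit task performs the rename, which is why `closeFile` has to run before it.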

File tree

4 files changed (+68, -5 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala

Lines changed: 0 additions & 3 deletions

@@ -159,9 +159,6 @@ class BasicWriteTaskStatsTracker(
   }

   override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
-    submittedFiles.foreach(updateFileStats)
-    submittedFiles.clear()
-
     // Reports bytesWritten and recordsWritten to the Spark output metrics.
     Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
       outputMetrics.setBytesWritten(numBytes)
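With the eager `closeFile` calls in place, the per-file scan in `getFinalStats` becomes not just redundant but wrong: by the time it runs, the commit task has already renamed the temp files, so `updateFileStats` would find nothing.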

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala

Lines changed: 1 addition & 0 deletions

@@ -427,6 +427,7 @@ class DynamicPartitionDataConcurrentWriter(
     if (status.outputWriter != null) {
       try {
         status.outputWriter.close()
+        statsTrackers.foreach(_.closeFile(status.outputWriter.path()))
       } finally {
         status.outputWriter = null
       }
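This is the actual fix: each stats tracker stats the file immediately after the writer is closed, while it still lives at `status.outputWriter.path()`. Deferring that until after the commit task, as before, meant the path no longer existed and the metrics stayed at zero.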

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala

Lines changed: 14 additions & 2 deletions

@@ -85,22 +85,23 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val missing = new Path(tempDirPath, "missing")
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(missing.toString)
+    tracker.closeFile(missing.toString)
     assertStats(tracker, 0, 0)
   }

   test("Empty filename is forwarded") {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile("")
     intercept[IllegalArgumentException] {
-      finalStatus(tracker)
+      tracker.closeFile("")
     }
   }

   test("Null filename is only picked up in final status") {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(null)
     intercept[IllegalArgumentException] {
-      finalStatus(tracker)
+      tracker.closeFile(null)
     }
   }

@@ -109,6 +110,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(file.toString)
     touch(file)
+    tracker.closeFile(file.toString)
     assertStats(tracker, 1, 0)
   }

@@ -117,6 +119,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(file.toString)
     write1(file)
+    tracker.closeFile(file.toString)
     assertStats(tracker, 1, len1)
   }

@@ -125,6 +128,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(file.toString)
     val stream = localfs.create(file, true)
+    tracker.closeFile(file.toString)
     try {
       assertStats(tracker, 1, 0)
       stream.write(data1)
@@ -141,8 +145,10 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(file1.toString)
     write1(file1)
+    tracker.closeFile(file1.toString)
     tracker.newFile(file2.toString)
     write2(file2)
+    tracker.closeFile(file2.toString)
     assertStats(tracker, 2, len1 + len2)
   }

@@ -153,10 +159,13 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     val tracker = new BasicWriteTaskStatsTracker(conf)
     tracker.newFile(file1.toString)
     write1(file1)
+    tracker.closeFile(file1.toString)
     tracker.newFile(file2.toString)
     write2(file2)
+    tracker.closeFile(file2.toString)
     tracker.newFile(file3.toString)
     touch(file3)
+    tracker.closeFile(file3.toString)
     assertStats(tracker, 3, len1 + len2)
   }

@@ -168,13 +177,16 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     // file 1
     tracker.newFile(file1.toString)
     write1(file1)
+    tracker.closeFile(file1.toString)

     // file 2 is noted, but not created
     tracker.newFile(file2.toString)
+    tracker.closeFile(file2.toString)

     // file 3 is noted & then created
     tracker.newFile(file3.toString)
     write2(file3)
+    tracker.closeFile(file3.toString)

     // the expected size is file1 + file3; only two files are reported
     // as found
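The suite changes follow from moving stats collection into `closeFile`: every test now closes the files it registers, and the empty/null filename checks are asserted against `closeFile` rather than against the final status.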

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 53 additions & 0 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.internal.SQLConf
@@ -1291,4 +1292,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-43281: Fix concurrent writer does not update file metrics") {
+    withTable("t") {
+      withSQLConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key -> "3",
+        SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1") {
+        spark.sql("CREATE TABLE t(c int) USING parquet PARTITIONED BY (p String)")
+        var dataWriting: DataWritingCommandExec = null
+        val listener = new QueryExecutionListener {
+          override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+          override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+            qe.executedPlan match {
+              case dataWritingCommandExec: DataWritingCommandExec =>
+                dataWriting = dataWritingCommandExec
+              case _ =>
+            }
+          }
+        }
+        spark.listenerManager.register(listener)
+
+        def checkMetrics(sqlStr: String, numFiles: Int, numOutputRows: Long): Unit = {
+          sql(sqlStr)
+          sparkContext.listenerBus.waitUntilEmpty()
+          assert(dataWriting != null)
+          val metrics = dataWriting.cmd.metrics
+          assert(metrics.contains("numFiles"))
+          assert(metrics("numFiles").value == numFiles)
+          assert(metrics.contains("numOutputBytes"))
+          assert(metrics("numOutputBytes").value > 0)
+          assert(metrics.contains("numOutputRows"))
+          assert(metrics("numOutputRows").value == numOutputRows)
+        }
+
+        try {
+          // without fallback
+          checkMetrics(
+            "INSERT INTO TABLE t PARTITION(p) SELECT * FROM VALUES(1, 'a'),(2, 'a'),(1, 'b')",
+            numFiles = 2,
+            numOutputRows = 3)
+
+          // with fallback
+          checkMetrics(
+            """
+              |INSERT INTO TABLE t PARTITION(p)
+              |SELECT * FROM VALUES(1, 'a'),(2, 'b'),(1, 'c'),(2, 'd')""".stripMargin,
+            numFiles = 4,
+            numOutputRows = 4)
+        } finally {
+          spark.listenerManager.unregister(listener)
+        }
+      }
+    }
+  }
 }
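Note the two `INSERT` statements in the new test: the first writes two partitions, below the `MAX_CONCURRENT_OUTPUT_FILE_WRITERS` limit of 3, while the second writes four, forcing the concurrent writer to fall back to the sort-based path partway through. The metrics are therefore verified on both code paths.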
