Commit c9b85a2
[SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated
### What changes were proposed in this pull request?

This PR implements and calls the new APIs in FileCommitProtocol instead of the deprecated ones.

### Why are the changes needed?

FileCommitProtocol and related classes are complicated, as they play a lot of tricks for tasks like file naming and config setting/propagation, etc. Removing these references improves the call stack a bit, and it also lets us make the deprecated methods ignorable.

### Does this PR introduce _any_ user-facing change?

No, nothing changes for existing implementations or end users.

### How was this patch tested?

Pass existing CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51772 from yaooqinn/SPARK-53063.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
1 parent e19ef49 commit c9b85a2

File tree: 7 files changed (+24, −22 lines)

common/utils/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 8 additions & 0 deletions

@@ -131,6 +131,14 @@ object SparkException {
       messageParameters: java.util.Map[String, String]): Map[String, String] = {
     messageParameters.asScala.toMap
   }
+
+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
 }

 /**
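Since the helper is new in this commit, a quick hedged usage sketch may help; the assertion on the rendered message is an assumption about how `SparkRuntimeException` formats the `INTERNAL_ERROR` error class:

```scala
import org.apache.spark.SparkException

// A minimal sketch: this shared helper now backs both the new default bodies in
// FileCommitProtocol (next file) and QueryCompilationErrors.mustOverrideOneMethodError,
// so an implementation that overrides neither overload fails fast with INTERNAL_ERROR.
val e = SparkException.mustOverrideOneMethodError("newTaskTempFile")
// Assumed rendering; the exact prefix depends on SparkRuntimeException's formatting.
assert(e.getMessage.contains("You must override one `newTaskTempFile`"))
```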

core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 7 additions & 2 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._

+import org.apache.spark.SparkException
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

@@ -96,7 +97,9 @@ abstract class FileCommitProtocol extends Logging {
    * guarantees that files written by different tasks will not conflict.
    */
   @deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0")
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFile")
+  }

   /**
    * Notifies the commit protocol to add a new file, and gets back the full path that should be

@@ -135,7 +138,9 @@ abstract class FileCommitProtocol extends Logging {
    */
   @deprecated("use newTaskTempFileAbsPath(..., spec: FileNameSpec) instead", "3.3.0")
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath")
+  }

   /**
    * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
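To make "ignorable" concrete for implementors: with these defaults in place, a direct subclass only needs the `FileNameSpec` overloads. The sketch below is hypothetical (the class name, no-op job/task handling, and path layout are all illustrative); it assumes `FileNameSpec(prefix, suffix)` and `FileCommitProtocol.EmptyTaskCommitMessage` as defined alongside this class:

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Hypothetical protocol: only the FileNameSpec overloads are overridden.
// The deprecated ext-based overloads are simply left alone; they inherit the
// throwing defaults added above instead of forcing an empty override here.
class SketchCommitProtocol(jobId: String, path: String) extends FileCommitProtocol {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {}
  override def abortJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
    FileCommitProtocol.EmptyTaskCommitMessage
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}

  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
    val taskId = taskContext.getTaskAttemptID.getTaskID.getId
    val parent = dir.map(d => s"$path/$d").getOrElse(path)
    s"$parent/${spec.prefix}part-$taskId${spec.suffix}"
  }

  override def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
    s"$absoluteDir/${spec.prefix}part${spec.suffix}"
  }
}
```

Conversely, a legacy implementation that overrides only the deprecated overloads should keep working, since the `FileNameSpec` overloads' pre-existing defaults delegate to them; that is consistent with the "no user-facing change" claim above.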

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 0 additions & 10 deletions

@@ -117,11 +117,6 @@ class HadoopMapReduceCommitProtocol(
     format.getOutputCommitter(context)
   }

-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)

@@ -145,11 +140,6 @@ class HadoopMapReduceCommitProtocol(
     }
   }

-  override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
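One consequence worth spelling out: with the forwarding overrides gone, calling a deprecated overload on this class now resolves to the new base-class default and throws. A hedged sketch (the constructor arguments are illustrative):

```scala
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

val protocol = new HadoopMapReduceCommitProtocol("job-0", "/tmp/out")
// The spec-based overloads above still behave as before, but the deprecated
// ext-based overloads now reach FileCommitProtocol's throwing defaults:
//   protocol.newTaskTempFile(taskAttemptContext, None, ".parquet")
//   -> SparkRuntimeException (INTERNAL_ERROR): "You must override one `newTaskTempFile`. ..."
// This path is only reachable from code that ignores the 3.3.0 deprecation warning.
```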

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 2 additions & 4 deletions

@@ -21,7 +21,7 @@ import java.util.Locale

 import org.apache.hadoop.fs.Path

-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}

@@ -4203,9 +4203,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   def mustOverrideOneMethodError(methodName: String): RuntimeException = {
     val msg = s"You must override one `$methodName`. It's preferred to not override the " +
       "deprecated one."
-    new SparkRuntimeException(
-      "INTERNAL_ERROR",
-      Map("message" -> msg))
+    SparkException.mustOverrideOneMethodError(msg)
   }

   def cannotAssignEventTimeColumn(): Throwable = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala

Lines changed: 1 addition & 1 deletion

@@ -172,7 +172,7 @@ class SingleDirectoryDataWriter(
     val currentPath = committer.newTaskTempFile(
       taskAttemptContext,
       None,
-      f"-c$fileCounter%03d" + ext)
+      FileNameSpec("", f"-c$fileCounter%03d" + ext))

     currentWriter = description.outputWriterFactory.newInstance(
       path = currentPath,
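This call-site migration relies on the same equivalence the removed forwarders in HadoopMapReduceCommitProtocol encoded: an extension-only name is a `FileNameSpec` with an empty prefix. A small sketch with illustrative values:

```scala
import org.apache.spark.internal.io.FileNameSpec

val fileCounter = 1                       // illustrative
val ext = ".parquet"                      // illustrative
val suffix = f"-c$fileCounter%03d" + ext  // "-c001.parquet"
// Produces the same file name the old String-based overload did:
val spec = FileNameSpec("", suffix)
```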

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala

Lines changed: 4 additions & 4 deletions

@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{BATCH_ID, PATH}
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors

@@ -114,13 +114,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }

   override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
     val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"
+    val filename = f"part-$split%05d-$uuid${spec.suffix}"

     val file = dir.map { d =>
       new Path(new Path(path, d), filename).toString

@@ -133,7 +133,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }

   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
   }

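For concreteness, a sketch of the name this hunk produces; the UUID and suffix are taken from the code comment and are illustrative. Note that `spec.prefix` is deliberately unused here, preserving the old ext-based behavior exactly:

```scala
import org.apache.spark.internal.io.FileNameSpec

val split = 3
val uuid = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb"  // illustrative; normally UUID.randomUUID
val spec = FileNameSpec("", ".gz.parquet")
val filename = f"part-$split%05d-$uuid${spec.suffix}"
// -> "part-00003-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet"
```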
sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileNameSpec
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils

@@ -42,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
     with Serializable with Logging {

   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw new Exception("there should be no custom partition path")
   }
 }
