From f7fa5606f394a31ba85bbfd658e12502d18d27eb Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 1 Aug 2025 14:37:27 +0800
Subject: [PATCH 1/2] [SPARK-53063][CORE] Remove inner references of deprecated APIs in FileCommitProtocol

---
 .../main/scala/org/apache/spark/SparkException.scala   |  8 ++++++++
 .../apache/spark/internal/io/FileCommitProtocol.scala  |  9 +++++++--
 .../internal/io/HadoopMapReduceCommitProtocol.scala    | 10 ----------
 .../spark/sql/errors/QueryCompilationErrors.scala      |  8 ++------
 .../execution/datasources/FileFormatDataWriter.scala   |  2 +-
 .../streaming/runtime/ManifestFileCommitProtocol.scala |  8 ++++----
 .../spark/sql/sources/PartitionedWriteSuite.scala      |  3 ++-
 7 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 00989fd29095..7e217115ca11 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -131,6 +131,14 @@ object SparkException {
       messageParameters: java.util.Map[String, String]): Map[String, String] = {
     messageParameters.asScala.toMap
   }
+
+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e2a96267082b..651895bf1f7a 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -96,7 +97,9 @@ abstract class FileCommitProtocol extends Logging {
    * guarantees that files written by different tasks will not conflict.
    */
   @deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0")
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFile")
+  }
 
   /**
    * Notifies the commit protocol to add a new file, and gets back the full path that should be
@@ -135,7 +138,9 @@ abstract class FileCommitProtocol extends Logging {
    */
   @deprecated("use newTaskTempFileAbsPath(..., spec: FileNameSpec) instead", "3.3.0")
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath")
+  }
 
   /**
    * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
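
With the throwing defaults above, the deprecated String-based overloads are no longer abstract, so implementations only have to provide the FileNameSpec-based variants; the spec-based overload's existing default falls back to the deprecated one, and a class that overrides neither now fails fast with SparkException.mustOverrideOneMethodError the first time a file is requested. A minimal sketch of a custom protocol under this patch — the class name and the "tmp-" prefix are illustrative, not part of the change:

    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.spark.internal.io.{FileNameSpec, HadoopMapReduceCommitProtocol}

    // Overrides only the non-deprecated overload; the deprecated String-based
    // newTaskTempFile is inherited from FileCommitProtocol and now throws if
    // anything still calls it directly.
    class PrefixedCommitProtocol(jobId: String, path: String)
      extends HadoopMapReduceCommitProtocol(jobId, path) {

      override def newTaskTempFile(
          taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
        // Prepend a marker to every task file while reusing the stock naming.
        super.newTaskTempFile(taskContext, dir, spec.copy(prefix = "tmp-" + spec.prefix))
      }
    }
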
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 476cddc64395..79218dffff9e 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -117,11 +117,6 @@ class HadoopMapReduceCommitProtocol(
     format.getOutputCommitter(context)
   }
 
-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
@@ -145,11 +140,6 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 
-  override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
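
The two overrides deleted here were pure adapters: each wrapped the bare extension in a FileNameSpec with an empty prefix and forwarded to the spec-based overload that this class already implements. A small standalone sketch of that equivalence — the object name and sample values are made up; it only needs spark-core (with this patch) on the classpath:

    import org.apache.spark.internal.io.FileNameSpec

    object FileNameSpecAdapters {
      def main(args: Array[String]): Unit = {
        // An extension-only name is the special case of an empty prefix,
        // exactly what the deleted forwarders constructed.
        val ext = ".snappy.parquet"
        val spec = FileNameSpec("", ext)
        assert(spec.prefix.isEmpty && spec.suffix == ext)

        // The writer-side call in FileFormatDataWriter (further below) builds
        // the same shape, folding the per-task file counter into the suffix.
        val fileCounter = 7
        val writerSpec = FileNameSpec("", f"-c$fileCounter%03d" + ext)
        assert(writerSpec.suffix == "-c007.snappy.parquet")
      }
    }
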
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 14f279ad5ad7..b296036c8fc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4203,9 +4203,5 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   def mustOverrideOneMethodError(methodName: String): RuntimeException = {
-    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
-      "deprecated one."
-    new SparkRuntimeException(
-      "INTERNAL_ERROR",
-      Map("message" -> msg))
+    SparkException.mustOverrideOneMethodError(methodName)
   }
 
   def cannotAssignEventTimeColumn(): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 7d071124b0b3..374b29e4c1a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -172,7 +172,7 @@ class SingleDirectoryDataWriter(
     val currentPath = committer.newTaskTempFile(
       taskAttemptContext,
       None,
-      f"-c$fileCounter%03d" + ext)
+      FileNameSpec("", f"-c$fileCounter%03d" + ext))
 
     currentWriter = description.outputWriterFactory.newInstance(
       path = currentPath,
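
QueryCompilationErrors.mustOverrideOneMethodError now delegates to the SparkException helper introduced in the first hunk, so the SQL and core entry points raise the same INTERNAL_ERROR; note the delegation must pass the raw method name, since the helper builds the full message itself. A sketch of what a caller ends up with, assuming this patch is applied — the printed layout is paraphrased from the INTERNAL_ERROR template, not verbatim:

    import org.apache.spark.SparkException

    object MustOverrideDemo {
      def main(args: Array[String]): Unit = {
        val e = SparkException.mustOverrideOneMethodError("newTaskTempFile")
        // A SparkRuntimeException with error class INTERNAL_ERROR; the message
        // should read roughly:
        //   [INTERNAL_ERROR] You must override one `newTaskTempFile`. ...
        println(e.getMessage)
      }
    }
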
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
index b382642eb6bf..6574dfd9b5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{BATCH_ID, PATH}
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -114,13 +114,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
     val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"
+    val filename = f"part-$split%05d-$uuid${spec.suffix}"
 
     val file = dir.map { d =>
       new Path(new Path(path, d), filename).toString
@@ -133,7 +133,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 55bee7d4713d..ffced8c8fcef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileNameSpec
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -42,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
     with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw new Exception("there should be no custom partition path")
   }
 }

From fa54cdc1043d879b9824accc0578a5ac29ccc614 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 4 Aug 2025 13:45:39 +0800
Subject: [PATCH 2/2] Apply suggestion from @yaooqinn

---
 .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index ffced8c8fcef..e742fd68e915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -43,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
     with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
-    taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw new Exception("there should be no custom partition path")
   }
 }
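
For reference, the streaming manifest protocol's file naming after the change, reproduced as a standalone sketch; the split number, suffix, and object name are made-up example values:

    import java.util.UUID

    import org.apache.spark.internal.io.FileNameSpec

    object ManifestNameSketch {
      def main(args: Array[String]): Unit = {
        val split = 3 // stands in for taskContext.getTaskAttemptID.getTaskID.getId
        val uuid = UUID.randomUUID.toString
        val spec = FileNameSpec("", ".gz.parquet")
        // %05d zero-pads but never truncates, so task numbers beyond 99999
        // still yield unique, just wider, file names.
        val filename = f"part-$split%05d-$uuid${spec.suffix}"
        println(filename) // e.g. part-00003-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet
      }
    }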