@@ -131,6 +131,14 @@ object SparkException {
messageParameters: java.util.Map[String, String]): Map[String, String] = {
messageParameters.asScala.toMap
}

+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
}

/**
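The helper returns the exception instead of throwing it, leaving the throw to the call site. A minimal sketch of the intended call pattern (the assertions are illustrative assumptions, not code from this PR):

  import org.apache.spark.{SparkException, SparkRuntimeException}

  // Returns a SparkRuntimeException tagged with the INTERNAL_ERROR error class.
  val e: RuntimeException = SparkException.mustOverrideOneMethodError("newTaskTempFile")
  assert(e.isInstanceOf[SparkRuntimeException])
  // Assumed: the formatted message embeds the "message" parameter.
  assert(e.getMessage.contains("You must override one `newTaskTempFile`"))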
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._

+import org.apache.spark.SparkException
import org.apache.spark.annotation.Unstable
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -96,7 +97,9 @@ abstract class FileCommitProtocol extends Logging {
* guarantees that files written by different tasks will not conflict.
*/
@deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0")
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFile")

[dongjoon-hyun, Member, Aug 2, 2025]:
In other words, I don't think this is required to achieve your goal.

[Contributor]:
I agree; it is not clear why these changes are being made. I was confused about the intent as well, though now I realize what is being attempted.

+  }

/**
* Notifies the commit protocol to add a new file, and gets back the full path that should be
@@ -135,7 +138,9 @@ abstract class FileCommitProtocol extends Logging {
*/
@deprecated("use newTaskTempFileAbsPath(..., spec: FileNameSpec) instead", "3.3.0")
def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath")

[Member]:
Ditto.

+  }

/**
* Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
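With throwing defaults on the deprecated overloads, a third-party committer now only needs to implement the FileNameSpec-based methods. A minimal sketch under that assumption; the class and its path logic are hypothetical, not part of this PR:

  import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
  import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
  import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

  // Hypothetical no-op committer: only the FileNameSpec overloads are overridden,
  // so the deprecated (..., ext: String) overloads keep the new throwing defaults.
  class NoOpCommitProtocol(jobId: String, path: String) extends FileCommitProtocol {
    override def setupJob(jobContext: JobContext): Unit = {}
    override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {}
    override def abortJob(jobContext: JobContext): Unit = {}
    override def setupTask(taskContext: TaskAttemptContext): Unit = {}
    override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
      new TaskCommitMessage(null)
    override def abortTask(taskContext: TaskAttemptContext): Unit = {}
    override def newTaskTempFile(
        taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String =
      s"$path/${spec.prefix}part-${taskContext.getTaskAttemptID.getTaskID.getId}${spec.suffix}"
    override def newTaskTempFileAbsPath(
        taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String =
      s"$absoluteDir/${spec.prefix}tmp-$jobId${spec.suffix}"
  }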
@@ -117,11 +117,6 @@ class HadoopMapReduceCommitProtocol(
format.getOutputCommitter(context)
}

-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
-  }

override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
val filename = getFilename(taskContext, spec)
@@ -145,11 +140,6 @@
}
}

-  override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
-  }

override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
val filename = getFilename(taskContext, spec)
@@ -21,7 +21,7 @@ import java.util.Locale

import org.apache.hadoop.fs.Path

-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4203,9 +4203,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
def mustOverrideOneMethodError(methodName: String): RuntimeException = {
val msg = s"You must override one `$methodName`. It's preferred to not override the " +
"deprecated one."
-    new SparkRuntimeException(
-      "INTERNAL_ERROR",
-      Map("message" -> msg))
+    SparkException.mustOverrideOneMethodError(msg)

[mridulm, Contributor, Aug 7, 2025]:
Should this be SparkException.mustOverrideOneMethodError(methodName)? Or, better still, remove this mustOverrideOneMethodError and delegate to SparkException.mustOverrideOneMethodError instead?

}

def cannotAssignEventTimeColumn(): Throwable = {
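As written in the diff, the full message is passed where a method name is expected, so the shared helper would wrap it in a second "You must override one `...`" template. mridulm's first suggestion passes the raw method name through instead; a sketch of that fix (hypothetical, not what this revision contains):

  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
    // Pass the method name, not a pre-built message: the SparkException helper
    // composes the "You must override one `...`" text itself.
    SparkException.mustOverrideOneMethodError(methodName)
  }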
@@ -172,7 +172,7 @@ class SingleDirectoryDataWriter(
val currentPath = committer.newTaskTempFile(
taskAttemptContext,
None,
f"-c$fileCounter%03d" + ext)
FileNameSpec("", f"-c$fileCounter%03d" + ext))

[dongjoon-hyun, Member]:
This looks like a new independent change, too.

[Member Author]:
Calling the newer newTaskTempFile with spec is the goal of this PR.

[dongjoon-hyun, Member] (replying to "Calling the newer newTaskTempFile with spec is the goal of this PR"):
The PR title is not that, though:
"Remove inner references of deprecated APIs in FileCommitProtocol"
If this is the PR's goal, please remove the throwing exceptions in the deprecated APIs.

[Member Author]:
@dongjoon-hyun, I got your point. I've changed the title to "Implement and call new APIs in FileCommitProtocol instead of deprecated". Does the change here look reasonable to you with this positive tone? If not, I will separate the overrides and the callers into two PRs.

[dongjoon-hyun, Member, Aug 4, 2025]:
Thank you. I trust your decision. Initially, I wanted to remove the default implementation (throw SparkException) from this PR, but this could be a migration step too. So, I agree with your decision.

[Member Author]:
Thank you, @dongjoon-hyun.

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
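FileNameSpec pairs a prefix with a suffix, so an empty prefix keeps the file names from the old ext-based call identical. A small illustration (values hypothetical):

  import org.apache.spark.internal.io.FileNameSpec

  val fileCounter = 1
  val ext = ".parquet"
  val spec = FileNameSpec("", f"-c$fileCounter%03d" + ext)
  // spec.prefix == ""               (empty, matching the old ext-only behavior)
  // spec.suffix == "-c001.parquet"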
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{BATCH_ID, PATH}
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors

@@ -114,13 +114,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
}

override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
// The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
// Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
val split = taskContext.getTaskAttemptID.getTaskID.getId
val uuid = UUID.randomUUID.toString
val filename = f"part-$split%05d-$uuid$ext"
val filename = f"part-$split%05d-$uuid${spec.suffix}"

val file = dir.map { d =>
new Path(new Path(path, d), filename).toString
@@ -133,7 +133,7 @@
}

override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
}

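Note that this committer consumes only spec.suffix, so an empty-prefix spec yields exactly the old names, while a non-empty prefix would be silently ignored here. A worked example with the UUID fixed for illustration:

  val split = 3
  val uuid = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb"
  val suffix = ".gz.parquet" // spec.suffix from FileNameSpec("", ".gz.parquet")
  val filename = f"part-$split%05d-$uuid$suffix"
  // "part-00003-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet"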
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.TestUtils
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileNameSpec
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -42,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
with Serializable with Logging {

override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
throw new Exception("there should be no custom partition path")
}
}