FileFormatWriter.scala
@@ -185,6 +185,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering =
       partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
@@ -208,16 +218,6 @@
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    committer.setupJob(job)
-
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (materializedPlan.execute(), None)
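
The two hunks above move `SQLExecution.checkSQLExecutionId`, the `spark.sql.sources.writeJobUUID` propagation, and `committer.setupJob(job)` ahead of the required-ordering computation, so the commit protocol is set up before the plan is materialized. The moved comments describe the contract: setup only initializes and prepares the job, so a failure during setup should not trigger `abortJob`. A minimal sketch of that `FileCommitProtocol` lifecycle, for illustration only (this is not the actual FileFormatWriter code, and `runTasks` is a hypothetical stand-in for launching the write tasks):

// Minimal sketch of the setup/commit/abort pattern the moved comment refers to.
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

def writeWithProtocol(committer: FileCommitProtocol, job: Job)(
    runTasks: => Seq[TaskCommitMessage]): Unit = {
  // setupJob only initializes and prepares the job, so it stays outside the try block:
  // if setup itself fails there is nothing to abort yet.
  committer.setupJob(job)
  try {
    val commitMessages = runTasks
    committer.commitJob(job, commitMessages)
  } catch {
    case t: Throwable =>
      // Only failures after a successful setupJob roll the job back.
      committer.abortJob(job)
      throw t
  }
}
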
DataFrameReaderWriterSuite.scala
@@ -24,15 +24,15 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.PrimitiveType
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,26 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-43327: location exists when insertoverwrite fails") {
[Review comment from the contributor author on the line above: this issue does not occur in the latest master branch; it only occurs in Spark 3.2 and Spark 3.3.]

+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t", "t1") {
+        sql("CREATE TABLE t(c1 int) USING parquet")
+        sql("CREATE TABLE t1(c2 long) USING parquet")
+        sql("INSERT OVERWRITE TABLE t1 SELECT 6000044164")
+
+        val identifier = TableIdentifier("t")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location
+
+        intercept[SparkException] {
+          sql("INSERT OVERWRITE TABLE t SELECT c2 FROM " +
+            "(SELECT cast(c2 as int) as c2 FROM t1 distribute by c2)")
+        }
+        // scalastyle:off hadoopconfiguration
+        val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration)
+        // scalastyle:on hadoopconfiguration
+        assert(fs.exists(new Path(location)))
+      }
+    }
+  }
 }
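
For context on what the new test exercises: with ANSI mode enabled, casting 6000044164 (larger than Int.MaxValue) to int fails at runtime, so the INSERT OVERWRITE job throws, and the assertion then checks that the failed overwrite did not delete the table location. A minimal sketch of just the failing cast, assuming an active SparkSession named `spark` (not part of the PR):

// Hedged sketch: reproduces only the runtime cast failure the test relies on.
spark.conf.set("spark.sql.ansi.enabled", "true")
// 6000044164 does not fit in an int; under ANSI mode the cast throws instead of
// returning null, which is what makes the INSERT OVERWRITE in the test fail mid-write.
spark.sql("SELECT CAST(6000044164 AS INT)").collect()
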