@@ -206,26 +206,41 @@ object FileCommitProtocol extends Logging {
className: String,
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
dynamicPartitionOverwrite: Boolean = false,
forceUseStagingDir: Boolean = false): FileCommitProtocol = {

logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" dynamic=$dynamicPartitionOverwrite")
s" dynamic=$dynamicPartitionOverwrite;" +
s" forceUseStagingDir=$forceUseStagingDir")
val clazz = Utils.classForName[FileCommitProtocol](className)
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean, forceUseStagingDir: Boolean).
// Second, try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
logDebug("Using (String, String, Boolean) constructor")
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
val ctor = clazz.getDeclaredConstructor(
classOf[String], classOf[String], classOf[Boolean], classOf[Boolean])
logDebug("Using (String, String, Boolean, Boolean) constructor")
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
forceUseStagingDir.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
logDebug("Falling back to (String, String) constructor")
require(!dynamicPartitionOverwrite,
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
try {
val ctor = clazz.getDeclaredConstructor(
classOf[String], classOf[String], classOf[Boolean])
logDebug("Using (String, String, Boolean) constructor")
ctor.newInstance(jobId, outputPath,
dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
logDebug("Falling back to (String, String) constructor")
require(!dynamicPartitionOverwrite,
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
}
}

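For illustration, a minimal sketch of a custom commit protocol that exposes the new four-argument constructor, so that instantiate resolves it on the first attempt. This is not part of the diff; the class and object names are hypothetical, and the sketch assumes the patched base class shown below.

// Hypothetical committer, delegating to the patched HadoopMapReduceCommitProtocol.
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

class MyCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false,
    forceUseStagingDir: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite, forceUseStagingDir)

object MyCommitProtocolExample {
  // Reflection finds the (String, String, Boolean, Boolean) constructor on the first try.
  val committer: FileCommitProtocol = FileCommitProtocol.instantiate(
    classOf[MyCommitProtocol].getName,
    jobId = "example-job",
    outputPath = "/tmp/example-output",
    dynamicPartitionOverwrite = false,
    forceUseStagingDir = true)
}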
@@ -67,7 +67,8 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
dynamicPartitionOverwrite: Boolean = false,
forceUseStagingDir: Boolean = false)
extends FileCommitProtocol with Serializable with Logging {

import FileCommitProtocol._
@@ -125,22 +126,27 @@ class HadoopMapReduceCommitProtocol(
taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
val filename = getFilename(taskContext, spec)

val stagingDir: Path = committer match {
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}
if (forceUseStagingDir && !dynamicPartitionOverwrite) {
[Inline review comment (Contributor)]: When spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is false, the Hive SerDe write path is used, and that path also calls newTaskTempFileAbsPath. This branch would then trigger a rename here; I suspect this conflicts with the Hive SerDe logic.

val absoluteDir = dir.map(new Path(path, _).toString).getOrElse(path)
this.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
} else {
val stagingDir: Path = committer match {
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
assert(dir.isDefined, "The dataset to be written must be partitioned" +
" when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}

dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
}.getOrElse {
new Path(stagingDir, filename).toString
dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
}.getOrElse {
new Path(stagingDir, filename).toString
}
}
}

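To make the new branch concrete: when forceUseStagingDir is set and the write is not a dynamic partition overwrite, the task file is treated like an absolute-path file, so it is first written under the per-job staging directory and then renamed into the final partition directory at commit time (via newTaskTempFileAbsPath). Below is a small sketch of the path arithmetic only; the table location and partition values are made up.

// Mirrors the expression in the new branch of newTaskTempFile; illustrative values only.
import org.apache.hadoop.fs.Path

object StagingPathSketch {
  val path = "/warehouse/db.db/t"               // table output location
  val dir: Option[String] = Some("c3=3/c4=4")   // partition directory, if any

  // Same expression as the new branch in newTaskTempFile:
  val absoluteDir: String = dir.map(new Path(path, _).toString).getOrElse(path)
  // absoluteDir == "/warehouse/db.db/t/c3=3/c4=4"; newTaskTempFileAbsPath records this
  // target and hands back a temp file under the job's staging dir instead.
}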
@@ -1455,6 +1455,13 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

val FORCE_USE_STAGING_DIR =
buildConf("spark.sql.sources.forceUseStagingDir")
.version("3.3.0")
.internal()
.booleanConf
.createWithDefault(false)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
.doc("The maximum number of paths allowed for listing files at driver side. If the number " +
@@ -4334,6 +4341,8 @@ class SQLConf extends Serializable with Logging {

def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

def forceUseStagingDir: Boolean = getConf(SQLConf.FORCE_USE_STAGING_DIR)

def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

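A hedged usage sketch for the flag defined above. It is registered as an internal conf, so it must be set explicitly; the session setup, table names, and query below are made up for illustration.

import org.apache.spark.sql.SparkSession

object ForceStagingDirExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("force-staging-dir-example")
      .master("local[*]")
      .getOrCreate()
    // Route writes through the per-job staging directory (key added above).
    spark.conf.set("spark.sql.sources.forceUseStagingDir", "true")
    // Hypothetical partitioned table `t` and source table `s`.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION (c3=1, c4) SELECT c1, c2, c4 FROM s")
    spark.stop()
  }
}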
@@ -107,12 +107,14 @@ case class InsertIntoHadoopFsRelationCommand(
fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
}

val forceUseStagingDir = sparkSession.sessionState.conf.forceUseStagingDir
val jobId = java.util.UUID.randomUUID().toString
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = jobId,
outputPath = outputPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)
dynamicPartitionOverwrite = dynamicPartitionOverwrite,
forceUseStagingDir = forceUseStagingDir)

val doInsertion = if (mode == SaveMode.Append) {
true
@@ -167,7 +169,7 @@ case class InsertIntoHadoopFsRelationCommand(

// For dynamic partition overwrite (and when forceUseStagingDir is enabled), the
// FileOutputCommitter's output path is the staging path; files are renamed from the
// staging path to the final output path during commitJob.
val committerOutputPath = if (dynamicPartitionOverwrite) {
val committerOutputPath = if (forceUseStagingDir || dynamicPartitionOverwrite) {
FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
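For clarity on the changed condition above: when forceUseStagingDir is enabled (or dynamic partition overwrite is in effect), the committer's output path becomes a per-job staging directory instead of the table location, so concurrent jobs writing to the same table do not share a _temporary directory. A rough sketch of what that resolves to, assuming FileCommitProtocol.getStagingDir is reachable from the calling code; the path and flag values are invented.

// Mirrors the committerOutputPath computation in InsertIntoHadoopFsRelationCommand.
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol

object CommitterOutputPathSketch {
  val outputPath = new Path("/warehouse/db.db/t")
  val jobId: String = UUID.randomUUID().toString
  val forceUseStagingDir = true
  val dynamicPartitionOverwrite = false

  val committerOutputPath: Path =
    if (forceUseStagingDir || dynamicPartitionOverwrite) {
      // A job-scoped staging directory under the table location, e.g.
      // /warehouse/db.db/t/.spark-staging-<jobId>
      FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
    } else {
      outputPath
    }
}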
@@ -32,8 +32,9 @@ import org.apache.spark.sql.internal.SQLConf
class SQLHadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
dynamicPartitionOverwrite: Boolean = false,
forceUseStagingDir: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite, forceUseStagingDir)
with Serializable with Logging {

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
@@ -55,7 +56,11 @@ class SQLHadoopMapReduceCommitProtocol(
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
val committerOutputPath = if (forceUseStagingDir || dynamicPartitionOverwrite) {
stagingDir
} else {
new Path(path)
}
committer = ctor.newInstance(committerOutputPath, context)
} else {
// The specified output committer is just an OutputCommitter.
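The analogous change in SQLHadoopMapReduceCommitProtocol means a user-supplied FileOutputCommitter subclass is constructed with the staging directory as its output path whenever the new flag (or dynamic overwrite) is active. A hedged sketch: the committer class below is hypothetical, and wiring it up through spark.sql.sources.outputCommitterClass is an assumption about how this code path is reached.

// Hypothetical FileOutputCommitter subclass; with forceUseStagingDir=true the protocol
// above passes the per-job staging dir, not the final table path, as `outputPath`.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

class CustomFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context)

// Illustrative registration (set before the write is executed):
// spark.conf.set("spark.sql.sources.outputCommitterClass",
//   classOf[CustomFileOutputCommitter].getName)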
@@ -20,6 +20,9 @@ package org.apache.spark.sql.sources
import java.io.{File, IOException}
import java.sql.Date
import java.time.{Duration, Period}
import java.util.concurrent.Callable

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}

@@ -34,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

class SimpleInsertSource extends SchemaRelationProvider {
override def createRelation(
@@ -2148,6 +2151,77 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
}
}

test("SPARK-37210: Concurrent write partition table") {
withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1",
SQLConf.FORCE_USE_STAGING_DIR.key -> "true",
SQLConf.MAX_RECORDS_PER_FILE.key -> "1000") {

def concurrentWrite(insertStatement: Int => String,
expectedAnswer: Seq[Row],
concurrent: Int = 5): Unit = {
withTable("t") {
sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET PARTITIONED BY (c3 int, c4 int)")
val tasks: Seq[Callable[Unit]] = (1 to concurrent).map(i => {
new Callable[Unit] {
override def call(): Unit = {
sql(insertStatement(i))
}
}
})
ThreadUtils.newForkJoinPool("test-thread-pool", concurrent).invokeAll(tasks.toList.asJava)
checkAnswer(
sql("SELECT count(*), c3, c4 FROM t GROUP BY c3, c4 ORDER BY c3, c4"),
expectedAnswer)
}
}

// source temp view
(1 to 10000).map(i => (i, i)).toDF("_1", "_2").createOrReplaceTempView("s")

// Concurrent overwrite to different dynamic partitions
concurrentWrite(
(i: Int) => s"INSERT OVERWRITE TABLE t PARTITION(c3=3$i, c4)" +
s" SELECT _1 AS c1, _2 AS c2, 4$i as c4 FROM s LIMIT ${i * 2000}",
(1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
)

// concurrent append to different dynamic partitions
concurrentWrite(
(i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3$i, c4)" +
s" SELECT _1 AS c1, _2 AS c2, 4$i as c4 FROM s LIMIT ${i * 2000}",
(1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
)

// concurrent append to same dynamic partition
concurrentWrite(
(i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3, c4)" +
s" SELECT _1 AS c1, _2 AS c2, 4 as c4 FROM s LIMIT ${i * 2000}",
Seq(Row(15 * 2000, 3, 4))
)

// concurrent overwrite to different static partitions
concurrentWrite(
(i: Int) => s"INSERT OVERWRITE TABLE t PARTITION(c3=3$i, c4=4$i)" +
s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
(1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
)

// concurrent append to different static partitions
concurrentWrite(
(i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3$i, c4=4$i)" +
s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
(1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
)

// concurrent append to same static partition
concurrentWrite(
(i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3, c4=4)" +
s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
Seq(Row(15 * 2000, 3, 4))
)
}
}
}

class FileExistingTestFileSystem extends RawLocalFileSystem {