From d4fc201602c97e9470b42059ba5796c2a0606e56 Mon Sep 17 00:00:00 2001
From: Wang Zhen
Date: Fri, 29 Jul 2022 15:52:19 +0800
Subject: [PATCH 1/2] [SPARK-37210] Allow forced use of staging directory

---
 .../internal/io/FileCommitProtocol.scala      | 12 ++-
 .../io/HadoopMapReduceCommitProtocol.scala    | 38 ++++++----
 .../apache/spark/sql/internal/SQLConf.scala   |  9 +++
 .../InsertIntoHadoopFsRelationCommand.scala   |  6 +-
 .../SQLHadoopMapReduceCommitProtocol.scala    | 11 ++-
 .../spark/sql/sources/InsertSuite.scala       | 76 ++++++++++++++++++-
 6 files changed, 126 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e2a96267082b8..866f11c1badb7 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -206,18 +206,22 @@ object FileCommitProtocol extends Logging {
       className: String,
       jobId: String,
       outputPath: String,
-      dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+      dynamicPartitionOverwrite: Boolean = false,
+      forceUseStagingDir: Boolean = false): FileCommitProtocol = {
 
     logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
-      s" dynamic=$dynamicPartitionOverwrite")
+      s" dynamic=$dynamicPartitionOverwrite;" +
+      s" forceUseStagingDir=$forceUseStagingDir")
     val clazz = Utils.classForName[FileCommitProtocol](className)
     // First try the constructor with arguments (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
-      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[String], classOf[String], classOf[Boolean], classOf[Boolean])
       logDebug("Using (String, String, Boolean) constructor")
-      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
+        forceUseStagingDir.asInstanceOf[java.lang.Boolean])
     } catch {
       case _: NoSuchMethodException =>
         logDebug("Falling back to (String, String) constructor")
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3a24da98ecc24..955b693d10472 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -67,7 +67,8 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 class HadoopMapReduceCommitProtocol(
     jobId: String,
     path: String,
-    dynamicPartitionOverwrite: Boolean = false)
+    dynamicPartitionOverwrite: Boolean = false,
+    forceUseStagingDir: Boolean = false)
   extends FileCommitProtocol with Serializable with Logging {
 
   import FileCommitProtocol._
@@ -125,22 +126,27 @@ class HadoopMapReduceCommitProtocol(
       taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
 
-    val stagingDir: Path = committer match {
-      // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter =>
-        if (dynamicPartitionOverwrite) {
-          assert(dir.isDefined,
-            "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
-          partitionPaths += dir.get
-        }
-        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
-      case _ => new Path(path)
-    }
+    if (forceUseStagingDir && !dynamicPartitionOverwrite) {
+      val absoluteDir = dir.map(new Path(path, _).toString).getOrElse(path)
+      this.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
+    } else {
+      val stagingDir: Path = committer match {
+        // For FileOutputCommitter it has its own staging path called "work path".
+        case f: FileOutputCommitter =>
+          if (dynamicPartitionOverwrite) {
+            assert(dir.isDefined, "The dataset to be written must be partitioned" +
+              " when dynamicPartitionOverwrite is true.")
+            partitionPaths += dir.get
+          }
+          new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+        case _ => new Path(path)
+      }
 
-    dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
-    }.getOrElse {
-      new Path(stagingDir, filename).toString
+      dir.map { d =>
+        new Path(new Path(stagingDir, d), filename).toString
+      }.getOrElse {
+        new Path(stagingDir, filename).toString
+      }
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f78fa8c9ef251..c35da1d9b53c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1455,6 +1455,13 @@ object SQLConf {
     .createWithDefault(
       "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
 
+  val FORCE_USE_STAGING_DIR =
+    buildConf("spark.sql.sources.forceUseStagingDir")
+      .version("3.3.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
       .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
@@ -4334,6 +4341,8 @@ class SQLConf extends Serializable with Logging {
 
   def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)
 
+  def forceUseStagingDir: Boolean = getConf(SQLConf.FORCE_USE_STAGING_DIR)
+
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index e20d9ed8b537a..2e1035876f907 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -107,12 +107,14 @@ case class InsertIntoHadoopFsRelationCommand(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
+    val forceUseStagingDir = sparkSession.sessionState.conf.forceUseStagingDir
     val jobId = java.util.UUID.randomUUID().toString
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = jobId,
       outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+      dynamicPartitionOverwrite = dynamicPartitionOverwrite,
+      forceUseStagingDir = forceUseStagingDir)
 
     val doInsertion = if (mode == SaveMode.Append) {
       true
@@ -167,7 +169,7 @@ case class InsertIntoHadoopFsRelationCommand(
 
       // For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
       // will be renamed from staging path to final output path during commit job
-      val committerOutputPath = if (dynamicPartitionOverwrite) {
+      val committerOutputPath = if (forceUseStagingDir || dynamicPartitionOverwrite) {
         FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
           .makeQualified(fs.getUri, fs.getWorkingDirectory)
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 144be2316f091..3e9ef89260ba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -32,8 +32,9 @@ import org.apache.spark.sql.internal.SQLConf
 class SQLHadoopMapReduceCommitProtocol(
     jobId: String,
     path: String,
-    dynamicPartitionOverwrite: Boolean = false)
-  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
+    dynamicPartitionOverwrite: Boolean = false,
+    forceUseStagingDir: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite, forceUseStagingDir)
     with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
@@ -55,7 +56,11 @@ class SQLHadoopMapReduceCommitProtocol(
         // The specified output committer is a FileOutputCommitter.
         // So, we will use the FileOutputCommitter-specified constructor.
         val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
+        val committerOutputPath = if (forceUseStagingDir || dynamicPartitionOverwrite) {
+          stagingDir
+        } else {
+          new Path(path)
+        }
         committer = ctor.newInstance(committerOutputPath, context)
       } else {
         // The specified output committer is just an OutputCommitter.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7edd61828d95d..e0d28d60a39ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.sources
 import java.io.{File, IOException}
 import java.sql.Date
 import java.time.{Duration, Period}
+import java.util.concurrent.Callable
+
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
 
@@ -34,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class SimpleInsertSource extends SchemaRelationProvider {
   override def createRelation(
@@ -2148,6 +2151,77 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-37210: Concurrent write partition table") {
+    withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1",
+      SQLConf.FORCE_USE_STAGING_DIR.key -> "true",
+      SQLConf.MAX_RECORDS_PER_FILE.key -> "1000") {
+
+      def concurrentWrite(insertStatement: Int => String,
+          expectedAnswer: Seq[Row],
+          concurrent: Int = 5): Unit = {
+        withTable("t") {
+          sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET PARTITIONED BY (c3 int, c4 int)")
+          val tasks: Seq[Callable[Unit]] = (1 to concurrent).map(i => {
+            new Callable[Unit] {
+              override def call(): Unit = {
+                sql(insertStatement(i))
+              }
+            }
+          })
+          ThreadUtils.newForkJoinPool("test-thread-pool", concurrent).invokeAll(tasks.toList.asJava)
+          checkAnswer(
+            sql("SELECT count(*), c3, c4 FROM t GROUP BY c3, c4 ORDER BY c3, c4"),
+            expectedAnswer)
+        }
+      }
+
+      // source temp view
+      (1 to 10000).map(i => (i, i)).toDF("_1", "_2").createOrReplaceTempView("s")
+
+      // Concurrent overwrite to different dynamic partitions
+      concurrentWrite(
+        (i: Int) => s"INSERT OVERWRITE TABLE t PARTITION(c3=3$i, c4)" +
+          s" SELECT _1 AS c1, _2 AS c2, 4$i as c4 FROM s LIMIT ${i * 2000}",
+        (1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
+      )
+
+      // concurrent append to different dynamic partitions
+      concurrentWrite(
+        (i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3$i, c4)" +
+          s" SELECT _1 AS c1, _2 AS c2, 4$i as c4 FROM s LIMIT ${i * 2000}",
+        (1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
+      )
+
+      // concurrent append to same dynamic partition
+      concurrentWrite(
+        (i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3, c4)" +
+          s" SELECT _1 AS c1, _2 AS c2, 4 as c4 FROM s LIMIT ${i * 2000}",
+        Seq(Row(15 * 2000, 3, 4))
+      )
+
+      // concurrent overwrite to different static partitions
+      concurrentWrite(
+        (i: Int) => s"INSERT OVERWRITE TABLE t PARTITION(c3=3$i, c4=4$i)" +
+          s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
+        (1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
+      )
+
+      // concurrent append to different static partitions
+      concurrentWrite(
+        (i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3$i, c4=4$i)" +
+          s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
+        (1 to 5).map(i => Row(i * 2000, s"3$i".toInt, s"4$i".toInt))
+      )
+
+      // concurrent append to same static partition
+      concurrentWrite(
+        (i: Int) => s"INSERT INTO TABLE t PARTITION(c3=3, c4=4)" +
+          s" SELECT _1 AS c1, _2 AS c2 FROM s LIMIT ${i * 2000}",
+        Seq(Row(15 * 2000, 3, 4))
+      )
+    }
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {

From 87de653857ab113a354a91a18f17f2bfb32c6c27 Mon Sep 17 00:00:00 2001
From: wforget <643348094@qq.com>
Date: Fri, 29 Jul 2022 23:08:36 +0800
Subject: [PATCH 2/2] fix

---
 .../internal/io/FileCommitProtocol.scala | 25 +++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 866f11c1badb7..7c13e81194327 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -214,22 +214,33 @@ object FileCommitProtocol extends Logging {
       s" forceUseStagingDir=$forceUseStagingDir")
     val clazz = Utils.classForName[FileCommitProtocol](className)
     // First try the constructor with arguments (jobId: String, outputPath: String,
+    // dynamicPartitionOverwrite: Boolean, forceUseStagingDir: Boolean).
+    // Second try the constructor with arguments (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
       val ctor = clazz.getDeclaredConstructor(
         classOf[String], classOf[String], classOf[Boolean], classOf[Boolean])
-      logDebug("Using (String, String, Boolean) constructor")
+      logDebug("Using (String, String, Boolean, Boolean) constructor")
       ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
         forceUseStagingDir.asInstanceOf[java.lang.Boolean])
     } catch {
       case _: NoSuchMethodException =>
-        logDebug("Falling back to (String, String) constructor")
-        require(!dynamicPartitionOverwrite,
-          "Dynamic Partition Overwrite is enabled but" +
-            s" the committer ${className} does not have the appropriate constructor")
-        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-        ctor.newInstance(jobId, outputPath)
+        try {
+          val ctor = clazz.getDeclaredConstructor(
+            classOf[String], classOf[String], classOf[Boolean])
+          logDebug("Using (String, String, Boolean) constructor")
+          ctor.newInstance(jobId, outputPath,
+            dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+        } catch {
+          case _: NoSuchMethodException =>
+            logDebug("Falling back to (String, String) constructor")
+            require(!dynamicPartitionOverwrite,
+              "Dynamic Partition Overwrite is enabled but" +
+                s" the committer ${className} does not have the appropriate constructor")
+            val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+            ctor.newInstance(jobId, outputPath)
+        }
     }
   }