
Commit a66fe36

cloud-fan authored and gatorsmile committed
[SPARK-20236][SQL] dynamic partition overwrite
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source and Hive tables:

- data source table: delete all partition directories that match the static partition values provided in the insert statement.
- hive table: only delete partition directories which have data written into them.

This PR adds a new config so that users can choose Hive's behavior.

## How was this patch tested?

new tests

Author: Wenchen Fan <[email protected]>

Closes #18714 from cloud-fan/overwrite-partition.
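As a concrete illustration of the difference (a sketch against a hypothetical table `t` partitioned by (part1, part2); not part of the patch itself):

// Hypothetical table "t" partitioned by (part1, part2), currently holding partitions
// part1=1/part2=1 and part1=1/part2=2.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (part1 = 1, part2) SELECT 4, 1")

// Old data source behavior (static): every partition matching part1=1 is deleted first,
// so part1=1/part2=2 is lost even though the query writes no rows into it.
// Hive behavior (dynamic): only part1=1/part2=1, which actually receives rows, is replaced;
// part1=1/part2=2 is left untouched.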
1 parent 1a87a16 commit a66fe36

File tree

6 files changed (+200, -29 lines)


core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 19 additions & 6 deletions
@@ -28,8 +28,9 @@ import org.apache.spark.util.Utils
  *
  * 1. Implementations must be serializable, as the committer instance instantiated on the driver
  *    will be used for tasks on executors.
- * 2. Implementations should have a constructor with 2 arguments:
- *      (jobId: String, path: String)
+ * 2. Implementations should have a constructor with 2 or 3 arguments:
+ *      (jobId: String, path: String) or
+ *      (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
  * 3. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
@@ -139,10 +140,22 @@ object FileCommitProtocol {
   /**
    * Instantiates a FileCommitProtocol using the given className.
    */
-  def instantiate(className: String, jobId: String, outputPath: String)
-    : FileCommitProtocol = {
+  def instantiate(
+      className: String,
+      jobId: String,
+      outputPath: String,
+      dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-    ctor.newInstance(jobId, outputPath)
+    // First try the constructor with arguments (jobId: String, outputPath: String,
+    // dynamicPartitionOverwrite: Boolean).
+    // If that doesn't exist, try the one with (jobId: string, outputPath: String).
+    try {
+      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+    } catch {
+      case _: NoSuchMethodException =>
+        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+        ctor.newInstance(jobId, outputPath)
+    }
   }
 }
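The reflection-based lookup above lets existing two-argument committers keep working while new ones opt in to the extra flag. A minimal sketch of how a custom protocol would be resolved (the class name and output path below are made up for illustration, not part of this patch):

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Hypothetical committer that exposes the new three-argument constructor.
class MyCommitProtocol(jobId: String, path: String, dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)

// instantiate first looks for (String, String, Boolean); since MyCommitProtocol has it,
// the dynamicPartitionOverwrite flag is passed through. A two-argument committer would be
// matched by the fallback branch instead.
val committer = FileCommitProtocol.instantiate(
  className = classOf[MyCommitProtocol].getName,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = "/tmp/example-output",
  dynamicPartitionOverwrite = true)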

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 59 additions & 16 deletions
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  *
  * @param jobId the job's or stage's id
  * @param path the job's output path, or null if committer acts as a noop
+ * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
+ *                                  dynamically, i.e., we first write files under a staging
+ *                                  directory with partition path, e.g.
+ *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
+ *                                  we first clean up the corresponding partition directories at
+ *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
+ *                                  files from staging directory to the corresponding partition
+ *                                  directories under destination path.
  */
-class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+class HadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
   extends FileCommitProtocol with Serializable with Logging {
 
   import FileCommitProtocol._
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
 
   /**
-   * The staging directory for all files committed with absolute output paths.
+   * Tracks partitions with default path that have new files written into them by this task,
+   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
+   * destination directory at the end, if `dynamicPartitionOverwrite` is true.
    */
-  private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
+  @transient private var partitionPaths: mutable.Set[String] = null
+
+  /**
+   * The staging directory of this write job. Spark uses it to deal with files with absolute output
+   * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
+   */
+  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
     val filename = getFilename(taskContext, ext)
 
-    val stagingDir: String = committer match {
+    val stagingDir: Path = committer match {
+      case _ if dynamicPartitionOverwrite =>
+        assert(dir.isDefined,
+          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+        partitionPaths += dir.get
+        this.stagingDir
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
-        Option(f.getWorkPath).map(_.toString).getOrElse(path)
-      case _ => path
+        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+      case _ => new Path(path)
     }
 
     dir.map { d =>
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
     // Include a UUID here to prevent file collisions for one task writing to different dirs.
     // In principle we could include hash(absoluteDir) instead but this is simpler.
-    val tmpOutputPath = new Path(
-      absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
 
     addedAbsPathFiles(tmpOutputPath) = absOutputPath
     tmpOutputPath
@@ -141,37 +164,57 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
-      .foldLeft(Map[String, String]())(_ ++ _)
-    logDebug(s"Committing files staged for absolute locations $filesToMove")
+
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      val (allAbsPathFiles, allPartitionPaths) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      logDebug(s"Committing files staged for absolute locations $filesToMove")
+      if (dynamicPartitionOverwrite) {
+        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
+        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
+        absPartitionPaths.foreach(fs.delete(_, true))
+      }
       for ((src, dst) <- filesToMove) {
         fs.rename(new Path(src), new Path(dst))
      }
-      fs.delete(absPathStagingDir, true)
+
+      if (dynamicPartitionOverwrite) {
+        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+        for (part <- partitionPaths) {
+          val finalPartPath = new Path(path, part)
+          fs.delete(finalPartPath, true)
+          fs.rename(new Path(stagingDir, part), finalPartPath)
+        }
+      }
+
+      fs.delete(stagingDir, true)
    }
  }
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(absPathStagingDir, true)
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(stagingDir, true)
     }
   }
 
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
+    partitionPaths = mutable.Set[String]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap)
+    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
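In rough terms, the dynamic-overwrite commit path above reduces to the following sketch: task output is staged under .spark-staging-<jobId>/<partition>, and at job commit each partition that received data is replaced wholesale while untouched partitions are left alone. The function name, parameters and paths below are illustrative only; the real logic lives in commitJob above.

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: during the job, files land under /table/.spark-staging-<jobId>/part1=1/part2=1/...
// At commit time, each touched partition directory is swapped in.
def commitDynamicOverwrite(
    fs: FileSystem,
    tablePath: Path,
    stagingDir: Path,
    touchedPartitions: Set[String]): Unit = {
  touchedPartitions.foreach { part =>            // e.g. "part1=1/part2=1"
    val finalPartPath = new Path(tablePath, part)
    fs.delete(finalPartPath, true)               // drop only this partition's old data
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
  fs.delete(stagingDir, true)                    // clean up the per-job staging directory
}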

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 21 additions & 0 deletions
@@ -1068,6 +1068,24 @@ object SQLConf {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(100)
 
+  object PartitionOverwriteMode extends Enumeration {
+    val STATIC, DYNAMIC = Value
+  }
+
+  val PARTITION_OVERWRITE_MODE =
+    buildConf("spark.sql.sources.partitionOverwriteMode")
+      .doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
+        "static and dynamic. In static mode, Spark deletes all the partitions that match the " +
+        "partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
+        "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
+        "those partitions that have data written into it at runtime. By default we use static " +
+        "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
+        "affect Hive serde tables, as they are always overwritten with dynamic mode.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(PartitionOverwriteMode.values.map(_.toString))
+      .createWithDefault(PartitionOverwriteMode.STATIC.toString)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1394,6 +1412,9 @@ class SQLConf extends Serializable with Logging {
 
   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
 
+  def partitionOverwriteMode: PartitionOverwriteMode.Value =
+    PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
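Enabling the new config is an ordinary session-level setting; a minimal usage sketch (the table and INSERT statement are illustrative, not taken from the patch):

// Enable Hive-style dynamic partition overwrite for data source tables.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// or, in SQL: SET spark.sql.sources.partitionOverwriteMode = dynamic

// With dynamic mode, this statement only replaces the partitions that actually receive rows
// (part2 is a dynamic partition column here); other partitions under part1=1 are preserved.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (part1 = 1, part2) SELECT 4, 1")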

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 16 additions & 4 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
@@ -89,20 +90,29 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val pathExists = fs.exists(qualifiedOutputPath)
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    val enableDynamicOverwrite =
+      sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString)
+      outputPath = outputPath.toString,
+      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
         if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
           false
+        } else if (dynamicPartitionOverwrite) {
+          // For dynamic partition overwrite, do not delete partition directories ahead.
+          true
         } else {
           deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
           true
@@ -126,7 +136,9 @@ case class InsertIntoHadoopFsRelationCommand(
         catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
         ifNotExists = true).run(sparkSession)
       }
-      if (mode == SaveMode.Overwrite) {
+      // For dynamic partition overwrite, we never remove partitions but only update existing
+      // ones.
+      if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
        val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
        if (deletedPartitions.nonEmpty) {
          AlterTableDropPartitionCommand(
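To make the condition on dynamicPartitionOverwrite concrete: assuming the mode is set to dynamic and a table partitioned by (part1, part2), the flag only turns on when at least one partition column is left dynamic in the INSERT statement (the table name below is illustrative):

// All partition columns have static values: staticPartitions.size == partitionColumns.length,
// so dynamicPartitionOverwrite stays false and only the fully specified partition is rewritten.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (part1 = 1, part2 = 1) SELECT 2")

// part2 is dynamic: staticPartitions.size < partitionColumns.length, so the flag is true and
// the committer stages output and replaces only the partitions that receive rows at runtime.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (part1 = 1, part2) SELECT 4, 1")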

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala

Lines changed: 7 additions & 3 deletions
@@ -29,11 +29,15 @@ import org.apache.spark.sql.internal.SQLConf
  * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
  * Hadoop output committer using an option specified in SQLConf.
  */
-class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
-  extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
+class SQLHadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
+    with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    var committer = super.setupCommitter(context)
 
     val configuration = context.getConfiguration
     val clazz =

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 78 additions & 0 deletions
@@ -21,6 +21,8 @@ import java.io.File
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
@@ -442,4 +444,80 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       assert(e.contains("Only Data Sources providing FileFormat are supported"))
     }
   }
+
+  test("SPARK-20236: dynamic partition overwrite without catalog table") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTempPath { path =>
+        Seq((1, 1, 1)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1, 1))
+
+        Seq((2, 1, 1)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1))
+
+        Seq((2, 2, 2)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-20236: dynamic partition overwrite") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        sql("insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-20236: dynamic partition overwrite with customer partition path") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val path1 = Utils.createTempDir()
+        sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
+        sql(s"insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        val path2 = Utils.createTempDir()
+        sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+      }
+    }
+  }
 }
