[SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables #15814
Changes from all commits
6b6273d
40f4368
3c8b43e
38e09f4
fb7ba10
3318970
aa2536f
bbe1e12
a905b09
3c43fa5
13aa481
37afbb1
dddee47
e97c17f
f85bb27
6dc4cb5
d68d1db
b5bbb1d
d939b37
fae929e
fbd7b42
4296612
001cb1d
91f87de
9dbc3f1
81bb266
8e10ff7
1f090b1
1500566
51c1322
63f7f2e
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,9 @@ | |
|
|
||
| package org.apache.spark.internal.io | ||
|
|
||
| import java.util.Date | ||
| import java.util.{Date, UUID} | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.hadoop.conf.Configurable | ||
| import org.apache.hadoop.fs.Path | ||
|
|
@@ -42,6 +44,19 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) | |
| /** OutputCommitter from Hadoop is not serializable so marking it transient. */ | ||
| @transient private var committer: OutputCommitter = _ | ||
|
|
||
| /** | ||
| * Tracks files staged by this task for absolute output paths. These outputs are not managed by | ||
| * the Hadoop OutputCommitter, so we must move these to their final locations on job commit. | ||
| * | ||
| * The mapping is from the temp output path to the final desired output path of the file. | ||
| */ | ||
| @transient private var addedAbsPathFiles: mutable.Map[String, String] = null | ||
|
Contributor: We should document whether the strings are paths to files or just paths to directories. I think they are just directories, right? The naming suggests that they are files.
Contributor (Author): They are files. We need to track the unique output location of each file here in order to know where to place it. We could use directories, but they would end up with one file each anyway.
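To make that concrete, here is a minimal illustration of the kind of entry this map ends up holding for one staged file; every path below is invented.

```scala
import scala.collection.mutable

// Illustration only (invented paths): the key is a file's temporary location under the
// job's absolute-path staging directory, the value is the final absolute destination
// requested by the caller.
val addedAbsPathFiles = mutable.Map[String, String]()
addedAbsPathFiles("/warehouse/t/_temporary-job_0/3f1c2a-part-00003-job_0.parquet") =
  "/custom/location/k1=v1/part-00003-job_0.parquet"
```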
||
|
|
||
| /** | ||
| * The staging directory for all files committed with absolute output paths. | ||
| */ | ||
| private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) | ||
|
Contributor: Document this.
Contributor (Author): Done.
||
|
|
||
| protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { | ||
| val format = context.getOutputFormatClass.newInstance() | ||
| // If OutputFormat is Configurable, we should set conf to it. | ||
|
|
@@ -54,11 +69,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) | |
|
|
||
| override def newTaskTempFile( | ||
| taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { | ||
| // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet | ||
| // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, | ||
| // the file name is fine and won't overflow. | ||
| val split = taskContext.getTaskAttemptID.getTaskID.getId | ||
| val filename = f"part-$split%05d-$jobId$ext" | ||
| val filename = getFilename(taskContext, ext) | ||
|
|
||
| val stagingDir: String = committer match { | ||
| // For FileOutputCommitter it has its own staging path called "work path". | ||
|
|
@@ -73,6 +84,28 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) | |
| } | ||
| } | ||
|
|
||
| override def newTaskTempFileAbsPath( | ||
| taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { | ||
| val filename = getFilename(taskContext, ext) | ||
| val absOutputPath = new Path(absoluteDir, filename).toString | ||
|
|
||
| // Include a UUID here to prevent file collisions for one task writing to different dirs. | ||
| // In principle we could include hash(absoluteDir) instead but this is simpler. | ||
| val tmpOutputPath = new Path( | ||
| absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString | ||
|
|
||
| addedAbsPathFiles(tmpOutputPath) = absOutputPath | ||
| tmpOutputPath | ||
| } | ||
|
|
||
| private def getFilename(taskContext: TaskAttemptContext, ext: String): String = { | ||
| // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet | ||
| // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, | ||
| // the file name is fine and won't overflow. | ||
| val split = taskContext.getTaskAttemptID.getTaskID.getId | ||
| f"part-$split%05d-$jobId$ext" | ||
| } | ||
|
|
||
| override def setupJob(jobContext: JobContext): Unit = { | ||
| // Setup IDs | ||
| val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0) | ||
|
|
@@ -93,26 +126,42 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) | |
|
|
||
| override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { | ||
| committer.commitJob(jobContext) | ||
| val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) | ||
| .foldLeft(Map[String, String]())(_ ++ _) | ||
| logDebug(s"Committing files staged for absolute locations $filesToMove") | ||
| val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) | ||
| for ((src, dst) <- filesToMove) { | ||
| fs.rename(new Path(src), new Path(dst)) | ||
| } | ||
| fs.delete(absPathStagingDir, true) | ||
| } | ||
|
|
||
| override def abortJob(jobContext: JobContext): Unit = { | ||
| committer.abortJob(jobContext, JobStatus.State.FAILED) | ||
| val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) | ||
| fs.delete(absPathStagingDir, true) | ||
| } | ||
|
|
||
| override def setupTask(taskContext: TaskAttemptContext): Unit = { | ||
| committer = setupCommitter(taskContext) | ||
| committer.setupTask(taskContext) | ||
| addedAbsPathFiles = mutable.Map[String, String]() | ||
| } | ||
|
|
||
| override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { | ||
| val attemptId = taskContext.getTaskAttemptID | ||
| SparkHadoopMapRedUtil.commitTask( | ||
| committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) | ||
| EmptyTaskCommitMessage | ||
| new TaskCommitMessage(addedAbsPathFiles.toMap) | ||
|
Member: Why don't we just rename temp files to dest files in commitTask?
Contributor: It could go either way; it's unclear which one is better. Renaming on job commit gives a higher chance of corrupting data, whereas renaming in task commit is slightly more performant.
Member: I'd prefer renaming in task commit.
||
| } | ||
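For reference, a rough sketch of the alternative raised in the thread above, renaming during commitTask rather than commitJob. This is not what the patch does; the body below is only an assumed variant of the commitTask shown above, inside the same class.

```scala
// Sketch of the rename-on-task-commit alternative; assumes the same committer and
// addedAbsPathFiles fields as in the diff above.
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  val attemptId = taskContext.getTaskAttemptID
  SparkHadoopMapRedUtil.commitTask(
    committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
  val conf = taskContext.getConfiguration
  // Move each staged file to its final absolute location as part of task commit.
  for ((src, dst) <- addedAbsPathFiles) {
    val srcPath = new Path(src)
    srcPath.getFileSystem(conf).rename(srcPath, new Path(dst))
  }
  // Nothing is left for the driver to move at job commit.
  EmptyTaskCommitMessage
}
```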
|
|
||
| override def abortTask(taskContext: TaskAttemptContext): Unit = { | ||
| committer.abortTask(taskContext) | ||
| // best effort cleanup of other staged files | ||
| for ((src, _) <- addedAbsPathFiles) { | ||
| val tmp = new Path(src) | ||
| tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) | ||
| } | ||
| } | ||
|
|
||
| /** Whether we are using a direct output committer */ | ||
|
|
||
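Putting the pieces of this file together, a hedged sketch of the call sequence a writer would drive. The job and task contexts are left as placeholders and the paths are invented; in Spark the protocol is normally instantiated and driven by the write path rather than by hand.

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Placeholder contexts; in real code these come from the Hadoop job/task setup.
val jobContext: JobContext = ???
val taskContext: TaskAttemptContext = ???

val protocol = new HadoopMapReduceCommitProtocol(jobId = "job_0", path = "/warehouse/t")
protocol.setupJob(jobContext)

// On each task: stage a file destined for a custom absolute location, then commit.
protocol.setupTask(taskContext)
val tmpPath = protocol.newTaskTempFileAbsPath(taskContext, "/custom/location/k1=v1", ".parquet")
// ... write the task's output to tmpPath ...
val commitMsg = protocol.commitTask(taskContext) // carries the temp -> final path map

// On the driver: job commit renames the staged files and deletes _temporary-<jobId>.
protocol.commitJob(jobContext, Seq(commitMsg))
```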
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -349,13 +349,15 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { | |
| * Options for writing new data into a table. | ||
| * | ||
| * @param enabled whether to overwrite existing data in the table. | ||
| * @param specificPartition only data in the specified partition will be overwritten. | ||
| * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions | ||
|
Contributor: Can the partition spec be partial? We should document that here.
Contributor (Author): Yep, that is covered in the next sentence.
||
| * that match this partial partition spec. If empty, all partitions | ||
| * will be overwritten. | ||
| */ | ||
| case class OverwriteOptions( | ||
| enabled: Boolean, | ||
| specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) { | ||
| if (specificPartition.isDefined) { | ||
| assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.") | ||
| staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) { | ||
| if (staticPartitionKeys.nonEmpty) { | ||
| assert(enabled, "Overwrite must be enabled when specifying specific partitions.") | ||
| } | ||
| } | ||
|
|
||
|
|
||
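As a concrete example of the new field (the encoding below is an assumption based only on the signature above), a statement with a static partition key carries a partial spec, while a plain overwrite keeps the empty default:

```scala
// Assumes OverwriteOptions lives in org.apache.spark.sql.catalyst.plans.logical and
// that TablePartitionSpec is a Map[String, String] of partition column -> value.
import org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions

// e.g. INSERT OVERWRITE TABLE t PARTITION (p1 = '1') SELECT ...
val partitionedOverwrite = OverwriteOptions(enabled = true, staticPartitionKeys = Map("p1" -> "1"))

// e.g. INSERT OVERWRITE TABLE t SELECT ...  (no static partition keys)
val plainOverwrite = OverwriteOptions(enabled = true)

// Passing staticPartitionKeys with enabled = false would trip the assert above.
```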
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,10 @@ import org.apache.hadoop.fs.Path | |
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql._ | ||
| import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow} | ||
| import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala | ||
| import org.apache.spark.sql.catalyst.analysis._ | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation} | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec | ||
| import org.apache.spark.sql.catalyst.expressions | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
|
|
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} | |
| import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} | ||
| import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, DDLUtils, ExecutedCommandExec} | ||
| import org.apache.spark.sql.execution.command._ | ||
| import org.apache.spark.sql.sources._ | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.unsafe.types.UTF8String | ||
|
|
@@ -182,41 +182,53 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { | |
| "Cannot overwrite a path that is also being read from.") | ||
| } | ||
|
|
||
| val overwritingSinglePartition = | ||
| overwrite.specificPartition.isDefined && | ||
| val partitionSchema = query.resolve( | ||
| t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver) | ||
| val partitionsTrackedByCatalog = | ||
| t.sparkSession.sessionState.conf.manageFilesourcePartitions && | ||
| l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty && | ||
| l.catalogTable.get.tracksPartitionsInCatalog | ||
|
|
||
| val effectiveOutputPath = if (overwritingSinglePartition) { | ||
| val partition = t.sparkSession.sessionState.catalog.getPartition( | ||
| l.catalogTable.get.identifier, overwrite.specificPartition.get) | ||
| new Path(partition.location) | ||
| } else { | ||
| outputPath | ||
| } | ||
|
|
||
| val effectivePartitionSchema = if (overwritingSinglePartition) { | ||
| Nil | ||
| } else { | ||
| query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver) | ||
| var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil | ||
| var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty | ||
|
|
||
| // When partitions are tracked by the catalog, compute all custom partition locations that | ||
| // may be relevant to the insertion job. | ||
| if (partitionsTrackedByCatalog) { | ||
| val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions( | ||
|
Contributor: I'd create a new API in the external catalog API and push this into it. In the future we can look into how to optimize this.
Contributor (Author): You also need the set of matching partitions (including those with default locations) in order to determine which ones to delete at the end of an overwrite call. That makes the optimization quite messy, so I'd rather not push it into the catalog for now.
||
| l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys)) | ||
| initialMatchingPartitions = matchingPartitions.map(_.spec) | ||
| customPartitionLocations = getCustomPartitionLocations( | ||
| t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions) | ||
| } | ||
|
|
||
| // Callback for updating metastore partition metadata after the insertion job completes. | ||
| // TODO(ekl) consider moving this into InsertIntoHadoopFsRelationCommand | ||
| def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { | ||
| if (l.catalogTable.isDefined && updatedPartitions.nonEmpty && | ||
| l.catalogTable.get.partitionColumnNames.nonEmpty && | ||
| l.catalogTable.get.tracksPartitionsInCatalog) { | ||
| val metastoreUpdater = AlterTableAddPartitionCommand( | ||
| l.catalogTable.get.identifier, | ||
| updatedPartitions.map(p => (p, None)), | ||
| ifNotExists = true) | ||
| metastoreUpdater.run(t.sparkSession) | ||
| if (partitionsTrackedByCatalog) { | ||
| val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions | ||
| if (newPartitions.nonEmpty) { | ||
| AlterTableAddPartitionCommand( | ||
| l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)), | ||
| ifNotExists = true).run(t.sparkSession) | ||
| } | ||
| if (overwrite.enabled) { | ||
| val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions | ||
| if (deletedPartitions.nonEmpty) { | ||
| AlterTableDropPartitionCommand( | ||
| l.catalogTable.get.identifier, deletedPartitions.toSeq, | ||
| ifExists = true, purge = true).run(t.sparkSession) | ||
| } | ||
| } | ||
| } | ||
| t.location.refresh() | ||
| } | ||
|
|
||
| val insertCmd = InsertIntoHadoopFsRelationCommand( | ||
| effectiveOutputPath, | ||
| effectivePartitionSchema, | ||
| outputPath, | ||
| if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty, | ||
| customPartitionLocations, | ||
| partitionSchema, | ||
| t.bucketSpec, | ||
| t.fileFormat, | ||
| refreshPartitionsCallback, | ||
|
|
@@ -227,6 +239,34 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { | |
|
|
||
| insertCmd | ||
| } | ||
|
|
||
| /** | ||
| * Given a set of input partitions, returns those that have locations that differ from the | ||
| * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by | ||
| * the user. | ||
| * | ||
| * @return a mapping from partition specs to their custom locations | ||
| */ | ||
| private def getCustomPartitionLocations( | ||
| spark: SparkSession, | ||
| table: CatalogTable, | ||
| basePath: Path, | ||
| partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = { | ||
| val hadoopConf = spark.sessionState.newHadoopConf | ||
| val fs = basePath.getFileSystem(hadoopConf) | ||
| val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory) | ||
| partitions.flatMap { p => | ||
| val defaultLocation = qualifiedBasePath.suffix( | ||
| "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString | ||
| val catalogLocation = new Path(p.location).makeQualified( | ||
| fs.getUri, fs.getWorkingDirectory).toString | ||
| if (catalogLocation != defaultLocation) { | ||
|
Contributor: Why do we distinguish partition locations that equal the default location? Partitions always have locations (either custom-specified or generated by default); do we really need to care about who set them?
Contributor (Author): The only purpose here is to optimize the common case where the directory scheme is followed. Otherwise, we would have to broadcast all the partition locations even when they use the default.
||
| Some(p.spec -> catalogLocation) | ||
| } else { | ||
| None | ||
| } | ||
| }.toMap | ||
| } | ||
| } | ||
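To make the custom-location distinction concrete, a small illustration of the comparison this helper performs; the paths are invented.

```scala
// Invented paths: with base path /warehouse/t, the Hive-default location for the spec
// k1=v1/k2=v2 is derived from the base path, so that partition is skipped; only a
// partition whose catalog location differs is recorded.
val spec = Map("k1" -> "v1", "k2" -> "v2")
val defaultLocation = "/warehouse/t/k1=v1/k2=v2"  // equals the derived default: not recorded
val customLocation = "/data/elsewhere/special"    // differs from the default: recorded
val customPartitionLocations: Map[Map[String, String], String] = Map(spec -> customLocation)
```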
|
|
||
|
|
||
|
|
||
@rxin
Do we want a default implementation? If the protocol doesn't implement this, things will go seriously wrong at runtime, won't they?
Author: Done.
Reviewer: Can you unify this and newTaskTempFile? If we treat the default partition location like a custom location.
Author: I thought about combining it, but I think the method semantics become too subtle then.
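For context on that last exchange, a hedged sketch of how the two entry points differ from the caller's side; the helper below and its arguments are made up.

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical helper contrasting the two methods discussed above.
def stageTwoKindsOfOutput(
    protocol: HadoopMapReduceCommitProtocol,
    taskContext: TaskAttemptContext): (String, String) = {
  // Default-located output: the committer manages the staging path, optionally under a
  // relative partition subdirectory of the table path.
  val managed = protocol.newTaskTempFile(taskContext, Some("k1=v1"), ".parquet")
  // Custom-located output: staged under _temporary-<jobId> and only moved to the given
  // absolute directory at job commit.
  val custom = protocol.newTaskTempFileAbsPath(taskContext, "/custom/location/k1=v1", ".parquet")
  (managed, custom)
}
```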