Conversation

@ericl (Contributor) commented Nov 8, 2016

What changes were proposed in this pull request?

As of current Spark 2.1, INSERT OVERWRITE with dynamic partitions against a Datasource table will overwrite the entire table instead of only the partitions matching the static keys, as Hive does. It also doesn't respect custom partition locations.
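For concreteness, a hypothetical statement of the affected form (table and column names are made up); before this change it would overwrite all of `logs` rather than only the matching partitions under ds='2016-11-08':

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical illustration of the bug: before this PR, the statement below
// overwrote the entire `logs` table instead of only the partitions under the
// static key ds='2016-11-08'. (country is a dynamic partition column here.)
val spark = SparkSession.builder().appName("overwrite-example").getOrCreate()
spark.sql("""
  INSERT OVERWRITE TABLE logs PARTITION (ds = '2016-11-08', country)
  SELECT time, userid, country FROM staged_logs
""")
```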

This PR adds support for all these operations to Datasource tables managed by the Hive metastore. It is implemented as follows:

  • At planning time, the full set of partitions affected by an INSERT or OVERWRITE command is read from the Hive metastore.
  • The planner identifies any partitions with custom locations and includes a map of them in the write task metadata.
  • FileFormatWriter tasks consult this custom-locations map when determining where to write dynamic partition output.
  • When the write job finishes, the set of written partitions is compared against the initial set of matched partitions, and the Hive metastore is updated to reflect the newly added / removed partitions.

It was necessary to add to FileCommitProtocol a method for staging files with absolute output paths; see the sketch below. These files are not handled by the Hadoop output committer but are moved to their final locations when the job commits.
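A minimal sketch of that staging flow, with illustrative names (the real implementation lives in HadoopMapReduceCommitProtocol; only absPathStagingDir and addedAbsPathFiles follow this PR, the rest is assumed for the example):

```scala
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: files bound for absolute locations are written under a job-scoped
// staging dir that the Hadoop committer never sees, then moved on job commit.
class AbsPathStager(jobId: String, tableRoot: Path) {
  // Staged-file path -> final absolute destination, one entry per file.
  private val addedAbsPathFiles = mutable.Map.empty[String, String]

  private def absPathStagingDir: Path = new Path(tableRoot, "_temporary-" + jobId)

  /** Like newTaskTempFile(), but the file must end up at an absolute location. */
  def newTaskTempFileAbsPath(absoluteDir: String, filename: String): String = {
    val stagedPath = new Path(absPathStagingDir, filename).toString
    addedAbsPathFiles(stagedPath) = new Path(absoluteDir, filename).toString
    stagedPath
  }

  /** On job commit, move every staged file to its absolute destination. */
  def commitJob(conf: Configuration): Unit = {
    val fs = absPathStagingDir.getFileSystem(conf)
    for ((staged, dst) <- addedAbsPathFiles) fs.rename(new Path(staged), new Path(dst))
    fs.delete(absPathStagingDir, true)
  }
}
```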

The overwrite behavior of legacy Datasource tables is also changed: no longer will the entire table be overwritten if a partial partition spec is present.

cc @cloud-fan @yhuai

How was this patch tested?

Unit tests, existing tests.

/**
 * Similar to newTaskTempFile(), but allows files to be committed to an absolute output
 * location. Depending on the implementation, there may be weaker guarantees around
 * adding files this way.
 */

Contributor:

Do we want a default implementation? If the protocol doesn't implement this, things will go seriously wrong at runtime, won't they?
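(For illustration, a fail-fast default along these lines would surface the problem at first use; a hypothetical sketch, not necessarily how the PR resolved it:)

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Hypothetical fail-fast default: a protocol that never overrides the new hook
// fails loudly on the first call instead of silently misplacing files at runtime.
abstract class FileCommitProtocolSketch {
  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
    throw new UnsupportedOperationException(
      s"${getClass.getSimpleName} does not support committing files to absolute paths")
  }
}
```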

Contributor Author:

Done

Contributor:

Can you unify this and newTaskTempFile, by treating the default partition location like a custom location?

Contributor Author:

I thought about combining them, but I think the method semantics would become too subtle.

@rxin (Contributor) commented Nov 8, 2016

Can you change the title to indicate that this is a serious bug fix? It sounds like a new feature but in reality it is a serious behavior bug.

val plan =
  InsertIntoHadoopFsRelationCommand(
    outputPath,
    Map.empty,
Contributor:

let's change the call to use named arguments for all the arguments here.
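(A minimal sketch of the style being requested, using a hypothetical case class rather than the real command's signature:)

```scala
// With several positional arguments of similar types, named arguments keep the
// call site self-documenting and resistant to argument-order mistakes.
case class InsertCommandSketch(
    outputPath: String,
    staticPartitionKeys: Map[String, String],
    overwrite: Boolean)

val plan = InsertCommandSketch(
  outputPath = "/warehouse/logs",
  staticPartitionKeys = Map.empty,
  overwrite = true)
```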

Contributor Author:

Done

}
}

test("insert into and overwrite new datasource tables with partial specs and custom locs") {
Contributor:

can we break this into multiple test cases?

Contributor Author:

Done

*/
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null

private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
Contributor:

document this

Contributor Author:

Done

* Tracks files staged by this task for absolute output paths. These outputs are not managed by
* the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
*/
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null
Contributor:

We should document whether the strings are paths to files or just paths to directories. I think they are just directories, right? The naming suggests that they are files.

Contributor Author:

They are files. We need to track the unique output location of each file here in order to know where to place it. We could use directories, but they would end up with one file each anyway.

} else {
None
}
val staticPartitionKeys = partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
Contributor:

I find the _1 and _2s here impossible to read. Can we add an explicit type to val partitionKeys?
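(A minimal sketch of the suggested fix, with illustrative values:)

```scala
// An explicit type on partitionKeys plus pattern matching avoids the opaque
// _1/_2 tuple accessors.
val partitionKeys: Map[String, Option[String]] =
  Map("ds" -> Some("2016-11-08"), "country" -> None)

val staticPartitionKeys: Map[String, String] =
  partitionKeys.collect { case (key, Some(value)) => key -> value }
```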

Contributor Author:

Done

*
* @param enabled whether to overwrite existing data in the table.
* @param specificPartition only data in the specified partition will be overwritten.
* @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
Contributor:

Can the partition spec be partial? We should document that here.

Contributor Author:

Yep, it is covered in the next sentence.

// When partitions are tracked by the catalog, compute all custom partition locations that
// may be relevant to the insertion job.
if (partitionsTrackedByCatalog) {
val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
Contributor:

I'd create a new method in the external catalog API and push this into it. In the future we can look into how to optimize it.

Contributor Author:

You also need the set of matching partitions (including those with default locations) in order to determine which ones to delete at the end of an overwrite call.

This makes the optimization quite messy, so I'd rather not push it to the catalog for now.
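(A sketch of the reconciliation this enables, under illustrative names; not the PR's exact code:)

```scala
// Diff the partitions matched at planning time against the ones actually
// written, then add/drop metastore entries accordingly.
type PartitionSpec = Map[String, String]

def reconcile(
    matchedAtPlanning: Set[PartitionSpec],
    written: Set[PartitionSpec],
    overwrite: Boolean): (Set[PartitionSpec], Set[PartitionSpec]) = {
  val toCreate = written -- matchedAtPlanning
  // On overwrite, matched partitions that received no new data are dropped.
  val toDrop =
    if (overwrite) matchedAtPlanning -- written else Set.empty[PartitionSpec]
  (toCreate, toDrop)
}
```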

@SparkQA commented Nov 8, 2016

Test build #68360 has finished for PR 15814 at commit b5bbb1d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl ericl changed the title [SPARK-18185] Support all forms of INSERT / OVERWRITE TABLE for Datasource tables [SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables Nov 8, 2016
@SparkQA commented Nov 9, 2016

Test build #68385 has finished for PR 15814 at commit fbd7b42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
val catalogLocation = new Path(p.storage.locationUri.get).makeQualified(
fs.getUri, fs.getWorkingDirectory).toString
if (catalogLocation != defaultLocation) {
Contributor:

Why do we distinguish partition locations that equal the default location? Partitions always have locations (custom-specified or generated by default); do we really need to care about who set them?

Contributor Author:

The only purpose here is to optimize the common case where the default directory scheme is followed. Otherwise, we have to broadcast all the partition locations even if they are using the default.
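(A sketch of that optimization, with illustrative names; the snippet above additionally qualifies both paths against the FileSystem before comparing:)

```scala
import org.apache.hadoop.fs.Path

// Only partitions whose catalog location differs from the default
// <table>/k1=v1/k2=v2 layout need to be shipped to the write tasks.
def customLocations(
    tableLocation: Path,
    partitions: Seq[(Seq[(String, String)], String)]): Map[Seq[(String, String)], String] = {
  partitions.filter { case (spec, catalogLocation) =>
    val fragment = spec.map { case (k, v) => s"$k=$v" }.mkString("/")
    catalogLocation != new Path(tableLocation, fragment).toString
  }.toMap
}
```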

@SparkQA commented Nov 9, 2016

Test build #68387 has finished for PR 15814 at commit 4296612.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 9, 2016

Test build #68388 has finished for PR 15814 at commit 91f87de.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkHadoopMapRedUtil.commitTask(
  committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
EmptyTaskCommitMessage
new TaskCommitMessage(addedAbsPathFiles.toMap)
Member:

Why don't we just rename temp files to dest files in commitTask?

Contributor:

Yea, it can go either way; it's unclear which one is better. Renaming on job commit gives a higher chance of corrupting data, whereas renaming in task commit is slightly more performant.

Member:

I'd prefer renaming in task commit.
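(A minimal sketch of the task-commit variant, with illustrative names; note the snippet above instead ships the map back via TaskCommitMessage, leaving the renames to job commit:)

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Each task moves its own staged files before acknowledging, so the driver has
// nothing to rename later.
def commitTaskRenames(fs: FileSystem, addedAbsPathFiles: Map[String, String]): Unit = {
  addedAbsPathFiles.foreach { case (staged, finalDst) =>
    val dst = new Path(finalDst)
    fs.mkdirs(dst.getParent) // ensure the absolute destination directory exists
    if (!fs.rename(new Path(staged), dst)) {
      throw new IOException(s"Failed to rename $staged to $dst")
    }
  }
}
```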

}
// first clear the path determined by the static partition keys (e.g. /table/foo=1)
val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
Member:

Do we want the behavior to delete the partitions matching the static prefix?

According to Hive manual at https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-Dynamic-PartitionInsert:

FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.country

When there are already non-empty partitions existing for the dynamic partition columns (for example, country='CA' exists under some ds root partition), it will be overwritten if the dynamic partition insert saw the same value (say 'CA') in the input data. This is in line with the 'insert overwrite' semantics. However, if the partition value 'CA' does not appear in the input data, the existing partition will not be overwritten.

Thus when the static prefix is dt=2008-06-08, we should only overwrite the partitions with country=CA if the input data only contains country=CA. If we delete all partitions belonging to the static prefix, we delete other partitions such as country=US, country=UK, etc. too.

Do I understand it correctly here?

Contributor Author:

Oh rats. These semantics might be a little difficult to implement, but perhaps we can use the loadDynamicPartitions call here after writing to a dummy location. I'll take a look tomorrow.

Contributor Author:

Just discussed this with @yhuai and it might be ok to keep this behavior if we carefully document it. Implementing the Hive behavior with the file commit protocol is tricky and might be too much to get into 2.1.

cc @rxin

Contributor:

I personally find the hive behavior pretty weird.

Member:

OK for me if we document it.

@SparkQA commented Nov 9, 2016

Test build #68423 has finished for PR 15814 at commit 81bb266.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaInteractionExample
    • class GBTClassifierWrapperWriter(instance: GBTClassifierWrapper)
    • class GBTClassifierWrapperReader extends MLReader[GBTClassifierWrapper]
    • class GBTRegressorWrapperWriter(instance: GBTRegressorWrapper)
    • class GBTRegressorWrapperReader extends MLReader[GBTRegressorWrapper]

@ericl (Contributor Author) commented Nov 10, 2016

I've temporarily merged in the changes from #15797 to see if the tests will pass.

@SparkQA commented Nov 10, 2016

Test build #68431 has finished for PR 15814 at commit 51c1322.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@SparkQA commented Nov 11, 2016

Test build #68492 has finished for PR 15814 at commit 63f7f2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Nov 11, 2016

Merging in master/branch-2.1. Thanks.

asfgit pushed a commit that referenced this pull request Nov 11, 2016

[SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables

(The commit message repeats the PR description above.)

Author: Eric Liang <[email protected]>
Author: Wenchen Fan <[email protected]>

Closes #15814 from ericl/sc-5027.

(cherry picked from commit a335634)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in a335634 Nov 11, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

[SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables

(The commit message repeats the PR description above.)

Author: Eric Liang <[email protected]>
Author: Wenchen Fan <[email protected]>

Closes apache#15814 from ericl/sc-5027.