[SPARK-32512][SQL] add alter table add/drop partition command for datasourcev2 #29339
Conversation
```scala
val partParams = new java.util.HashMap[String, String](table.properties())
location.foreach(locationUri =>
  partParams.put("location", locationUri))
partParams.put("ignoreIfExists", ignoreIfExists.toString)
```
Why is this added to the partition parameters? I think Spark should handle this by ignoring `PartitionAlreadyExistsException` like we do in other cases.
Fine, I'll change it.
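To make the suggested fix concrete, here is a minimal sketch (an illustration, not the PR's final code) of handling IF NOT EXISTS in the exec node instead of passing an `ignoreIfExists` entry through the partition parameters; `table`, `partIdent`, and `properties` are assumed to be the exec node's fields:

```scala
import scala.collection.JavaConverters._

// Keep the flag in Spark: an existing partition is only an error
// when the user did not ask for IF NOT EXISTS.
try {
  table.createPartition(partIdent, properties.asJava)
} catch {
  case _: PartitionAlreadyExistsException if ignoreIfExists =>
    // ADD PARTITION IF NOT EXISTS: silently skip partitions that already exist.
}
```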
```scala
if (conflictKeys.nonEmpty) {
  throw new AnalysisException(
    s"Partition key ${conflictKeys.mkString(",")} " +
    s"not exists in ${ident.namespace().quoted}.${ident.name()}")
```
Nit: indentation doesn't match between these lines.
```scala
def convertPartitionIndentifers(
    partSpec: TablePartitionSpec,
    partSchema: StructType): InternalRow = {
```
Why is this included with the implicits when it isn't an implicit class?
Hm, it looks a little ugly to me if it's defined with the implicits. I can change it if you think it's better with implicits.
I don't think it needs to be implicit. I just don't think it belongs in the implicits class if it isn't an implicit. I think there is a util class you could include this in.
Ah, I misunderstood. Thanks.
```scala
val partValues = partSchema.map { part =>
  part.dataType match {
    case _: ByteType =>
      partSpec.getOrElse(part.name, "0").toByte
```
Conversion to InternalRow should not modify the partition values by filling in defaults. Filling in a default like this is a correctness bug.
I think this should require that all partition names are present in the map, and pass null if a name is present but does not have a value. If the partition doesn't allow null partition values, then it should throw an exception.
Sure, sounds reasonable to me.
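A sketch of the stricter conversion agreed on here, assuming `TablePartitionSpec` is the usual `Map[String, String]` alias; the cast-based conversion and the error message are illustrative rather than the code that was merged:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StructType

def convertPartitionIdentifiers(
    partSpec: Map[String, String],
    partSchema: StructType): InternalRow = {
  val partValues = partSchema.map { field =>
    partSpec.get(field.name) match {
      case Some(value) =>
        // Cast the raw string value to the partition column's data type.
        Cast(Literal(value), field.dataType).eval()
      case None =>
        // Require every partition column to be present; no silent defaults.
        throw new AnalysisException(
          s"Partition spec is missing a value for partition column '${field.name}'")
    }
  }
  InternalRow.fromSeq(partValues)
}
```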
```scala
case class AlterTableAddPartitionExec(
    catalog: TableCatalog,
    ident: Identifier,
```
Why not pass a `Table` instance like other plans that modify table data (e.g., `AppendDataExec`)?
We generally like to load tables early, so that we can do as much validation as possible in the analyzer and planner. By loading the table before passing it here, we would be able to use analyzer rules to validate the partition specs against the table's partition schema, and to make sure the table implements `SupportsPartitions`.
Sure, I will change it.
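Loading the table early means the analyzer can fail fast; a hypothetical analyzer-side check (the helper name and message are assumptions) might look like:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, Table}

// Run after the table has been resolved: reject tables that cannot
// manage partitions before we ever get to planning.
def checkPartitionSupport(table: Table): SupportsPartitionManagement =
  table match {
    case t: SupportsPartitionManagement => t
    case _ =>
      throw new AnalysisException(
        s"Table ${table.name} does not support partition management")
  }
```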
```scala
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean) extends V2CommandExec {
```
We should think about how to deal with `purge` and `retainData`. Shall we just put them into the partition properties, or into the `dropPartition` parameters?
These two configurations seem to be mainly used by Hive tables. Besides, `retainData` is always false, and `purge` only works in some versions.
Maybe put them into table properties? AFAICT, it is the table that defines these operations.
I have added a warning for `purge`. Does this look fine to you? @cloud-fan
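A sketch of that compromise: the exec node keeps the flag but only logs, since the v2 partition API has no purge notion (the wording is assumed, not quoted from the PR):

```scala
// Inside a hypothetical AlterTableDropPartitionExec.run(), where the node
// mixes in Spark's Logging trait: surface PURGE instead of silently dropping it.
if (purge) {
  logWarning("Option PURGE is ignored: the v2 partition API cannot purge data")
}
```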
cc @rdblue and @cloud-fan
```scala
override protected def run(): Seq[InternalRow] = {
  partitions.foreach { case (partIdent, properties) =>
    try {
      table.createPartition(partIdent, properties.asJava)
```
The SQL command can add multiple partitions at once, and ideally it should be atomic.
I have two thoughts:
- for v2, we don't allow the SQL command to add more than one partition at once.
- the v2 API should be `createPartitions`, which takes an array of partitions.

Which one do you prefer? The same problem applies to drop partitions as well. @rdblue @stczwd
FYI, the Hive catalog API is `createPartitions`, which can create a list of partitions at once. Personally I prefer option 2: other catalog implementations can fail on more than one partition if they can't do it in an atomic way.
Hm. Supporting `createPartitions` means that `Table` needs to support atomic partition operations: once there is a problem in the middle of the operation, such as a partition that already exists, it must roll back to the state before the operations.
Hive does support an atomic `createPartitions`; I don't know if others support this.
I think that a variant of `createPartition` that works for multiple partitions should be added as an optional trait, or using optional methods in the existing interface.
We want sources to have predictable, standard behavior. I think that means that when adding a single partition in a group fails, the ones that were already successful should be rolled back. That way multiple calls to `createPartition` have the same result as a single call to `createPartitions` and the SQL statement has well-defined and reliable behavior.
If we agree on that behavior, then I think it is clear that Spark should handle calling either `createPartition` multiple times or calling `createPartitions`, because that's the best way to get reliable behavior while keeping table implementations simple -- those that don't support an atomic operation just implement `createPartition`.
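A sketch of that rollback behavior, analogous to non-atomic CTAS cleanup; it is best-effort only, and it reuses the exec node's assumed fields (`table`, `partitions`):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Non-atomic path: create partitions one by one and, on failure, try to
// drop the ones already created so the command appears all-or-nothing.
val created = ArrayBuffer.empty[InternalRow]
try {
  partitions.foreach { case (partIdent, properties) =>
    table.createPartition(partIdent, properties.asJava)
    created += partIdent
  }
} catch {
  case e: Throwable =>
    created.foreach(table.dropPartition) // best-effort rollback
    throw e
}
```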
> That way multiple calls to `createPartition` have the same result as a single call to `createPartitions` and the SQL statement has well-defined and reliable behavior.

It is hard to support an atomic operation with multiple calls to `createPartition`, as the program may stop in the middle of the operations.
How about this: we create a new optional trait `SupportsAtomicPartitions` to support atomic operations on multiple partitions, and for those that don't support it, we throw `UnsupportedOperationException` if the user tries to operate on multiple partitions at the same time.
Rolling back partition changes is like what we do with CTAS. If the write fails for non-atomic CTAS, we drop the table that was created. That won't always work, but at least the expectation is that the commands have the same behavior.
I'm okay with failing ADD PARTITION commands that have multiple partitions if atomic create/drop is not supported as well. That seems like another reasonable way to go. The important thing to me is that the commands have the same stated behavior across sources.
> The important thing to me is that the commands have the same stated behavior across sources.

Yes, agree with that.

> We create a new optional trait `SupportsAtomicPartitions` to support atomic operations on multiple partitions, and for those that don't support it, we throw `UnsupportedOperationException` if the user tries to operate on multiple partitions at the same time.

I still prefer this. @cloud-fan does this look good to you?
`SupportsAtomicPartitions` SGTM, but we need a better name. How about `SupportsPartitionManagement` and `SupportsAtomicPartitionManagement`?
It's OK with me. I'll change it, thanks.
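For reference, roughly how the agreed split looks. The real interfaces belong to the partition API of #28617 and are Java, so treat these Scala signatures as an approximation:

```scala
import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.Table

// Base capability: single-partition operations, no atomicity guarantee.
trait SupportsPartitionManagement extends Table {
  def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit
  def dropPartition(ident: InternalRow): Boolean
}

// Atomic extension: either every partition is created/dropped, or none is.
trait SupportsAtomicPartitionManagement extends SupportsPartitionManagement {
  def createPartitions(
      idents: Array[InternalRow],
      properties: Array[util.Map[String, String]]): Unit
  def dropPartitions(idents: Array[InternalRow]): Boolean
}
```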
```scala
case AlterTableDropPartitionStatement(tbl, specs, ifExists, purge, retainData) =>
  val v1TableName = parseV1Table(tbl, "ALTER TABLE DROP PARTITION")
case AlterTableDropPartition(
    r @ ResolvedTable(_, _, _: V1Table), specs, ifExists, purge, retainData)
```
ditto
done
LGTM except for some minor comments
```scala
failAnalysis(s"Table ${table.name()} can not alter partitions.")
```

```scala
// Skip atomic partition tables
case (_: SupportsAtomicPartitionManagement, _) =>
```
Not related to this PR: I'm wondering if we do need this separation. Do we have a concern that it's hard for implementations to add/drop multiple partitions atomically?
Hm, it depends on whether the third-party system or storage supports transactions. MySQL and Hive can support this very well.
As an example, `TableCatalog.alterTable` accepts a list of `TableChange` without adding a new atomic API. I don't know why the partition API needs to be different.
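For comparison, the existing shape being referenced — `TableCatalog.alterTable` batches changes through a single call (shown here as a Scala approximation of the Java interface):

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableChange}

trait ExampleCatalog {
  // A batch of changes goes through one call; how atomically they are
  // applied is left entirely to the implementation.
  def alterTable(ident: Identifier, changes: TableChange*): Table
}
```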
GA passed, merging to master, thanks!
@cloud-fan @rdblue @MaxGekk

I removed duplicate tests from `DataSourceV2SQLSuite` in #30444:
…TABLE .. PARTITIONS from DataSourceV2SQLSuite

### What changes were proposed in this pull request?

Remove tests from `DataSourceV2SQLSuite` that were copied to `AlterTablePartitionV2SQLSuite` by #29339.

### Why are the changes needed?
- To reduce test execution time
- To improve test maintenance

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

By running the modified tests:

```
$ build/sbt "test:testOnly *DataSourceV2SQLSuite"
$ build/sbt "test:testOnly *AlterTablePartitionV2SQLSuite"
```

Closes #30444 from MaxGekk/dedup-tests-AlterTablePartitionV2SQLSuite.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?

This patch adds `AlterTableAddPartitionExec` and `AlterTableDropPartitionExec` with the new table partition API defined in #28617.

Does this PR introduce any user-facing change?

Yes. Users can run `ALTER TABLE ... ADD PARTITION` or `ALTER TABLE ... DROP PARTITION` to create or drop partitions in a v2 table.

How was this patch tested?

Ran the suites and fixed old tests.
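For illustration, the new commands as a user would issue them (the `testcat` catalog and table names are assumed; the catalog's tables must implement the partition-management API):

```scala
// Assumes a SparkSession with a v2 catalog registered as `testcat`.
spark.sql("ALTER TABLE testcat.ns.tbl ADD PARTITION (dt = '2020-08-04')")
spark.sql("ALTER TABLE testcat.ns.tbl DROP PARTITION (dt = '2020-08-04')")
```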