[SPARK-27899][SQL] Make HiveMetastoreClient.getTableObjectsByName available in ExternalCatalog/SessionCatalog API #24774
Hi @wangyum @gatorsmile, could you review this when you have a chance?
Thank you @LantaoJin for doing that!
What if the names come from different databases? For example:
Seq("db1.table1", "db2.table1")
I have considered this case. Could this happen in real use cases? Either way, good catch; it should be handled in code. How about throwing an exception?
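To make the proposal concrete, here is a minimal sketch of a same-database check that throws when the requested names span more than one database. `TableName`, `SameDatabaseCheck`, and the use of `IllegalArgumentException` are hypothetical stand-ins for illustration only; they are not Spark's `TableIdentifier` or `AnalysisException`, and the actual patch may differ.

```scala
// Hypothetical stand-in for a table identifier; NOT Spark's TableIdentifier.
final case class TableName(table: String, database: Option[String] = None)

object SameDatabaseCheck {
  /**
   * Returns the single database shared by all names (falling back to
   * currentDb for unqualified names), or throws if they span several.
   * An empty input is accepted and resolves to currentDb.
   */
  def requireSingleDatabase(names: Seq[TableName], currentDb: String): String = {
    val dbs = names.map(_.database.getOrElse(currentDb)).distinct
    if (dbs.size > 1) {
      // List the offending input in the message so the caller can see it.
      val listed = names
        .map(n => n.database.getOrElse(currentDb) + "." + n.table)
        .mkString(", ")
      throw new IllegalArgumentException(
        s"Only tables/views in the same database can be retrieved; got: $listed")
    }
    dbs.headOption.getOrElse(currentDb)
  }
}
```

With this sketch, `Seq(TableName("table1", Some("db1")), TableName("table1", Some("db2")))` is rejected, while names that all resolve to one database pass through.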
Is this change necessary?
This view was not cleaned up when the UT failed in my environment, so I think adding this could stabilize the test case.
Weird. If the view is not dropped, why do the tables not exist in your environment?
@LantaoJin Could you post some benchmark results?
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
Could you submit a refactor PR first? This can reduce the code changes made by this PR.
I would like to. Do I need to create a new issue?
Nope. You can reuse this issue.
See #24803
This is just a special case. We need more tests to ensure that all the Hive metastore versions support it correctly. Negative cases are also needed. :-)
Sorry, what do you mean? Hasn't it been covered from 0.12 to 3.1?
This only covers a single table. How about multiple tables? Or a mix with non-existent tables? Illegal table names? An empty seq? And so on. :-)
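The cases listed above can be sketched against a toy in-memory catalog. `TableMeta` and `ToyCatalog` are hypothetical and are not Spark's `InMemoryCatalog` or any Hive client. The sketch assumes that names which do not match an existing table (including malformed ones) are silently skipped; whether the real implementation skips or rejects them is exactly what the requested tests should pin down.

```scala
// Hypothetical in-memory model used only to illustrate the test cases.
final case class TableMeta(database: String, name: String)

final class ToyCatalog(tables: Map[String, Set[String]]) {
  /** Bulk lookup: returns metadata for the names that exist, in request order. */
  def getTablesByName(db: String, names: Seq[String]): Seq[TableMeta] = {
    val existing = tables.getOrElse(
      db, throw new NoSuchElementException(s"Database '$db' not found"))
    // Assumption: unknown or malformed names are skipped rather than rejected.
    names.filter(n => existing.contains(n)).map(TableMeta(db, _))
  }
}

object ToyCatalogCases {
  val catalog = new ToyCatalog(Map("db2" -> Set("tbl1", "tbl2")))

  // Multiple existing tables.
  val multiple: Seq[String] =
    catalog.getTablesByName("db2", Seq("tbl1", "tbl2")).map(_.name)
  // Existing tables mixed with a non-existent one.
  val mixed: Seq[String] =
    catalog.getTablesByName("db2", Seq("tbl1", "tblnotexist")).map(_.name)
  // A name that is not a legal identifier in this toy model.
  val withInvalid: Seq[String] =
    catalog.getTablesByName("db2", Seq("tbl1", "tbl 1")).map(_.name)
  // An empty request.
  val empty: Seq[TableMeta] = catalog.getTablesByName("db2", Seq.empty)
}
```

Each value in `ToyCatalogCases` corresponds to one of the scenarios raised in the review, so each could become its own test in the real suites.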
Thank you for your fast work! Will review the details after you address the above comments.
ok to test
Test build #106115 has finished for PR 24774 at commit
Test build #106184 has finished for PR 24774 at commit
Test build #106199 has finished for PR 24774 at commit
Let us list the input names in the error message.
…ilable in ExternalCatalog/SessionCatalog API
After rebasing onto master, the commit history contained many unrelated commits. I had to create a new local branch, cherry-pick the commits from the old local branch, and then force-push it to the old remote branch to clean them up.
Looks good now
gatorsmile left a comment
Thanks for your work!
one same database -> the same database.
```scala
val dbs = names.map(_.database.getOrElse(getCurrentDatabase))
if (dbs.distinct.size != 1) {
  val tables = names.map(name => formatTableName(name.table))
  dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}
```
?
```scala
  dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}
  throw new AnalysisException(
    s"Only the tables/views belong to one same database can be retrieved. Querying " +
    s"tables/views are ${dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}}"
```
This should be replaced by a variable name.
```scala
}

test("get tables by name when some tables do not exists") {
  assert(newBasicCatalog().getTablesByName("db2", Seq("tbl1", "tblnotexist"))
```
See validNameFormat. Add a test case when the seq of table names contains the invalid name.
added
```scala
/** Returns the metadata for the specified table or None if it doesn't exist. */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]
```
Add a comment to describe the function?
added
```scala
}

test(s"$version: getTablesByName when multiple tables") {
  assert(client.getTablesByName("default", Seq("src", "temporary"))
```
Also try the invalid names here.
added
```scala
// of type "array<string>". This happens when the table is created using
// an earlier version of Hive.
if (classOf[MetadataTypedColumnsetSerDe].getName
  == tTable.getSd.getSerdeInfo.getSerializationLib
```
4-space indent
== needs to be moved to the line 1139
```scala
if (!(HiveTableType.VIRTUAL_VIEW.toString == tTable.getTableType)) {
  // Fix the non-printable chars
  val parameters: JMap[String, String] = tTable.getSd.getParameters
  val sf: String = parameters.get(serdeConstants.SERIALIZATION_FORMAT)
```
Do we need to check null here?
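On the null question: `java.util.Map.get` returns `null` for a missing key, so one defensive option is to wrap the lookup in `Option`. The sketch below is illustrative only; `NullSafeLookup` and its `SerializationFormat` constant are hypothetical stand-ins for the Hive `serdeConstants` key, and it does not claim to be what the patch ended up doing.

```scala
import java.util.{Map => JMap}

object NullSafeLookup {
  // Stand-in for the serde constant; assumed here for illustration only.
  val SerializationFormat = "serialization.format"

  /**
   * Returns the serialization format if present. Both a null map and a
   * missing key collapse to None instead of surfacing as a null String.
   */
  def serializationFormat(parameters: JMap[String, String]): Option[String] =
    Option(parameters).flatMap(p => Option(p.get(SerializationFormat)))
}
```

A caller can then write `serializationFormat(params).foreach { sf => ... }` and skip the fix-up entirely when the key is absent.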
Test build #106225 has finished for PR 24774 at commit
Test build #106228 has finished for PR 24774 at commit
I did a simple benchmark in our production environment (the default database has …):
```shell
# Quote the heredoc delimiter so the shell does not expand $defaultTimeToken etc.
cat <<'EOF' > SPARK-27899.scala
// Runs func twice and returns the total elapsed time in milliseconds.
def benchmark(func: () => Unit): Long = {
  val start = System.currentTimeMillis()
  for (_ <- 0 until 2) { func() }
  val end = System.currentTimeMillis()
  end - start
}

// Baseline: one getTableMetadata call per table.
def default(): Unit = {
  val list = new java.util.ArrayList[Array[AnyRef]]()
  val catalog = spark.sessionState.catalog
  catalog.listTables("default").foreach { tableIdentifier =>
    val catalogTable = catalog.getTableMetadata(tableIdentifier)
    val rowData = Array[AnyRef](
      "",
      catalogTable.database,
      catalogTable.identifier.table,
      catalogTable.tableType,
      catalogTable.comment.getOrElse(""))
    list.add(rowData)
  }
}

// This PR: a single bulk getTablesByName call for all tables.
def spark_27899(): Unit = {
  val list = new java.util.ArrayList[Array[AnyRef]]()
  val catalog = spark.sessionState.catalog
  catalog.getTablesByName(catalog.listTables("default")).foreach { catalogTable =>
    val rowData = Array[AnyRef](
      "",
      catalogTable.database,
      catalogTable.identifier.table,
      catalogTable.tableType,
      catalogTable.comment.getOrElse(""))
    list.add(rowData)
  }
}

val defaultTimeToken = benchmark(() => default())
val spark27899TimeToken = benchmark(() => spark_27899())
println(s"Default time token: $defaultTimeToken")
println(s"SPARK-27899 time token: $spark27899TimeToken")
EOF
```
Benchmark result:
#24774 (comment)
Test build #106349 has finished for PR 24774 at commit
LGTM Thanks! Merged to master. cc @juliuszsompolski
## What changes were proposed in this pull request?
This is a part of apache#24774, split out to reduce the code changes made by that PR.
## How was this patch tested?
Existing UTs.
Closes apache#24803 from LantaoJin/SPARK-27899_refactor.
Authored-by: LantaoJin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
…ilable in ExternalCatalog/SessionCatalog API
## What changes were proposed in this pull request?
The new Spark ThriftServer SparkGetTablesOperation implemented in apache#22794 does a catalog.getTableMetadata request for every table. This can get very slow for large schemas (~50ms per table with an external Hive metastore).
Hive ThriftServer GetTablesOperation uses HiveMetastoreClient.getTableObjectsByName to get table information in bulk, but we don't expose that through our APIs that go through Hive -> HiveClientImpl (HiveClient) -> HiveExternalCatalog (ExternalCatalog) -> SessionCatalog.
If we added and exposed getTableObjectsByName through our catalog APIs, we could resolve that performance problem in SparkGetTablesOperation.
## How was this patch tested?
Add UT
Closes apache#24774 from LantaoJin/SPARK-27899.
Authored-by: LantaoJin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
What changes were proposed in this pull request?
The new Spark ThriftServer SparkGetTablesOperation implemented in #22794 does a catalog.getTableMetadata request for every table. This can get very slow for large schemas (~50ms per table with an external Hive metastore).
Hive ThriftServer GetTablesOperation uses HiveMetastoreClient.getTableObjectsByName to get table information in bulk, but we don't expose that through our APIs that go through Hive -> HiveClientImpl (HiveClient) -> HiveExternalCatalog (ExternalCatalog) -> SessionCatalog.
If we added and exposed getTableObjectsByName through our catalog APIs, we could resolve that performance problem in SparkGetTablesOperation.
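The cost difference described here comes down to the number of metastore round trips. The following toy model counts calls for per-table lookups versus one bulk lookup. `CountingMetastore` and `RoundTripDemo` are hypothetical and do not use the Hive or Spark APIs; the sketch only illustrates the call-count argument, not measured latency.

```scala
// Hypothetical metastore stub that counts calls; NOT the Hive client API.
final class CountingMetastore(tables: Map[String, String]) {
  var roundTrips: Int = 0

  /** One round trip per table. */
  def getTable(name: String): Option[String] = {
    roundTrips += 1
    tables.get(name)
  }

  /** One round trip for the whole batch. */
  def getTablesByName(names: Seq[String]): Seq[String] = {
    roundTrips += 1
    names.flatMap(n => tables.get(n))
  }
}

object RoundTripDemo {
  private val tables: Map[String, String] =
    (1 to 100).map(i => s"t$i" -> s"meta$i").toMap
  private val names: Seq[String] = tables.keys.toSeq

  /** Round trips needed when each table is fetched individually. */
  val perTableRoundTrips: Int = {
    val m = new CountingMetastore(tables)
    names.foreach(n => m.getTable(n))
    m.roundTrips
  }

  /** Round trips needed when all tables are fetched in one bulk call. */
  val bulkRoundTrips: Int = {
    val m = new CountingMetastore(tables)
    m.getTablesByName(names)
    m.roundTrips
  }
}
```

In this model 100 tables cost 100 calls one way and a single call the other. At the roughly 50 ms per table quoted above, the per-table path would spend about 5 seconds on lookups alone for such a schema.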
How was this patch tested?
Add UT