Commit f834246

Move wrapping to HiveClientImpl

1 parent e7685a0
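This change centralizes the "partition already exists" handling in one place: HiveClientImpl.createPartitions now unwraps the InvocationTargetException produced by the reflective shim call, matches a HiveException whose cause is the metastore's AlreadyExistsException, and rethrows it as Spark's PartitionsAlreadyExistException. HiveExternalCatalog.createPartitions drops its depth-bounded cause-chain inspection and delegates to the client directly, and VersionsSuite gains a test that exercises the new exception against each supported Hive version.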

File tree

3 files changed: +37 -24 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 2 additions & 21 deletions
@@ -27,7 +27,6 @@ import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.DDL_TIME
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
@@ -37,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
@@ -982,19 +981,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
   }

-  private def isAlreadyExistsException(e: Throwable): Boolean = {
-    val maxDepth = 4
-    var depth = 0
-    var cause = e
-    var found = false
-    while (!found && depth < maxDepth && cause != null) {
-      found = cause.getClass.getCanonicalName == classOf[AlreadyExistsException].getCanonicalName
-      cause = cause.getCause
-      depth += 1
-    }
-    found
-  }
-
   override def createPartitions(
       db: String,
       table: String,
@@ -1016,12 +1002,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    try {
-      client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
-    } catch {
-      case e: Throwable if isAlreadyExistsException(e) =>
-        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
-    }
+    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }

   override def dropPartitions(
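The removed isAlreadyExistsException helper walked up to four levels of the cause chain and compared canonical class names rather than using isInstanceOf, presumably so the check would still match when the metastore exception class was loaded by a different classloader (as with Spark's isolated Hive client). A minimal plain-Scala sketch of that technique, with illustrative names and no Spark dependencies:

import scala.annotation.tailrec

object CauseChainSketch {
  // Walk at most maxDepth causes; match by canonical class name so that a
  // same-named class from another classloader still counts as a match.
  @tailrec
  def hasCauseNamed(e: Throwable, className: String, maxDepth: Int = 4): Boolean =
    if (e == null || maxDepth <= 0) false
    else if (e.getClass.getCanonicalName == className) true
    else hasCauseNamed(e.getCause, className, maxDepth - 1)
}

The old call site amounted to hasCauseNamed(e, classOf[AlreadyExistsException].getCanonicalName); the commit replaces this depth-bounded scan with a single-level cause check in HiveClientImpl, which sits close enough to the metastore call to know the exact wrapping.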

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 13 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client

 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -48,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -598,7 +599,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }

   override def dropPartitions(
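The createPartitions shim calls into Hive reflectively, and any exception a reflectively invoked method throws reaches the caller wrapped in java.lang.reflect.InvocationTargetException; hence the first catch arm unwraps getCause before delegating to replaceExistException. A self-contained demonstration of that wrapping (illustrative names, no Spark or Hive dependencies):

import java.lang.reflect.InvocationTargetException

object ReflectionWrappingDemo {
  class Thrower {
    def boom(): Unit = throw new IllegalStateException("already exists")
  }

  def main(args: Array[String]): Unit = {
    val method = classOf[Thrower].getMethod("boom")
    try {
      method.invoke(new Thrower)  // reflective call, like the version shims make
    } catch {
      // The original exception is reachable only through getCause.
      case e: InvocationTargetException =>
        assert(e.getCause.isInstanceOf[IllegalStateException])
        println(s"unwrapped: ${e.getCause.getMessage}")
    }
  }
}

The second catch arm (case e: Throwable) handles exceptions that arrive unwrapped, so both paths funnel into the same translation.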

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 22 additions & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -594,6 +594,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }

+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
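ScalaTest's intercept both fails the test if the expected exception type is not thrown and returns the caught instance, which is what lets the new test inspect getMessage; the try/finally ensures the partition is dropped even when an assertion fails, keeping later per-version iterations of the suite independent. A stripped-down sketch of the intercept pattern (hypothetical standalone test, assuming a ScalaTest 3.1+ style import rather than Spark's SparkFunSuite):

import org.scalatest.funsuite.AnyFunSuite

class InterceptPatternSketch extends AnyFunSuite {
  test("intercept returns the thrown exception for inspection") {
    val errMsg = intercept[IllegalArgumentException] {
      // require(false, msg) throws IllegalArgumentException("requirement failed: " + msg)
      require(false, "partitions already exists")
    }.getMessage
    assert(errMsg.contains("partitions already exists"))
  }
}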
