Skip to content

Commit e7685a0

Browse files
committed
Throw PartitionsAlreadyExistException
1 parent e100834 commit e7685a0

File tree

1 file changed

+21
-2
lines changed

1 file changed

+21
-2
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
2727

2828
import org.apache.hadoop.conf.Configuration
2929
import org.apache.hadoop.fs.{FileSystem, Path}
30+
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
3031
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.DDL_TIME
3132
import org.apache.hadoop.hive.ql.metadata.HiveException
3233
import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
@@ -36,7 +37,7 @@ import org.apache.spark.{SparkConf, SparkException}
3637
import org.apache.spark.internal.Logging
3738
import org.apache.spark.sql.AnalysisException
3839
import org.apache.spark.sql.catalyst.TableIdentifier
39-
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
40+
import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, TableAlreadyExistsException}
4041
import org.apache.spark.sql.catalyst.catalog._
4142
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
4243
import org.apache.spark.sql.catalyst.expressions._
@@ -981,6 +982,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
981982
spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
982983
}
983984

985+
private def isAlreadyExistsException(e: Throwable): Boolean = {
986+
val maxDepth = 4
987+
var depth = 0
988+
var cause = e
989+
var found = false
990+
while (!found && depth < maxDepth && cause != null) {
991+
found = cause.getClass.getCanonicalName == classOf[AlreadyExistsException].getCanonicalName
992+
cause = cause.getCause
993+
depth += 1
994+
}
995+
found
996+
}
997+
984998
override def createPartitions(
985999
db: String,
9861000
table: String,
@@ -1002,7 +1016,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
10021016
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
10031017
}
10041018
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
1005-
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
1019+
try {
1020+
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
1021+
} catch {
1022+
case e: Throwable if isAlreadyExistsException(e) =>
1023+
throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
1024+
}
10061025
}
10071026

10081027
override def dropPartitions(

0 commit comments

Comments
 (0)