
Commit 143927a

Simplify code.
1 parent cc1d472

File tree

1 file changed (+15 −32 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 15 additions & 32 deletions
@@ -69,22 +69,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
-        .orElse {
-          // If spark.sql.sources.schema is not defined, we either splitted the schema to multiple
-          // parts or the schema was not defined. To determine if the schema was defined,
-          // we check spark.sql.sources.schema.numOfParts.
-          Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
-            case Some(numOfParts) =>
-              val parts = (0 until numOfParts.toInt).map { index =>
-                Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
-                  .getOrElse("Could not read schema from the metastore because it is corrupted.")
-              }
-              // Stick all parts back to a single schema string in the JSON representation.
-              Some(parts.mkString)
-            case None => None // The schema was not defined.
+      val schemaString = Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+        case Some(numOfParts) =>
+          val parts = (0 until numOfParts.toInt).map { index =>
+            Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+              .getOrElse("Could not read schema from the metastore because it is corrupted.")
           }
-        }
+          // Stick all parts back to a single schema string in the JSON representation.
+          Some(parts.mkString)
+        case None => None // The schema was not defined.
+      }
 
       val userSpecifiedSchema =
         schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
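
After this hunk, the read path keys everything off spark.sql.sources.schema.numOfParts and no longer falls back to the legacy single spark.sql.sources.schema property. A minimal sketch of the reassembly logic, with a plain Map[String, String] standing in for the Hive table's properties and a hypothetical helper name (the property keys are the real ones from the diff):

// Hypothetical stand-in for the read path; the real code calls
// table.getProperty on a Hive metastore Table object.
def readSchemaString(props: Map[String, String]): Option[String] =
  props.get("spark.sql.sources.schema.numOfParts").map { numOfParts =>
    (0 until numOfParts.toInt)
      .map { index =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$index",
          "Could not read schema from the metastore because it is corrupted.")
      }
      .mkString // concatenate the chunks back into one JSON string
  }

// Example: a schema JSON split across two parts is stitched back together.
val props = Map(
  "spark.sql.sources.schema.numOfParts" -> "2",
  "spark.sql.sources.schema.part.0" -> """{"type":"struct","fi""",
  "spark.sql.sources.schema.part.1" -> """elds":[]}""")
readSchemaString(props) // Some({"type":"struct","fields":[]})

The empty-separator mkString matters here: the parts are raw slices of a single JSON document, so any separator would corrupt the string handed to DataType.fromJson.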
@@ -133,23 +127,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     if (userSpecifiedSchema.isDefined) {
       val threshold = hive.conf.schemaStringLengthThreshold
       val schemaJsonString = userSpecifiedSchema.get.json
-      // Check if the size of the JSON string of the schema exceeds the threshold.
-      if (schemaJsonString.size > threshold) {
-        // Need to split the string.
-        val parts = schemaJsonString.grouped(threshold).toSeq
-        // First, record the total number of parts we have.
-        tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
-        // Second, write every part to table property.
-        parts.zipWithIndex.foreach {
-          case (part, index) =>
-            tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
-        }
-      } else {
-        // The length is less than the threshold, just put it in the table property.
-        tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
-        // We use spark.sql.sources.schema instead of using spark.sql.sources.schema.part.0
-        // because users may have already created data source tables in metastore.
-        tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+      parts.zipWithIndex.foreach {
+        case (part, index) =>
+          tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
       }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
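
On the write side, the explicit threshold check disappears because grouped(threshold) already yields a single chunk when the string is shorter than the threshold, so the unconditional split subsumes the old else branch (and the legacy spark.sql.sources.schema key is no longer written at all). A minimal sketch of that behavior, again with a hypothetical Map-based stand-in for the table properties:

// Hypothetical stand-in for the write path; the real code calls
// tbl.setProperty and reads the threshold from
// hive.conf.schemaStringLengthThreshold.
def writeSchemaString(schemaJsonString: String, threshold: Int): Map[String, String] = {
  // grouped(threshold) returns one chunk for a short string, so no
  // explicit length check is needed.
  val parts = schemaJsonString.grouped(threshold).toSeq
  val indexed = parts.zipWithIndex.map {
    case (part, index) => s"spark.sql.sources.schema.part.$index" -> part
  }
  (("spark.sql.sources.schema.numOfParts" -> parts.size.toString) +: indexed).toMap
}

// A 29-character schema JSON with threshold 10 is stored as three parts:
// part.0 = {"type":"s, part.1 = truct","fi, part.2 = elds":[]}
writeSchemaString("""{"type":"struct","fields":[]}""", threshold = 10)

Because both paths now agree on the numOfParts/part.N layout, anything produced by this sketch round-trips through the read-side sketch above, whatever the threshold.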
