Skip to content

Commit abfb6f5

Browse files
committed
modify util func
1 parent b6bc466 commit abfb6f5

File tree

22 files changed

+89
-85
lines changed

22 files changed

+89
-85
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,24 +166,26 @@ object CatalogUtils {
166166

167167
/**
168168
* Convert URI to String.
169-
* Since URI.toString does not decode for the uri string, we need to use
170-
* Path(uri).toString to decode it.
169+
* Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
170+
* Here we create a hadoop Path with the given URI, and rely on Path.toString
171+
* to decode the uri
171172
* @param uri the URI of the path
172173
* @return the String of the path
173174
*/
174-
def URIToString(uri: Option[URI]): Option[String] = {
175-
uri.map(new Path(_).toString)
175+
def URIToString(uri: URI): String = {
176+
new Path(uri).toString
176177
}
177178

178179
/**
179180
* Convert String to URI.
180-
* Since new URI(string) does not encode for the path string, we need to use
181-
* Path(string).toURI to encode it.
181+
* Since new URI(string) does not encode string, e.g. change '%' to '%25'.
182+
* Here we create a hadoop Path with the given String, and rely on Path.toUri
183+
* to encode the string
182184
* @param str the String of the path
183185
* @return the URI of the path
184186
*/
185-
def stringToURI(str: Option[String]): Option[URI] = {
186-
str.map(new Path(_).toUri)
187+
def stringToURI(str: String): URI = {
188+
new Path(str).toUri
187189
}
188190

189191
private def normalizeColumnName(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ class SessionCatalog(
132132
* does not contain a scheme, this path will not be changed after the default
133133
* FileSystem is changed.
134134
*/
135-
private def makeQualifiedPath(path: URI): Path = {
135+
private def makeQualifiedPath(path: URI): URI = {
136136
val hadoopPath = new Path(path)
137137
val fs = hadoopPath.getFileSystem(hadoopConf)
138-
fs.makeQualified(hadoopPath)
138+
fs.makeQualified(hadoopPath).toUri
139139
}
140140

141141
private def requireDbExists(db: String): Unit = {
@@ -171,7 +171,7 @@ class SessionCatalog(
171171
"you cannot create a database with this name.")
172172
}
173173
validateName(dbName)
174-
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toUri
174+
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
175175
externalCatalog.createDatabase(
176176
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
177177
ignoreIfExists)
@@ -229,9 +229,9 @@ class SessionCatalog(
229229
* Get the path for creating a non-default database when database location is not provided
230230
* by users.
231231
*/
232-
def getDefaultDBPath(db: String): String = {
232+
def getDefaultDBPath(db: String): URI = {
233233
val database = formatDatabaseName(db)
234-
new Path(new Path(conf.warehousePath), database + ".db").toString
234+
new Path(new Path(conf.warehousePath), database + ".db").toUri
235235
}
236236

237237
// ----------------------------------------------------------------------------
@@ -352,11 +352,11 @@ class SessionCatalog(
352352
db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
353353
}
354354

355-
def defaultTablePath(tableIdent: TableIdentifier): String = {
355+
def defaultTablePath(tableIdent: TableIdentifier): URI = {
356356
val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
357357
val dbLocation = getDatabaseMetadata(dbName).locationUri
358358

359-
new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
359+
new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
360360
}
361361

362362
// ----------------------------------------------

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
555555
test("alter partitions") {
556556
val catalog = newBasicCatalog()
557557
try {
558-
val newLocation = new Path(newUriForDatabase()).toUri
558+
val newLocation = newUriForDatabase()
559559
val newSerde = "com.sparkbricks.text.EasySerde"
560560
val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
561561
// alter but keep spec the same

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
2121

2222
import org.antlr.v4.runtime.{ParserRuleContext, Token}
2323
import org.antlr.v4.runtime.tree.TerminalNode
24-
import org.apache.hadoop.fs.Path
2524

2625
import org.apache.spark.sql.SaveMode
2726
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -387,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
387386
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
388387
"you can only specify one of them.", ctx)
389388
}
390-
val customLocation = storage.locationUri.orElse(CatalogUtils.stringToURI(location))
389+
val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
391390

392391
val tableType = if (customLocation.isDefined) {
393392
CatalogTableType.EXTERNAL
@@ -1082,7 +1081,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
10821081
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
10831082
}
10841083

1085-
val locUri = CatalogUtils.stringToURI(location)
1084+
val locUri = location.map(CatalogUtils.stringToURI(_))
10861085
val storage = CatalogStorageFormat(
10871086
locationUri = locUri,
10881087
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
5858

5959
// Create the relation to validate the arguments before writing the metadata to the metastore,
6060
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
61-
val pathOption = CatalogUtils.URIToString(table.storage.locationUri).map("path" -> _)
61+
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
6262
// Fill in some default table options from the session conf
6363
val tableWithDefaultOptions = table.copy(
6464
identifier = table.identifier.copy(
@@ -184,7 +184,7 @@ case class CreateDataSourceTableAsSelectCommand(
184184
mode: SaveMode,
185185
tableExists: Boolean): BaseRelation = {
186186
// Create the relation based on the input logical plan: `data`.
187-
val pathOption = CatalogUtils.URIToString(tableLocation).map("path" -> _)
187+
val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
188188
val dataSource = DataSource(
189189
session,
190190
className = table.provider.get,

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ case class CreateDatabaseCommand(
6666
CatalogDatabase(
6767
databaseName,
6868
comment.getOrElse(""),
69-
new Path(path.getOrElse(catalog.getDefaultDBPath(databaseName))).toUri,
69+
path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
7070
props),
7171
ifNotExists)
7272
Seq.empty[Row]
@@ -427,7 +427,7 @@ case class AlterTableAddPartitionCommand(
427427
sparkSession.sessionState.conf.resolver)
428428
// inherit table storage format (possibly except for location)
429429
CatalogTablePartition(normalizedSpec, table.storage.copy(
430-
locationUri = CatalogUtils.stringToURI(location)))
430+
locationUri = location.map(CatalogUtils.stringToURI(_))))
431431
}
432432
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
433433
Seq.empty[Row]
@@ -742,7 +742,7 @@ case class AlterTableSetLocationCommand(
742742
override def run(sparkSession: SparkSession): Seq[Row] = {
743743
val catalog = sparkSession.sessionState.catalog
744744
val table = catalog.getTableMetadata(tableName)
745-
val locUri = new Path(location).toUri
745+
val locUri = CatalogUtils.stringToURI(location)
746746
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
747747
partitionSpec match {
748748
case Some(spec) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ case class CreateTableLikeCommand(
7979
CatalogTable(
8080
identifier = targetTable,
8181
tableType = tblType,
82-
storage = sourceTableDesc.storage.copy(locationUri = CatalogUtils.stringToURI(location)),
82+
storage = sourceTableDesc.storage.copy(
83+
locationUri = location.map(CatalogUtils.stringToURI(_))),
8384
schema = sourceTableDesc.schema,
8485
provider = newProvider,
8586
partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +496,7 @@ case class DescribeTableCommand(
495496
append(buffer, "Owner:", table.owner, "")
496497
append(buffer, "Create Time:", new Date(table.createTime).toString, "")
497498
append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
498-
append(buffer, "Location:", CatalogUtils.URIToString(table.storage.locationUri)
499+
append(buffer, "Location:", table.storage.locationUri.map(CatalogUtils.URIToString(_))
499500
.getOrElse(""), "")
500501
append(buffer, "Table Type:", table.tableType.name, "")
501502
table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
@@ -588,7 +589,7 @@ case class DescribeTableCommand(
588589
append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
589590
append(buffer, "Database:", table.database, "")
590591
append(buffer, "Table:", tableIdentifier.table, "")
591-
append(buffer, "Location:", CatalogUtils.URIToString(partition.storage.locationUri)
592+
append(buffer, "Location:", partition.storage.locationUri.map(CatalogUtils.URIToString(_))
592593
.getOrElse(""), "")
593594
append(buffer, "Partition Parameters:", "", "")
594595
partition.parameters.foreach { case (key, value) =>
@@ -955,7 +956,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
955956
// when the table creation DDL contains the PATH option.
956957
None
957958
} else {
958-
Some(s"path '${escapeSingleQuotedString(new Path(location).toString)}'")
959+
Some(s"path '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'")
959960
}
960961
}
961962

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,6 @@ object DataSource {
598598
val path = CaseInsensitiveMap(options).get("path")
599599
val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
600600
CatalogStorageFormat.empty.copy(
601-
locationUri = CatalogUtils.stringToURI(path), properties = optionsWithoutPath)
601+
locationUri = path.map(CatalogUtils.stringToURI(_)), properties = optionsWithoutPath)
602602
}
603603
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
222222

223223
val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
224224
override def call(): LogicalPlan = {
225-
val pathOption = CatalogUtils.URIToString(table.storage.locationUri).map("path" -> _)
225+
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
226226
val dataSource =
227227
DataSource(
228228
sparkSession,

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
7979
new Database(
8080
name = metadata.name,
8181
description = metadata.description,
82-
locationUri = new Path(metadata.locationUri).toString)
82+
locationUri = CatalogUtils.URIToString(metadata.locationUri))
8383
}
8484

8585
/**

0 commit comments

Comments
 (0)