Commit c602894

cloud-fan authored and rxin committed
[SPARK-17990][SPARK-18302][SQL] correct several partition related behaviours of ExternalCatalog
## What changes were proposed in this pull request?

This PR corrects several partition-related behaviors of `ExternalCatalog`:

1. The default partition location should not always lower-case the partition column names in the path string (fix `HiveExternalCatalog`); see the sketch below.
2. Renaming a partition should not always lower-case the partition column names in the updated partition path string (fix `HiveExternalCatalog`).
3. Renaming a partition should update the partition location only for managed tables (fix `InMemoryCatalog`).
4. Creating a partition whose directory already exists should succeed (fix `InMemoryCatalog`).
5. Creating a partition whose directory does not exist should create that directory (fix `InMemoryCatalog`).
6. Dropping a partition from an external table should not delete its directory (fix `InMemoryCatalog`).

## How was this patch tested?

New tests in `ExternalCatalogSuite`.

Author: Wenchen Fan <[email protected]>

Closes #15797 from cloud-fan/partition.

(cherry picked from commit 2f7461f)
Signed-off-by: Reynold Xin <[email protected]>
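As a quick illustration of fix 1, here is a minimal sketch using the `ExternalCatalogUtils.generatePartitionPath` helper added in this patch; the table location and partition spec below are hypothetical:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Hypothetical table location and partition spec, for illustration only.
val tablePath = new Path("/warehouse/db1.db/tbl")
val spec = Map("partCol1" -> "1")

val partPath = ExternalCatalogUtils.generatePartitionPath(spec, Seq("partCol1"), tablePath)
// The partition column name keeps its original case in the generated path:
//   /warehouse/db1.db/tbl/partCol1=1
// whereas the old default-location logic could lower-case it to .../partcol1=1.
```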
1 parent be3933d commit c602894

19 files changed: +397 -208 lines changed
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 121 additions & 0 deletions

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+
+object ExternalCatalogUtils {
+  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
+  // depend on Hive.
+  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  def needsEscaping(c: Char): Boolean = {
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.asInstanceOf[Int]}%02X")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      if (c == '%' && i + 2 < path.length) {
+        val code: Int = try {
+          Integer.parseInt(path.substring(i + 1, i + 3), 16)
+        } catch {
+          case _: Exception => -1
+        }
+        if (code >= 0) {
+          sb.append(code.asInstanceOf[Char])
+          i += 3
+        } else {
+          sb.append(c)
+          i += 1
+        }
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
+
+  def generatePartitionPath(
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String],
+      tablePath: Path): Path = {
+    val partitionPathStrings = partitionColumnNames.map { col =>
+      val partitionValue = spec(col)
+      val partitionString = if (partitionValue == null) {
+        DEFAULT_PARTITION_NAME
+      } else {
+        escapePathName(partitionValue)
+      }
+      escapePathName(col) + "=" + partitionString
+    }
+    partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
+      new Path(totalPath, nextPartPath)
+    }
+  }
+}
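A usage sketch of the escaping helpers above (the sample value is made up): characters that are unsafe in a partition directory name, such as `/`, `:` and `=`, are percent-encoded, and `unescapePathName` reverses the encoding.

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._

val raw = "2016/11/10"             // '/' cannot appear literally inside a single path segment
val escaped = escapePathName(raw)  // "2016%2F11%2F10"
assert(unescapePathName(escaped) == raw)
assert(escapePathName("partCol1") == "partCol1")  // plain column names pass through unchanged
```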

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 46 additions & 46 deletions
@@ -231,7 +231,7 @@ class InMemoryCatalog(
       assert(tableMeta.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val dir = new Path(tableMeta.storage.locationUri.get)
+      val dir = new Path(tableMeta.location)
       try {
         val fs = dir.getFileSystem(hadoopConfig)
         fs.delete(dir, true)
@@ -259,7 +259,7 @@ class InMemoryCatalog(
       assert(oldDesc.table.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
+      val oldDir = new Path(oldDesc.table.location)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -355,25 +355,28 @@ class InMemoryCatalog(
       }
     }
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
     // TODO: we should follow hive to roll back if one partition path failed to create.
     parts.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (p.storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.spec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.mkdirs(new Path(tableDir, partitionPath))
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create partition path $partitionPath", e)
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+
+      try {
+        val fs = tablePath.getFileSystem(hadoopConfig)
+        if (!fs.exists(partitionPath)) {
+          fs.mkdirs(partitionPath)
        }
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create partition path $partitionPath", e)
      }
-      existingParts.put(p.spec, p)
+
+      existingParts.put(
+        p.spec,
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
    }
  }
 
@@ -392,19 +395,15 @@ class InMemoryCatalog(
      }
    }
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
-    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    // TODO: we should follow hive to roll back if one partition path failed to delete, and support
+    // partial partition spec.
     partSpecs.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.get(col).map(col + "=" + _)
-        }.mkString("/")
+      if (existingParts.contains(p) && shouldRemovePartitionLocation) {
+        val partitionPath = new Path(existingParts(p).location)
        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.delete(new Path(tableDir, partitionPath), true)
+          val fs = partitionPath.getFileSystem(hadoopConfig)
+          fs.delete(partitionPath, true)
        } catch {
          case e: IOException =>
            throw new SparkException(s"Unable to delete partition path $partitionPath", e)
@@ -423,33 +422,34 @@ class InMemoryCatalog(
    requirePartitionsExist(db, table, specs)
    requirePartitionsNotExist(db, table, newSpecs)
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val shouldUpdatePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val existingParts = catalog(db).tables(table).partitions
    // TODO: we should follow hive to roll back if one partition path failed to rename.
    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
-      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
-      val existingParts = catalog(db).tables(table).partitions
-
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (newPart.storage.locationUri.isEmpty) {
-        val oldPath = partitionColumnNames.flatMap { col =>
-          oldSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        val newPath = partitionColumnNames.flatMap { col =>
-          newSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
+      val oldPartition = getPartition(db, table, oldSpec)
+      val newPartition = if (shouldUpdatePartitionLocation) {
+        val oldPartPath = new Path(oldPartition.location)
+        val newPartPath = ExternalCatalogUtils.generatePartitionPath(
+          newSpec, partitionColumnNames, tablePath)
        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+          val fs = tablePath.getFileSystem(hadoopConfig)
+          fs.rename(oldPartPath, newPartPath)
        } catch {
          case e: IOException =>
-            throw new SparkException(s"Unable to rename partition path $oldPath", e)
+            throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
        }
+        oldPartition.copy(
+          spec = newSpec,
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+      } else {
+        oldPartition.copy(spec = newSpec)
      }
 
      existingParts.remove(oldSpec)
-      existingParts.put(newSpec, newPart)
+      existingParts.put(newSpec, newPartition)
    }
  }
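For illustration, a minimal sketch (the function and parameter names are hypothetical, not part of the diff) of the rule the rewritten `dropPartitions` now follows: the partition directory is removed only when the table is managed, so dropping a partition of an external table leaves its data in place.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the managed-vs-external check above.
def maybeDeletePartitionDir(
    isManagedTable: Boolean,
    partitionLocation: String,
    hadoopConf: Configuration): Unit = {
  if (isManagedTable) {
    val path = new Path(partitionLocation)
    val fs = path.getFileSystem(hadoopConf)
    fs.delete(path, true) // recursively delete the partition directory
  }
  // For external tables the directory is intentionally left untouched (fix 6).
}
```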

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 11 additions & 0 deletions
@@ -99,6 +99,12 @@ case class CatalogTablePartition(
     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }
 
+  /** Return the partition location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
+    throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
+  }
+
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
@@ -171,6 +177,11 @@ case class CatalogTable(
     throw new AnalysisException(s"table $identifier did not specify database")
   }
 
+  /** Return the table location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    throw new AnalysisException(s"table $identifier did not specify locationUri")
+  }
+
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString
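A small usage sketch of the new `location` helpers (the spec and URI below are made up, and the construction assumes the `CatalogTablePartition`/`CatalogStorageFormat` signatures of this Spark version): `location` unwraps `storage.locationUri` and raises an `AnalysisException` when no location is set.

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}

// Hypothetical partition metadata.
val part = CatalogTablePartition(
  spec = Map("partCol1" -> "1"),
  storage = CatalogStorageFormat.empty.copy(
    locationUri = Some("/warehouse/db1.db/tbl/partCol1=1")))

part.location  // "/warehouse/db1.db/tbl/partCol1=1"
// With locationUri = None, part.location would throw an AnalysisException instead.
```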
