Skip to content

Commit f1e09f4

Browse files
author
Marcelo Vanzin
committed
[SPARK-18752][hive] "isSrcLocal" value should be set from user query.
The value of the "isSrcLocal" parameter passed to Hive's loadTable and loadPartition methods needs to be set according to the user query (e.g. "LOAD DATA LOCAL"), rather than guessed, as the current code tries to do. For existing versions of Hive the current behavior is probably OK, but some recent changes in the Hive code altered the semantics slightly, so code that incorrectly sets "isSrcLocal" to "true" now does the wrong thing: it ends up moving the parent directory of the files into the final location, instead of the files themselves, resulting in a table that cannot be read.
1 parent eeed38e commit f1e09f4

File tree

10 files changed

+60
-38
lines changed

10 files changed

+60
-38
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ abstract class ExternalCatalog {
119119
table: String,
120120
loadPath: String,
121121
isOverwrite: Boolean,
122-
holdDDLTime: Boolean): Unit
122+
holdDDLTime: Boolean,
123+
isSrcLocal: Boolean): Unit
123124

124125
def loadPartition(
125126
db: String,
@@ -128,7 +129,8 @@ abstract class ExternalCatalog {
128129
partition: TablePartitionSpec,
129130
isOverwrite: Boolean,
130131
holdDDLTime: Boolean,
131-
inheritTableSpecs: Boolean): Unit
132+
inheritTableSpecs: Boolean,
133+
isSrcLocal: Boolean): Unit
132134

133135
def loadDynamicPartitions(
134136
db: String,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ class InMemoryCatalog(
312312
table: String,
313313
loadPath: String,
314314
isOverwrite: Boolean,
315-
holdDDLTime: Boolean): Unit = {
315+
holdDDLTime: Boolean,
316+
isSrcLocal: Boolean): Unit = {
316317
throw new UnsupportedOperationException("loadTable is not implemented")
317318
}
318319

@@ -323,7 +324,8 @@ class InMemoryCatalog(
323324
partition: TablePartitionSpec,
324325
isOverwrite: Boolean,
325326
holdDDLTime: Boolean,
326-
inheritTableSpecs: Boolean): Unit = {
327+
inheritTableSpecs: Boolean,
328+
isSrcLocal: Boolean): Unit = {
327329
throw new UnsupportedOperationException("loadPartition is not implemented.")
328330
}
329331

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,12 +311,13 @@ class SessionCatalog(
311311
name: TableIdentifier,
312312
loadPath: String,
313313
isOverwrite: Boolean,
314-
holdDDLTime: Boolean): Unit = {
314+
holdDDLTime: Boolean,
315+
isSrcLocal: Boolean): Unit = {
315316
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
316317
val table = formatTableName(name.table)
317318
requireDbExists(db)
318319
requireTableExists(TableIdentifier(table, Some(db)))
319-
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
320+
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
320321
}
321322

322323
/**
@@ -330,13 +331,14 @@ class SessionCatalog(
330331
partition: TablePartitionSpec,
331332
isOverwrite: Boolean,
332333
holdDDLTime: Boolean,
333-
inheritTableSpecs: Boolean): Unit = {
334+
inheritTableSpecs: Boolean,
335+
isSrcLocal: Boolean): Unit = {
334336
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
335337
val table = formatTableName(name.table)
336338
requireDbExists(db)
337339
requireTableExists(TableIdentifier(table, Some(db)))
338340
externalCatalog.loadPartition(
339-
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
341+
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
340342
}
341343

342344
def defaultTablePath(tableIdent: TableIdentifier): String = {

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,13 +297,15 @@ case class LoadDataCommand(
297297
partition.get,
298298
isOverwrite,
299299
holdDDLTime = false,
300-
inheritTableSpecs = true)
300+
inheritTableSpecs = true,
301+
isSrcLocal = isLocal)
301302
} else {
302303
catalog.loadTable(
303304
targetTable.identifier,
304305
loadPath.toString,
305306
isOverwrite,
306-
holdDDLTime = false)
307+
holdDDLTime = false,
308+
isSrcLocal = isLocal)
307309
}
308310
Seq.empty[Row]
309311
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
736736
table: String,
737737
loadPath: String,
738738
isOverwrite: Boolean,
739-
holdDDLTime: Boolean): Unit = withClient {
739+
holdDDLTime: Boolean,
740+
isSrcLocal: Boolean): Unit = withClient {
740741
requireTableExists(db, table)
741742
client.loadTable(
742743
loadPath,
743744
s"$db.$table",
744745
isOverwrite,
745-
holdDDLTime)
746+
holdDDLTime,
747+
isSrcLocal)
746748
}
747749

748750
override def loadPartition(
@@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
752754
partition: TablePartitionSpec,
753755
isOverwrite: Boolean,
754756
holdDDLTime: Boolean,
755-
inheritTableSpecs: Boolean): Unit = withClient {
757+
inheritTableSpecs: Boolean,
758+
isSrcLocal: Boolean): Unit = withClient {
756759
requireTableExists(db, table)
757760

758761
val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
771774
orderedPartitionSpec,
772775
isOverwrite,
773776
holdDDLTime,
774-
inheritTableSpecs)
777+
inheritTableSpecs,
778+
isSrcLocal)
775779
}
776780

777781
override def loadDynamicPartitions(

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,16 @@ private[hive] trait HiveClient {
211211
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
212212
replace: Boolean,
213213
holdDDLTime: Boolean,
214-
inheritTableSpecs: Boolean): Unit
214+
inheritTableSpecs: Boolean,
215+
isSrcLocal: Boolean): Unit
215216

216217
/** Loads data into an existing table. */
217218
def loadTable(
218219
loadPath: String, // TODO URI
219220
tableName: String,
220221
replace: Boolean,
221-
holdDDLTime: Boolean): Unit
222+
holdDDLTime: Boolean,
223+
isSrcLocal: Boolean): Unit
222224

223225
/** Loads new dynamic partitions into an existing table. */
224226
def loadDynamicPartitions(

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
651651
partSpec: java.util.LinkedHashMap[String, String],
652652
replace: Boolean,
653653
holdDDLTime: Boolean,
654-
inheritTableSpecs: Boolean): Unit = withHiveState {
654+
inheritTableSpecs: Boolean,
655+
isSrcLocal: Boolean): Unit = withHiveState {
655656
val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
656657
shim.loadPartition(
657658
client,
@@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
661662
replace,
662663
holdDDLTime,
663664
inheritTableSpecs,
664-
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
665+
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
666+
isSrcLocal = isSrcLocal)
665667
}
666668

667669
def loadTable(
668670
loadPath: String, // TODO URI
669671
tableName: String,
670672
replace: Boolean,
671-
holdDDLTime: Boolean): Unit = withHiveState {
673+
holdDDLTime: Boolean,
674+
isSrcLocal: Boolean): Unit = withHiveState {
672675
shim.loadTable(
673676
client,
674677
new Path(loadPath),
675678
tableName,
676679
replace,
677-
holdDDLTime)
680+
holdDDLTime,
681+
isSrcLocal)
678682
}
679683

680684
def loadDynamicPartitions(

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
9898
replace: Boolean,
9999
holdDDLTime: Boolean,
100100
inheritTableSpecs: Boolean,
101-
isSkewedStoreAsSubdir: Boolean): Unit
101+
isSkewedStoreAsSubdir: Boolean,
102+
isSrcLocal: Boolean): Unit
102103

103104
def loadTable(
104105
hive: Hive,
105106
loadPath: Path,
106107
tableName: String,
107108
replace: Boolean,
108-
holdDDLTime: Boolean): Unit
109+
holdDDLTime: Boolean,
110+
isSrcLocal: Boolean): Unit
109111

110112
def loadDynamicPartitions(
111113
hive: Hive,
@@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
332334
replace: Boolean,
333335
holdDDLTime: Boolean,
334336
inheritTableSpecs: Boolean,
335-
isSkewedStoreAsSubdir: Boolean): Unit = {
337+
isSkewedStoreAsSubdir: Boolean,
338+
isSrcLocal: Boolean): Unit = {
336339
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
337340
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
338341
}
@@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
342345
loadPath: Path,
343346
tableName: String,
344347
replace: Boolean,
345-
holdDDLTime: Boolean): Unit = {
348+
holdDDLTime: Boolean,
349+
isSrcLocal: Boolean): Unit = {
346350
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
347351
}
348352

@@ -698,20 +702,22 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
698702
replace: Boolean,
699703
holdDDLTime: Boolean,
700704
inheritTableSpecs: Boolean,
701-
isSkewedStoreAsSubdir: Boolean): Unit = {
705+
isSkewedStoreAsSubdir: Boolean,
706+
isSrcLocal: Boolean): Unit = {
702707
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
703708
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
704-
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
709+
isSrcLocal: JBoolean, JBoolean.FALSE)
705710
}
706711

707712
override def loadTable(
708713
hive: Hive,
709714
loadPath: Path,
710715
tableName: String,
711716
replace: Boolean,
712-
holdDDLTime: Boolean): Unit = {
717+
holdDDLTime: Boolean,
718+
isSrcLocal: Boolean): Unit = {
713719
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
714-
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
720+
isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
715721
}
716722

717723
override def loadDynamicPartitions(
@@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
749755
TimeUnit.MILLISECONDS).asInstanceOf[Long]
750756
}
751757

752-
protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
753-
val localFs = FileSystem.getLocal(conf)
754-
val pathFs = FileSystem.get(path.toUri(), conf)
755-
localFs.getUri() == pathFs.getUri()
756-
}
757-
758758
}
759759

760760
private[client] class Shim_v1_0 extends Shim_v0_14 {

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
316316
partitionSpec,
317317
isOverwrite = doHiveOverwrite,
318318
holdDDLTime = holdDDLTime,
319-
inheritTableSpecs = inheritTableSpecs)
319+
inheritTableSpecs = inheritTableSpecs,
320+
isSrcLocal = false)
320321
}
321322
}
322323
} else {
@@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
325326
table.catalogTable.identifier.table,
326327
outputPath.toString, // TODO: URI
327328
overwrite,
328-
holdDDLTime)
329+
holdDDLTime,
330+
isSrcLocal = false)
329331
}
330332

331333
// Invalidate the cache.

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
172172
emptyDir,
173173
tableName = "src",
174174
replace = false,
175-
holdDDLTime = false)
175+
holdDDLTime = false,
176+
isSrcLocal = false)
176177
}
177178

178179
test(s"$version: tableExists") {
@@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
310311
partSpec,
311312
replace = false,
312313
holdDDLTime = false,
313-
inheritTableSpecs = false)
314+
inheritTableSpecs = false,
315+
isSrcLocal = false)
314316
}
315317

316318
test(s"$version: loadDynamicPartitions") {

0 commit comments

Comments (0)