From f1e09f44d2c5e8c613582e84647dcec5de0a5ff3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 6 Dec 2016 14:44:03 -0800 Subject: [PATCH 1/5] [SPARK-18752][hive] "isSrcLocal" value should be set from user query. The value of the "isSrcLocal" parameter passed to Hive's loadTable and loadPartition methods needs to be set according to the user query (e.g. "LOAD DATA LOCAL"), instead of being guessed from the path by the current code. For existing versions of Hive the current behavior is probably OK, but recent changes in the Hive code altered the semantics slightly, so that code which incorrectly sets "isSrcLocal" to "true" now does the wrong thing: it ends up moving the parent directory of the files into the final location, instead of the files themselves, resulting in a table that cannot be read. --- .../catalyst/catalog/ExternalCatalog.scala | 6 ++-- .../catalyst/catalog/InMemoryCatalog.scala | 6 ++-- .../sql/catalyst/catalog/SessionCatalog.scala | 10 ++++--- .../spark/sql/execution/command/tables.scala | 6 ++-- .../spark/sql/hive/HiveExternalCatalog.scala | 12 +++++--- .../spark/sql/hive/client/HiveClient.scala | 6 ++-- .../sql/hive/client/HiveClientImpl.scala | 12 +++++--- .../spark/sql/hive/client/HiveShim.scala | 28 +++++++++---------- .../hive/execution/InsertIntoHiveTable.scala | 6 ++-- .../spark/sql/hive/client/VersionsSuite.scala | 6 ++-- 10 files changed, 60 insertions(+), 38 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 4b8cac8f32b0..d6b2a3e29739 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -119,7 +119,8 @@ abstract class ExternalCatalog { table: String, loadPath: String, isOverwrite: Boolean, - holdDDLTime: Boolean): Unit + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit def loadPartition( db: String, @@ -128,7 +129,8 @@ abstract class ExternalCatalog { partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit def loadDynamicPartitions( db: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index a6bebe1a3938..816e4af2df66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -312,7 +312,8 @@ class InMemoryCatalog( table: String, loadPath: String, isOverwrite: Boolean, - holdDDLTime: Boolean): Unit = { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = { throw new UnsupportedOperationException("loadTable is not implemented") } @@ -323,7 +324,8 @@ partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit = { + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit = { throw new UnsupportedOperationException("loadPartition is not implemented.") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 7a3d2097a85c..e996a836fe73 100644 ---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -311,12 +311,13 @@ class SessionCatalog( name: TableIdentifier, loadPath: String, isOverwrite: Boolean, - holdDDLTime: Boolean): Unit = { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Some(db))) - externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime) + externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal) } /** @@ -330,13 +331,14 @@ class SessionCatalog( partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit = { + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Some(db))) externalCatalog.loadPartition( - db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs) + db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal) } def defaultTablePath(tableIdent: TableIdentifier): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 32e2f7573739..35c2ef040edc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -297,13 +297,15 @@ case class LoadDataCommand( partition.get, isOverwrite, holdDDLTime = false, - inheritTableSpecs = true) + inheritTableSpecs = true, + isSrcLocal = isLocal) } else { catalog.loadTable( targetTable.identifier, loadPath.toString, isOverwrite, - holdDDLTime = false) + holdDDLTime = false, + isSrcLocal = isLocal) } Seq.empty[Row] } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index f67ddc9be1a5..544f277cdf97 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, loadPath: String, isOverwrite: Boolean, - holdDDLTime: Boolean): Unit = withClient { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = withClient { requireTableExists(db, table) client.loadTable( loadPath, s"$db.$table", isOverwrite, - holdDDLTime) + holdDDLTime, + isSrcLocal) } override def loadPartition( @@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit = withClient { + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit = withClient { requireTableExists(db, table) val orderedPartitionSpec = new util.LinkedHashMap[String, String]() @@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat orderedPartitionSpec, isOverwrite, holdDDLTime, - inheritTableSpecs) + inheritTableSpecs, + isSrcLocal) } override def 
loadDynamicPartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 8e7c871183df..837b6c57fc24 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -211,14 +211,16 @@ private[hive] trait HiveClient { partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering replace: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit /** Loads data into an existing table. */ def loadTable( loadPath: String, // TODO URI tableName: String, replace: Boolean, - holdDDLTime: Boolean): Unit + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit /** Loads new dynamic partitions into an existing table. */ def loadDynamicPartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db73596e5f52..b75f6e98d505 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -651,7 +651,8 @@ private[hive] class HiveClientImpl( partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean): Unit = withHiveState { + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit = withHiveState { val hiveTable = client.getTable(dbName, tableName, true /* throw exception */) shim.loadPartition( client, @@ -661,20 +662,23 @@ private[hive] class HiveClientImpl( replace, holdDDLTime, inheritTableSpecs, - isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories) + isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories, + isSrcLocal = isSrcLocal) } def loadTable( loadPath: String, // TODO URI tableName: String, replace: Boolean, - holdDDLTime: Boolean): Unit = withHiveState { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = withHiveState { shim.loadTable( client, new Path(loadPath), tableName, replace, - holdDDLTime) + holdDDLTime, + isSrcLocal) } def loadDynamicPartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index e561706facf0..137ec267603e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -98,14 +98,16 @@ private[client] sealed abstract class Shim { replace: Boolean, holdDDLTime: Boolean, inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit def loadTable( hive: Hive, loadPath: Path, tableName: String, replace: Boolean, - holdDDLTime: Boolean): Unit + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit def loadDynamicPartitions( hive: Hive, @@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging { replace: Boolean, holdDDLTime: Boolean, inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit = { loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean) } @@ -342,7 
+345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging { loadPath: Path, tableName: String, replace: Boolean, - holdDDLTime: Boolean): Unit = { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = { loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean) } @@ -698,10 +702,11 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { replace: Boolean, holdDDLTime: Boolean, inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit = { loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, - isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE) + isSrcLocal: JBoolean, JBoolean.FALSE) } override def loadTable( @@ -709,9 +714,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { loadPath: Path, tableName: String, replace: Boolean, - holdDDLTime: Boolean): Unit = { + holdDDLTime: Boolean, + isSrcLocal: Boolean): Unit = { loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean, - isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE) + isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE) } override def loadDynamicPartitions( @@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { TimeUnit.MILLISECONDS).asInstanceOf[Long] } - protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = { - val localFs = FileSystem.getLocal(conf) - val pathFs = FileSystem.get(path.toUri(), conf) - localFs.getUri() == pathFs.getUri() - } - } private[client] class Shim_v1_0 extends Shim_v0_14 { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 5f5c8e2432d6..db2239d26aaa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -316,7 +316,8 @@ case class InsertIntoHiveTable( partitionSpec, isOverwrite = doHiveOverwrite, holdDDLTime = holdDDLTime, - inheritTableSpecs = inheritTableSpecs) + inheritTableSpecs = inheritTableSpecs, + isSrcLocal = false) } } } else { @@ -325,7 +326,8 @@ case class InsertIntoHiveTable( table.catalogTable.identifier.table, outputPath.toString, // TODO: URI overwrite, - holdDDLTime) + holdDDLTime, + isSrcLocal = false) } // Invalidate the cache. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 79e76b3134c2..a001048a9ea5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging { emptyDir, tableName = "src", replace = false, - holdDDLTime = false) + holdDDLTime = false, + isSrcLocal = false) } test(s"$version: tableExists") { @@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging { partSpec, replace = false, holdDDLTime = false, - inheritTableSpecs = false) + inheritTableSpecs = false, + isSrcLocal = false) } test(s"$version: loadDynamicPartitions") { From 93e07dbc6ebe75f1a8558e803605afca6a76c9e2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 7 Dec 2016 10:33:20 -0800 Subject: [PATCH 2/5] Fix HiveCommandSuite. Need to make a copy of the input when using "LOAD DATA" vs. "LOAD DATA LOCAL" since Hive moves the input file in the former case. --- .../sql/hive/execution/HiveCommandSuite.scala | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 46ed18c70fb5..b5d38f06d7d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.hive.execution +import java.io.File + +import com.google.common.io.Files + import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException @@ -232,17 +236,20 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""") } - val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath + val testData = hiveContext.getHiveFile("data/files/employee.dat") // Non-local inpath: without URI Scheme and Authority - sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""") + withCopy(testData) { tmp => + sql(s"""LOAD DATA INPATH "${tmp.getCanonicalPath()}" INTO TABLE non_part_table""") + } + checkAnswer( sql("SELECT * FROM non_part_table WHERE employeeID = 16"), Row(16, "john") :: Nil) // Use URI as LOCAL inpath: // file:/path/to/data/files/employee.dat - val uri = "file:" + testData + val uri = "file:" + testData.getCanonicalPath() sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""") checkAnswer( @@ -250,13 +257,19 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto Row(16, "john") :: Row(16, "john") :: Nil) // Use URI as non-LOCAL inpath - sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""") + withCopy(testData) { tmp => + val tmpUri = "file:" + tmp.getCanonicalPath() + sql(s"""LOAD DATA INPATH "$tmpUri" INTO TABLE non_part_table""") + } checkAnswer( sql("SELECT * FROM non_part_table WHERE employeeID = 16"), Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil) - sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""") + withCopy(testData) { tmp => + val tmpUri = 
"file:" + tmp.getCanonicalPath() + sql(s"""LOAD DATA INPATH "$tmpUri" OVERWRITE INTO TABLE non_part_table""") + } checkAnswer( sql("SELECT * FROM non_part_table WHERE employeeID = 16"), @@ -418,4 +431,19 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assert(sql("SHOW PARTITIONS part_datasrc").count() == 3) } } + + /** + * Run a function with a copy of the input file. Use this for tests that use "LOAD DATA" + * (instead of "LOAD DATA LOCAL") since, according to Hive's semantics, files are moved + * into the target location in that case, and we need the original file to be preserved. + */ + private def withCopy(source: File)(fn: File => Unit): Unit = { + val tmp = File.createTempFile(source.getName(), ".tmp") + Files.copy(source, tmp) + try { + fn(tmp) + } finally { + tmp.delete() + } + } } From a8482a5ee0f320c991da60d6ca1a577e0ad67442 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 10 Dec 2016 12:20:10 -0800 Subject: [PATCH 3/5] Run same batch of tests against LOAD DATA and LOAD DATA LOCAL. To make sure both work as expected. --- .../sql/hive/execution/HiveCommandSuite.scala | 181 +++++++++--------- 1 file changed, 88 insertions(+), 93 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index b5d38f06d7d9..59b0918a6d42 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -158,7 +158,35 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } } - test("LOAD DATA") { + Seq(true, false).foreach { local => + val loadQuery = if (local) "LOAD DATA LOCAL" else "LOAD DATA" + test(loadQuery) { + testLoadData(loadQuery, local) + } + } + + private def testLoadData(loadQuery: String, local: Boolean): Unit = { + val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile() + + /** + * Run a function with a copy of the input data file when running with non-local input. The + * semantics in this mode are that the input file is moved to the destination, so we have + * to make a copy so that subsequent tests have access to the original file. + */ + def withInputFile(fn: File => Unit): Unit = { + if (local) { + fn(testData) + } else { + val tmp = File.createTempFile(testData.getName(), ".tmp") + Files.copy(testData, tmp) + try { + fn(tmp) + } finally { + tmp.delete() + } + } + } + withTable("non_part_table", "part_table") { sql( """ @@ -172,115 +200,96 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto // Its content looks like: // 16|john // 17|robert - val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath // LOAD DATA INTO non-partitioned table can't specify partition intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""") + sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""") + } + + withInputFile { path => + sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""") + + // Non-local mode is expected to move the file. Check once here that the input file + // was actually removed. 
+ if (!local) { + assert(!path.exists()) + } } - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""") checkAnswer( sql("SELECT * FROM non_part_table WHERE employeeID = 16"), Row(16, "john") :: Nil) - sql( - """ - |CREATE TABLE part_table (employeeID INT, employeeName STRING) - |PARTITIONED BY (c STRING, d STRING) - |ROW FORMAT DELIMITED - |FIELDS TERMINATED BY '|' - |LINES TERMINATED BY '\n' - """.stripMargin) - - // LOAD DATA INTO partitioned table must specify partition - intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""") + // Incorrect URI. + // file://path/to/data/files/employee.dat + // + // TODO: need a similar test for non-local mode. + if (local) { + val incorrectUri = "file:/" + testData.getAbsolutePath() + intercept[AnalysisException] { + sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""") + } } - intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""") - } - intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""") - } - intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""") + // Use URI as inpath: + // file:/path/to/data/files/employee.dat + withInputFile { path => + sql(s"""$loadQuery INPATH "${path.toURI()}" INTO TABLE non_part_table""") } - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), - sql("SELECT * FROM non_part_table").collect()) + sql("SELECT * FROM non_part_table WHERE employeeID = 16"), + Row(16, "john") :: Row(16, "john") :: Nil) + + // Overwrite existing data. + withInputFile { path => + sql(s"""$loadQuery INPATH "${path.toURI()}" OVERWRITE INTO TABLE non_part_table""") + } - // Different order of partition columns. 
- sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"), - sql("SELECT * FROM non_part_table").collect()) - } - } + sql("SELECT * FROM non_part_table WHERE employeeID = 16"), + Row(16, "john") :: Nil) - test("LOAD DATA: input path") { - withTable("non_part_table") { sql( """ - |CREATE TABLE non_part_table (employeeID INT, employeeName STRING) + |CREATE TABLE part_table (employeeID INT, employeeName STRING) + |PARTITIONED BY (c STRING, d STRING) |ROW FORMAT DELIMITED |FIELDS TERMINATED BY '|' |LINES TERMINATED BY '\n' """.stripMargin) - // Non-existing inpath - intercept[AnalysisException] { - sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""") + // LOAD DATA INTO partitioned table must specify partition + withInputFile { path => + intercept[AnalysisException] { + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table""") + } + + intercept[AnalysisException] { + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1")""") + } + intercept[AnalysisException] { + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1")""") + } + intercept[AnalysisException] { + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", k="2")""") + } } - val testData = hiveContext.getHiveFile("data/files/employee.dat") - - // Non-local inpath: without URI Scheme and Authority - withCopy(testData) { tmp => - sql(s"""LOAD DATA INPATH "${tmp.getCanonicalPath()}" INTO TABLE non_part_table""") + withInputFile { path => + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", d="2")""") } - checkAnswer( - sql("SELECT * FROM non_part_table WHERE employeeID = 16"), - Row(16, "john") :: Nil) - - // Use URI as LOCAL inpath: - // file:/path/to/data/files/employee.dat - val uri = "file:" + testData.getCanonicalPath() - sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""") - - checkAnswer( - sql("SELECT * FROM non_part_table WHERE employeeID = 16"), - Row(16, "john") :: Row(16, "john") :: Nil) - - // Use URI as non-LOCAL inpath - withCopy(testData) { tmp => - val tmpUri = "file:" + tmp.getCanonicalPath() - sql(s"""LOAD DATA INPATH "$tmpUri" INTO TABLE non_part_table""") - } - - checkAnswer( - sql("SELECT * FROM non_part_table WHERE employeeID = 16"), - Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil) + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), + sql("SELECT * FROM non_part_table").collect()) - withCopy(testData) { tmp => - val tmpUri = "file:" + tmp.getCanonicalPath() - sql(s"""LOAD DATA INPATH "$tmpUri" OVERWRITE INTO TABLE non_part_table""") + // Different order of partition columns. + withInputFile { path => + sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1", c="2")""") } - checkAnswer( - sql("SELECT * FROM non_part_table WHERE employeeID = 16"), - Row(16, "john") :: Nil) - - // Incorrect URI: - // file://path/to/data/files/employee.dat - val incorrectUri = "file:/" + testData - intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""") - } + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"), + sql("SELECT * FROM non_part_table").collect()) } } @@ -432,18 +441,4 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } } - /** - * Run a function with a copy of the input file. 
Use this for tests that use "LOAD DATA" - * (instead of "LOAD DATA LOCAL") since, according to Hive's semantics, files are moved - * into the target location in that case, and we need the original file to be preserved. - */ - private def withCopy(source: File)(fn: File => Unit): Unit = { - val tmp = File.createTempFile(source.getName(), ".tmp") - Files.copy(source, tmp) - try { - fn(tmp) - } finally { - tmp.delete() - } - } } From b451a705f02252777f0b11c853a82b86bdc0359d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 10 Dec 2016 15:32:51 -0800 Subject: [PATCH 4/5] Better check for move vs. copy behavior. --- .../spark/sql/hive/execution/HiveCommandSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 59b0918a6d42..0a72b99d57c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -209,11 +209,9 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto withInputFile { path => sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""") - // Non-local mode is expected to move the file. Check once here that the input file - // was actually removed. - if (!local) { - assert(!path.exists()) - } + // Non-local mode is expected to move the file, while local mode is expected to copy it. + // Check once here that the behavior is as expected. + assert(local === path.exists()) } checkAnswer( From 4e37c808ac76b8b7e1f3b224da06828a191f3373 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 12 Dec 2016 10:15:11 -0800 Subject: [PATCH 5/5] Feedback. --- .../org/apache/spark/sql/execution/command/tables.scala | 2 +- .../spark/sql/hive/execution/HiveCommandSuite.scala | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 35c2ef040edc..d2a7556476a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -203,7 +203,7 @@ case class LoadDataCommand( throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " + s"but number of columns in provided partition spec (${partition.get.size}) " + s"do not match number of partitioned columns in table " + - s"(s${targetTable.partitionColumnNames.size})") + s"(${targetTable.partitionColumnNames.size})") partition.get.keys.foreach { colName => if (!targetTable.partitionColumnNames.contains(colName)) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 0a72b99d57c8..1680f6c40acd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -166,6 +166,10 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } private def testLoadData(loadQuery: String, local: Boolean): Unit = { + // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+ // Its content looks like: + // 16|john + // 17|robert val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile() /** @@ -196,11 +200,6 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |LINES TERMINATED BY '\n' """.stripMargin) - // employee.dat has two columns separated by '|', the first is an int, the second is a string. - // Its content looks like: - // 16|john - // 17|robert - // LOAD DATA INTO non-partitioned table can't specify partition intercept[AnalysisException] { sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
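
A note on the overall shape of the change: the series makes locality a parse-time decision. "LOAD DATA LOCAL" sets the flag to true, plain "LOAD DATA" sets it to false, and every layer between LoadDataCommand and the Hive shim simply forwards it. Below is a minimal, self-contained Scala sketch of that call chain; the class and method names are simplified stand-ins for the real Spark classes, with unrelated parameters (holdDDLTime, inheritTableSpecs, partition specs) elided:

    // Simplified stand-ins for the real classes; only the isSrcLocal flow is modeled.
    object IsSrcLocalFlow {
      // Stand-in for the shim layer, which in the real code reflects into
      // Hive's loadTable and passes isSrcLocal through as a Java boolean.
      trait Shim {
        def loadTable(path: String, table: String, replace: Boolean, isSrcLocal: Boolean): Unit
      }

      class PrintingShim extends Shim {
        def loadTable(path: String, table: String, replace: Boolean, isSrcLocal: Boolean): Unit =
          println(s"loadTable($table) from $path, replace=$replace, isSrcLocal=$isSrcLocal")
      }

      // Stand-in for SessionCatalog/HiveExternalCatalog: no guessing, just forwarding.
      class Catalog(shim: Shim) {
        def loadTable(table: String, path: String, overwrite: Boolean, isSrcLocal: Boolean): Unit =
          shim.loadTable(path, table, overwrite, isSrcLocal)
      }

      def main(args: Array[String]): Unit = {
        val catalog = new Catalog(new PrintingShim)
        // "LOAD DATA LOCAL INPATH ..." parses to isLocal = true and plain
        // "LOAD DATA" to false; that parse-time value is the only source of
        // truth for isSrcLocal, whatever filesystem the path points at.
        catalog.loadTable("non_part_table", "/tmp/employee.dat", overwrite = false, isSrcLocal = true)
        catalog.loadTable("non_part_table", "hdfs:///data/employee.dat", overwrite = false, isSrcLocal = false)
      }
    }

The design point is that no layer below the parser re-derives the flag, so InsertIntoHiveTable can hard-code isSrcLocal = false (its input is always written by Spark, never a user-local file).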
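For context on why guessing was unsafe, the helper deleted from Shim_v0_14 is reproduced below in runnable form (the only change is taking a plain Hadoop Configuration instead of a HiveConf, which FileSystem accepts either way). It answers "does this path live on the local filesystem?", which is not the same question as "did the user write LOAD DATA LOCAL?": a plain LOAD DATA pointing at a file: path compares equal too, and under the newer Hive semantics described in the first commit message that misclassification moves the parent directory instead of the files.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object IsSrcLocalGuess {
      // The deleted heuristic: true whenever the path's filesystem URI equals
      // the local filesystem's URI, regardless of what the user query said.
      def guessedIsSrcLocal(path: Path, conf: Configuration): Boolean = {
        val localFs = FileSystem.getLocal(conf)
        val pathFs = FileSystem.get(path.toUri(), conf)
        localFs.getUri() == pathFs.getUri()
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration() // fs.defaultFS defaults to file:///
        // Both print true, even though only a "LOAD DATA LOCAL" query should be
        // treated as having a local source; the query text never enters the decision.
        println(guessedIsSrcLocal(new Path("file:/tmp/employee.dat"), conf))
        println(guessedIsSrcLocal(new Path("/tmp/employee.dat"), conf))
      }
    }

This is also why the test suite ends up asserting `local === path.exists()`: with the flag set from the query, non-local loads move the source file while local loads copy it.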
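One smaller detail in the HiveShim hunks: the shims call Hive through java.lang.reflect.Method, whose varargs take boxed objects, which is why every boolean argument carries an explicit ": JBoolean" ascription. Below is a standalone illustration of the pattern, where FakeHive is a hypothetical stand-in for org.apache.hadoop.hive.ql.metadata.Hive (whose loadTable signature varies across the versions the shims target):

    import java.lang.{Boolean => JBoolean}
    import java.lang.reflect.Method

    // Hypothetical stand-in; the real shims resolve loadTable/loadPartition
    // reflectively on org.apache.hadoop.hive.ql.metadata.Hive.
    class FakeHive {
      def loadTable(path: String, tableName: String, replace: Boolean, isSrcLocal: Boolean): Unit =
        println(s"loadTable($tableName) from $path, replace=$replace, isSrcLocal=$isSrcLocal")
    }

    object ReflectionBoxing {
      def main(args: Array[String]): Unit = {
        // Scala's Boolean parameters compile to primitive booleans, so the
        // lookup uses JBoolean.TYPE (the Class object for the primitive).
        val loadTableMethod: Method = classOf[FakeHive].getMethod(
          "loadTable", classOf[String], classOf[String], JBoolean.TYPE, JBoolean.TYPE)
        // The ascription "true: JBoolean" boxes the Scala Boolean into a
        // java.lang.Boolean, matching Method.invoke's Object varargs; reflection
        // unboxes it again when dispatching to the primitive parameter.
        loadTableMethod.invoke(new FakeHive, "/tmp/data", "t", true: JBoolean, false: JBoolean)
      }
    }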