From 37eb1dcb1a102395a948807237586a39fe35deef Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 15 Feb 2017 13:21:48 -0800 Subject: [PATCH 1/2] [SPARK-19329][SQL][BACKPORT-2.1] Backport to branch-2.1: reading from or writing to a data source table with a non-existent location should succeed --- .../datasources/DataSourceStrategy.scala | 2 +- .../sql/execution/command/DDLSuite.scala | 147 ++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 03eed251763b4..5062da19e5d57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -299,7 +299,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] options = table.storage.properties ++ pathOption) LogicalRelation( - dataSource.resolveRelation(), + dataSource.resolveRelation(checkFilesExist = false), expectedOutputAttributes = Some(simpleCatalogRelation.output), catalogTable = Some(table)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index c0f583e5f7072..b233c75ea2cf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1760,4 +1760,151 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq assert(rows.length > 0) } + + test("SET LOCATION for managed table") { + withTable("tbl") { + withTempDir { dir => + sql("CREATE TABLE tbl(i INT) USING parquet") + sql("INSERT INTO tbl SELECT 1") + 
checkAnswer(spark.table("tbl"), Row(1)) + val defaultTablePath = spark.sessionState.catalog + .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get + + sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'") + spark.catalog.refreshTable("tbl") + // SET LOCATION won't move data from previous table path to new table path. + assert(spark.table("tbl").count() == 0) + // the previous table path should be still there. + assert(new File(new URI(defaultTablePath)).exists()) + + sql("INSERT INTO tbl SELECT 2") + checkAnswer(spark.table("tbl"), Row(2)) + // newly inserted data will go to the new table path. + assert(dir.listFiles().nonEmpty) + + sql("DROP TABLE tbl") + // the new table path will be removed after DROP TABLE. + assert(!dir.exists()) + } + } + } + + test("insert data to a data source table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING parquet + |OPTIONS(path "$dir") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete + val tableLocFile = new File(table.location.stripPrefix("file:")) + assert(!tableLocFile.exists) + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + assert(tableLocFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + Utils.deleteRecursively(dir) + assert(!tableLocFile.exists) + spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1") + assert(tableLocFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDirFile = new File(newDir) + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + spark.sessionState.catalog.refreshTable(TableIdentifier("t")) + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == 
newDir) + assert(!newDirFile.exists) + + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + assert(newDirFile.exists) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + } + } + } + + test("insert into a data source table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION "$dir" + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + val partLoc = new File(s"${dir.getAbsolutePath}/a=1") + Utils.deleteRecursively(partLoc) + assert(!partLoc.exists()) + // insert overwrite into a partition which location has been deleted. + spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8") + assert(partLoc.exists()) + checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) + } + } + } + + test("read data from a data source table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING parquet + |OPTIONS(path "$dir") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = dir.getAbsolutePath.stripSuffix("/") + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete() + checkAnswer(spark.table("t"), Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!new File(newDir).exists()) + checkAnswer(spark.table("t"), Nil) + } + } + } + + test("read data from a data 
source table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION "$dir" + """.stripMargin) + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + // select from a partition which location has been deleted. + Utils.deleteRecursively(dir) + assert(!dir.exists()) + spark.sql("REFRESH TABLE t") + checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) + } + } + } } From 96c767842e5363137246b958a82e6e603d71bd00 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 16 Mar 2017 22:41:28 +0800 Subject: [PATCH 2/2] Fix some code: remove the SET LOCATION test and simplify path handling in the DDLSuite location tests --- .../sql/execution/command/DDLSuite.scala | 61 +++++-------------- 1 file changed, 16 insertions(+), 45 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b233c75ea2cf3..d7fa0b5a01b3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1761,34 +1761,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(rows.length > 0) } - test("SET LOCATION for managed table") { - withTable("tbl") { - withTempDir { dir => - sql("CREATE TABLE tbl(i INT) USING parquet") - sql("INSERT INTO tbl SELECT 1") - checkAnswer(spark.table("tbl"), Row(1)) - val defaultTablePath = spark.sessionState.catalog - .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get - - sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'") - spark.catalog.refreshTable("tbl") - // SET LOCATION won't move data from previous table path to new table path. 
- assert(spark.table("tbl").count() == 0) - // the previous table path should be still there. - assert(new File(new URI(defaultTablePath)).exists()) - - sql("INSERT INTO tbl SELECT 2") - checkAnswer(spark.table("tbl"), Row(2)) - // newly inserted data will go to the new table path. - assert(dir.listFiles().nonEmpty) - - sql("DROP TABLE tbl") - // the new table path will be removed after DROP TABLE. - assert(!dir.exists()) - } - } - } - test("insert data to a data source table which has a not existed location should succeed") { withTable("t") { withTempDir { dir => @@ -1799,8 +1771,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = dir.getAbsolutePath.stripSuffix("/") - assert(table.location.stripSuffix("/") == expectedPath) + val expectedPath = dir.getAbsolutePath + assert(table.location == expectedPath) dir.delete val tableLocFile = new File(table.location.stripPrefix("file:")) @@ -1815,17 +1787,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(tableLocFile.exists) checkAnswer(spark.table("t"), Row("c", 1) :: Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - val newDirFile = new File(newDir) + val newDir = new File(dir, "x") spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") spark.sessionState.catalog.refreshTable(TableIdentifier("t")) val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location == newDir) - assert(!newDirFile.exists) + assert(table1.location == newDir.getAbsolutePath) + assert(!newDir.exists) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") - assert(newDirFile.exists) + assert(newDir.exists) checkAnswer(spark.table("t"), Row("c", 1) :: Nil) } } @@ -1838,17 +1809,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { s""" |CREATE TABLE t(a int, b 
int, c int, d int) |USING parquet + |OPTIONS(path '$dir') |PARTITIONED BY(a, b) - |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = dir.getAbsolutePath.stripSuffix("/") - assert(table.location.stripSuffix("/") == expectedPath) + val expectedPath = dir.getAbsolutePath + assert(table.location == expectedPath) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - val partLoc = new File(s"${dir.getAbsolutePath}/a=1") + val partLoc = new File(dir, "a=1") Utils.deleteRecursively(partLoc) assert(!partLoc.exists()) // insert overwrite into a partition which location has been deleted. @@ -1869,18 +1840,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = dir.getAbsolutePath.stripSuffix("/") - assert(table.location.stripSuffix("/") == expectedPath) + val expectedPath = dir.getAbsolutePath + assert(table.location == expectedPath) dir.delete() checkAnswer(spark.table("t"), Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDir = new File(dir, "x") spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location == newDir) - assert(!new File(newDir).exists()) + assert(table1.location == newDir.getAbsolutePath) + assert(!newDir.exists()) checkAnswer(spark.table("t"), Nil) } } @@ -1893,8 +1864,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { s""" |CREATE TABLE t(a int, b int, c int, d int) |USING parquet + |OPTIONS(path "$dir") |PARTITIONED BY(a, b) - |LOCATION "$dir" """.stripMargin) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)