Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)

LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
catalogTable = Some(table))
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1816,4 +1816,123 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}

test("insert data to a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the path doesn't exist when create? will we succeed or fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, it will throw an exception that the path does not existed. maybe we can check if the path is a dir or not, dir can not exist and file must be exist?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the behavior of hive?

""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete
val tableLocFile = new File(table.location.stripPrefix("file:"))
assert(!tableLocFile.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists)
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: new File(dir, "x")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will fix this when I do another pr, thanks~

val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!newDirFile.exists)

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(newDirFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
}
}
}

test("insert into a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition which location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
}
}
}

test("read data from a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!new File(newDir).exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "$dir"
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

// select from a partition which location has been deleted.
Utils.deleteRecursively(dir)
assert(!dir.exists())
spark.sql("REFRESH TABLE t")
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}