Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption)

LogicalRelation(
dataSource.resolveRelation(),
dataSource.resolveRelation(checkFilesExist = false),
expectedOutputAttributes = Some(simpleCatalogRelation.output),
catalogTable = Some(table))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,4 +1760,122 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
assert(rows.length > 0)
}

// Regression test: writing to a data source table must succeed even when the
// table's location directory does not exist — the write path should create it.
test("insert data to a data source table which has a not existed location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      // Create a data source table whose location points at the temp directory.
      spark.sql(
        s"""
          |CREATE TABLE t(a string, b int)
          |USING parquet
          |OPTIONS(path "$dir")
        """.stripMargin)
      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      val expectedPath = dir.getAbsolutePath
      assert(table.location == expectedPath)

      // Remove the (still empty) location directory. Call `delete()` with parens
      // (side-effecting method) and assert its result so a failed delete does not
      // silently invalidate the scenario under test.
      assert(dir.delete())
      val tableLocFile = new File(table.location.stripPrefix("file:"))
      assert(!tableLocFile.exists())
      // INSERT INTO must recreate the missing directory and write the row.
      spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
      assert(tableLocFile.exists())
      checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

      // Same check for INSERT OVERWRITE after the populated directory is wiped.
      Utils.deleteRecursively(dir)
      assert(!tableLocFile.exists())
      spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
      assert(tableLocFile.exists())
      checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

      // Point the table at a brand-new, nonexistent location and insert again.
      val newDir = new File(dir, "x")
      spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
      spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

      val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(table1.location == newDir.getAbsolutePath)
      assert(!newDir.exists())

      spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
      assert(newDir.exists())
      checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
    }
  }
}

// Writing to a partitioned data source table must succeed even when the target
// partition's directory has been removed from disk.
test("insert into a data source table with no existed partition location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      // Partitioned table rooted at the temp directory.
      spark.sql(
        s"""
          |CREATE TABLE t(a int, b int, c int, d int)
          |USING parquet
          |OPTIONS(path '$dir')
          |PARTITIONED BY(a, b)
        """.stripMargin)
      val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(tableMeta.location == dir.getAbsolutePath)

      // Seed one partition so the table has data and a partition directory.
      spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
      checkAnswer(spark.table("t"), Seq(Row(3, 4, 1, 2)))

      // Drop the partition directory on disk, then overwrite that same partition:
      // the write should recreate the missing location rather than fail.
      val partitionDir = new File(dir, "a=1")
      Utils.deleteRecursively(partitionDir)
      assert(!partitionDir.exists())
      spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
      assert(partitionDir.exists())
      checkAnswer(spark.table("t"), Seq(Row(7, 8, 1, 2)))
    }
  }
}

// Reading a data source table whose location directory is missing should return
// an empty result instead of raising an error.
test("read data from a data source table which has a not existed location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      spark.sql(
        s"""
          |CREATE TABLE t(a string, b int)
          |USING parquet
          |OPTIONS(path "$dir")
        """.stripMargin)
      val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(tableMeta.location == dir.getAbsolutePath)

      // Scanning after the location directory vanished yields no rows, not an error.
      dir.delete()
      checkAnswer(spark.table("t"), Nil)

      // Re-point the table at a location that has never existed and read again.
      val newDir = new File(dir, "x")
      spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

      val updatedMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(updatedMeta.location == newDir.getAbsolutePath)
      assert(!newDir.exists())
      checkAnswer(spark.table("t"), Nil)
    }
  }
}

// A partition-pruned scan over a table whose on-disk location was deleted should
// return no rows rather than fail.
test("read data from a data source table with no existed partition location should succeed") {
  withTable("t") {
    withTempDir { dir =>
      // Partitioned table; write one partition so there is data to begin with.
      spark.sql(
        s"""
          |CREATE TABLE t(a int, b int, c int, d int)
          |USING parquet
          |OPTIONS(path "$dir")
          |PARTITIONED BY(a, b)
        """.stripMargin)
      spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
      checkAnswer(spark.table("t"), Seq(Row(3, 4, 1, 2)))

      // Wipe the entire table location, refresh cached file listings, then verify
      // selecting the now-missing partition produces an empty result.
      Utils.deleteRecursively(dir)
      assert(!dir.exists())
      spark.sql("REFRESH TABLE t")
      val prunedScan = spark.sql("select * from t where a=1 and b=2")
      checkAnswer(prunedScan, Nil)
    }
  }
}
}