Skip to content

Commit 42bf872

Browse files
committed
DataFrameWriterV2 tests
1 parent c80a155 commit 42bf872

File tree

1 file changed

+44
-0
lines changed

1 file changed

+44
-0
lines changed

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ import scala.collection.JavaConverters._
2222
import org.scalatest.BeforeAndAfter
2323

2424
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
25+
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
2526
import org.apache.spark.sql.connector.InMemoryTableCatalog
2627
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
2728
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
29+
import org.apache.spark.sql.execution.QueryExecution
30+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
2831
import org.apache.spark.sql.test.SharedSparkSession
2932
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
33+
import org.apache.spark.sql.util.QueryExecutionListener
3034

3135
class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter {
3236
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -51,6 +55,46 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
5155
spark.sessionState.conf.clear()
5256
}
5357

58+
test("DataFrameWriteV2 encode identifiers correctly") {
59+
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
60+
61+
var plan: LogicalPlan = null
62+
val listener = new QueryExecutionListener {
63+
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
64+
plan = qe.analyzed
65+
66+
}
67+
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
68+
}
69+
spark.listenerManager.register(listener)
70+
71+
spark.table("source").writeTo("testcat.table_name").append()
72+
sparkContext.listenerBus.waitUntilEmpty()
73+
assert(plan.isInstanceOf[AppendData])
74+
checkV2Identifiers(plan.asInstanceOf[AppendData].table)
75+
76+
spark.table("source").writeTo("testcat.table_name").overwrite(lit(true))
77+
sparkContext.listenerBus.waitUntilEmpty()
78+
assert(plan.isInstanceOf[OverwriteByExpression])
79+
checkV2Identifiers(plan.asInstanceOf[OverwriteByExpression].table)
80+
81+
spark.table("source").writeTo("testcat.table_name").overwritePartitions()
82+
sparkContext.listenerBus.waitUntilEmpty()
83+
assert(plan.isInstanceOf[OverwritePartitionsDynamic])
84+
checkV2Identifiers(plan.asInstanceOf[OverwritePartitionsDynamic].table)
85+
}
86+
87+
private def checkV2Identifiers(
88+
plan: LogicalPlan,
89+
identifiers: Seq[String] = Seq("table_name"),
90+
catalogName: String = "testcat"): Unit = {
91+
assert(plan.isInstanceOf[DataSourceV2Relation])
92+
val v2 = plan.asInstanceOf[DataSourceV2Relation]
93+
assert(v2.identifiers.length == identifiers.length)
94+
assert(identifiers.forall(t => v2.identifiers.exists(_.name() == t)))
95+
assert(v2.catalogIdentifier.exists(_ == catalogName))
96+
}
97+
5498
test("Append: basic append") {
5599
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
56100

0 commit comments

Comments
 (0)