
Commit d0800fc

yuchenhuo authored and brkyvz committed
[SPARK-30314] Add identifier and catalog information to DataSourceV2Relation
### What changes were proposed in this pull request?

Add identifier and catalog information in DataSourceV2Relation so it would be possible to do richer checks in the checkAnalysis step.

### Why are the changes needed?

In data source v2, table implementations are all customized, so we may not be able to get the resolved identifier from the tables themselves. Therefore we encode the table and catalog information in DataSourceV2Relation so no external changes are needed to make sure this information is available.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests in the following suites:

- CatalogManagerSuite.scala
- CatalogV2UtilSuite.scala
- SupportsCatalogOptionsSuite.scala
- PlanResolutionSuite.scala

Closes #26957 from yuchenhuo/SPARK-30314.

Authored-by: Yuchen Huo <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
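The change itself only threads the catalog and identifier through the plan; the snippet below is a minimal sketch (not part of this commit, with a hypothetical `qualifiedName` helper) of the kind of richer check this enables once a rule can see where a `DataSourceV2Relation` came from:

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Hypothetical helper: build a user-facing, fully qualified table name for
// error messages from the catalog/identifier now carried by the relation.
def qualifiedName(r: DataSourceV2Relation): String = (r.catalog, r.identifier) match {
  case (Some(catalog), Some(ident)) =>
    (catalog.name +: ident.namespace :+ ident.name).mkString(".")
  case _ =>
    // No catalog was recorded, e.g. a path-based DSV2 read.
    r.table.name()
}
```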
1 parent 48f6478 commit d0800fc

File tree

17 files changed: +290 −54 lines


external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -624,7 +624,7 @@ class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase {
     val topic = newTopic()
     val df = createDF(topic)
     assert(df.logicalPlan.collect {
-      case DataSourceV2Relation(_, _, _) => true
+      case _: DataSourceV2Relation => true
     }.nonEmpty)
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 5 additions & 4 deletions

@@ -817,8 +817,8 @@ class Analyzer(

     case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
       CatalogV2Util.loadRelation(u.catalog, u.tableName)
-        .map(rel => alter.copy(table = rel))
-        .getOrElse(alter)
+          .map(rel => alter.copy(table = rel))
+          .getOrElse(alter)

     case u: UnresolvedV2Relation =>
       CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)

@@ -831,7 +831,8 @@
     expandRelationName(identifier) match {
       case NonSessionCatalogAndIdentifier(catalog, ident) =>
         CatalogV2Util.loadTable(catalog, ident) match {
-          case Some(table) => Some(DataSourceV2Relation.create(table))
+          case Some(table) =>
+            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
           case None => None
         }
       case _ => None

@@ -921,7 +922,7 @@
           case v1Table: V1Table =>
             v1SessionCatalog.getRelation(v1Table.v1Table)
           case table =>
-            DataSourceV2Relation.create(table)
+            DataSourceV2Relation.create(table, Some(catalog), Some(ident))
         }
         val key = catalog.name +: ident.namespace :+ ident.name
         Option(AnalysisContext.get.relationCache.getOrElseUpdate(key, loaded.orNull))

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 1 addition & 1 deletion

@@ -286,7 +286,7 @@ private[sql] object CatalogV2Util {
   }

   def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
-    loadTable(catalog, ident).map(DataSourceV2Relation.create)
+    loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident)))
   }

   def isSessionCatalog(catalog: CatalogPlugin): Boolean = {

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 17 additions & 4 deletions

@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.connector.write.WriteBuilder

@@ -32,12 +32,17 @@ import org.apache.spark.util.Utils
  * A logical plan representing a data source v2 table.
  *
  * @param table The table that this relation represents.
+ * @param output the output attributes of this relation.
+ * @param catalog catalogPlugin for the table. None if no catalog is specified.
+ * @param identifier the identifier for the table. None if no identifier is defined.
  * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
  *                and [[WriteBuilder]].
  */
 case class DataSourceV2Relation(
     table: Table,
     output: Seq[AttributeReference],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
     options: CaseInsensitiveStringMap)
   extends LeafNode with MultiInstanceRelation with NamedRelation {

@@ -137,12 +142,20 @@ case class StreamingDataSourceV2Relation(
 }

 object DataSourceV2Relation {
-  def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = {
+  def create(
+      table: Table,
+      catalog: Option[CatalogPlugin],
+      identifier: Option[Identifier],
+      options: CaseInsensitiveStringMap): DataSourceV2Relation = {
     val output = table.schema().toAttributes
-    DataSourceV2Relation(table, output, options)
+    DataSourceV2Relation(table, output, catalog, identifier, options)
   }

-  def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty)
+  def create(
+      table: Table,
+      catalog: Option[CatalogPlugin],
+      identifier: Option[Identifier]): DataSourceV2Relation =
+    create(table, catalog, identifier, CaseInsensitiveStringMap.empty)

   /**
    * This is used to transform data source v2 statistics to logical.Statistics.
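For callers, the practical effect of the new `create` overloads above is that the catalog and identifier must now be passed explicitly, with `None` when they are unknown. A minimal sketch (the `toRelation` wrapper below is illustrative, not part of the commit):

```scala
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative wrapper exercising both overloads introduced above.
def toRelation(
    table: Table,
    catalog: Option[CatalogPlugin],
    ident: Option[Identifier],
    options: Option[CaseInsensitiveStringMap] = None): DataSourceV2Relation =
  options match {
    // Overload with explicit options, as DataFrameReader/DataFrameWriter use.
    case Some(opts) => DataSourceV2Relation.create(table, catalog, ident, opts)
    // Overload that defaults to empty options, as the Analyzer uses.
    case None => DataSourceV2Relation.create(table, catalog, ident)
  }
```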
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import org.mockito.Mockito.{mock, when}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.StructType
+
+class CatalogV2UtilSuite extends SparkFunSuite {
+  test("Load relation should encode the identifiers for V2Relations") {
+    val testCatalog = mock(classOf[TableCatalog])
+    val ident = mock(classOf[Identifier])
+    val table = mock(classOf[Table])
+    when(table.schema()).thenReturn(mock(classOf[StructType]))
+    when(testCatalog.loadTable(ident)).thenReturn(table)
+    val r = CatalogV2Util.loadRelation(testCatalog, ident)
+    assert(r.isDefined)
+    assert(r.get.isInstanceOf[DataSourceV2Relation])
+    val v2Relation = r.get.asInstanceOf[DataSourceV2Relation]
+    assert(v2Relation.catalog.exists(_ == testCatalog))
+    assert(v2Relation.identifier.exists(_ == ident))
+  }
+}

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 10 additions & 6 deletions

@@ -195,6 +195,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }

     DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
+      val catalogManager = sparkSession.sessionState.catalogManager
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
       val pathsOption = if (paths.isEmpty) {

@@ -206,27 +207,30 @@

       val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
-      val table = provider match {
+      val (table, catalog, ident) = provider match {
         case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
           throw new IllegalArgumentException(
             s"$source does not support user specified schema. Please don't specify the schema.")
         case hasCatalog: SupportsCatalogOptions =>
           val ident = hasCatalog.extractIdentifier(dsOptions)
           val catalog = CatalogV2Util.getTableProviderCatalog(
             hasCatalog,
-            sparkSession.sessionState.catalogManager,
+            catalogManager,
             dsOptions)
-          catalog.loadTable(ident)
+          (catalog.loadTable(ident), Some(catalog), Some(ident))
         case _ =>
+          // TODO: Non-catalog paths for DSV2 are currently not well defined.
           userSpecifiedSchema match {
-            case Some(schema) => provider.getTable(dsOptions, schema)
-            case _ => provider.getTable(dsOptions)
+            case Some(schema) => (provider.getTable(dsOptions, schema), None, None)
+            case _ => (provider.getTable(dsOptions), None, None)
           }
       }
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       table match {
         case _: SupportsRead if table.supports(BATCH_READ) =>
-          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions))
+          Dataset.ofRows(
+            sparkSession,
+            DataSourceV2Relation.create(table, catalog, ident, dsOptions))

         case _ => loadV1Source(paths: _*)
       }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 11 additions & 11 deletions

@@ -258,20 +258,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val dsOptions = new CaseInsensitiveStringMap(options.asJava)

     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+    val catalogManager = df.sparkSession.sessionState.catalogManager
     mode match {
       case SaveMode.Append | SaveMode.Overwrite =>
-        val table = provider match {
+        val (table, catalog, ident) = provider match {
           case supportsExtract: SupportsCatalogOptions =>
             val ident = supportsExtract.extractIdentifier(dsOptions)
-            val sessionState = df.sparkSession.sessionState
             val catalog = CatalogV2Util.getTableProviderCatalog(
-              supportsExtract, sessionState.catalogManager, dsOptions)
+              supportsExtract, catalogManager, dsOptions)

-            catalog.loadTable(ident)
+            (catalog.loadTable(ident), Some(catalog), Some(ident))
           case tableProvider: TableProvider =>
             val t = tableProvider.getTable(dsOptions)
             if (t.supports(BATCH_WRITE)) {
-              t
+              (t, None, None)
             } else {
               // Streaming also uses the data source V2 API. So it may be that the data source
               // implements v2, but has no v2 implementation for batch writes. In that case, we

@@ -280,7 +280,7 @@
             }
         }

-        val relation = DataSourceV2Relation.create(table, dsOptions)
+        val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
         checkPartitioningMatchesV2Table(table)
         if (mode == SaveMode.Append) {
           runCommand(df.sparkSession, "save") {

@@ -299,9 +299,8 @@
         provider match {
           case supportsExtract: SupportsCatalogOptions =>
             val ident = supportsExtract.extractIdentifier(dsOptions)
-            val sessionState = df.sparkSession.sessionState
             val catalog = CatalogV2Util.getTableProviderCatalog(
-              supportsExtract, sessionState.catalogManager, dsOptions)
+              supportsExtract, catalogManager, dsOptions)

             val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _)

@@ -419,7 +418,7 @@
       case _: V1Table =>
         return insertInto(TableIdentifier(ident.name(), ident.namespace().headOption))
       case t =>
-        DataSourceV2Relation.create(t)
+        DataSourceV2Relation.create(t, Some(catalog), Some(ident))
     }

     val command = mode match {

@@ -554,12 +553,13 @@
     }

     val command = (mode, tableOpt) match {
-      case (_, Some(table: V1Table)) =>
+      case (_, Some(_: V1Table)) =>
        return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))

      case (SaveMode.Append, Some(table)) =>
        checkPartitioningMatchesV2Table(table)
-        AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan, extraOptions.toMap)
+        val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+        AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)

      case (SaveMode.Overwrite, _) =>
        ReplaceTableAsSelect(

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala

Lines changed: 7 additions & 3 deletions

@@ -158,7 +158,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   def append(): Unit = {
     val append = loadTable(catalog, identifier) match {
       case Some(t) =>
-        AppendData.byName(DataSourceV2Relation.create(t), logicalPlan, options.toMap)
+        AppendData.byName(
+          DataSourceV2Relation.create(t, Some(catalog), Some(identifier)),
+          logicalPlan, options.toMap)
       case _ =>
         throw new NoSuchTableException(identifier)
     }

@@ -181,7 +183,8 @@
     val overwrite = loadTable(catalog, identifier) match {
       case Some(t) =>
         OverwriteByExpression.byName(
-          DataSourceV2Relation.create(t), logicalPlan, condition.expr, options.toMap)
+          DataSourceV2Relation.create(t, Some(catalog), Some(identifier)),
+          logicalPlan, condition.expr, options.toMap)
       case _ =>
         throw new NoSuchTableException(identifier)
     }

@@ -207,7 +210,8 @@
     val dynamicOverwrite = loadTable(catalog, identifier) match {
       case Some(t) =>
         OverwritePartitionsDynamic.byName(
-          DataSourceV2Relation.create(t), logicalPlan, options.toMap)
+          DataSourceV2Relation.create(t, Some(catalog), Some(identifier)),
+          logicalPlan, options.toMap)
       case _ =>
         throw new NoSuchTableException(identifier)
     }
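All three branches above sit behind the user-facing `DataFrameWriterV2` API. A minimal usage sketch, assuming a `SparkSession` named `spark` and placeholder names (`testcat` must be a configured v2 catalog already containing `ns.table`):

```scala
// append() resolves the table, then builds the DataSourceV2Relation with
// Some(catalog) and Some(identifier) as shown in the first hunk above.
val df = spark.range(10).withColumnRenamed("id", "value")
df.writeTo("testcat.ns.table").append()

// overwrite(...) and overwritePartitions() exercise the other two branches.
```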

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 1 addition & 1 deletion

@@ -270,7 +270,7 @@ class CacheManager extends Logging {
         case _ => false
       }

-    case DataSourceV2Relation(fileTable: FileTable, _, _) =>
+    case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
       refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)

     case _ => false

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala

Lines changed: 2 additions & 1 deletion

@@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
  */
 class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
+    case i @
+        InsertIntoStatement(d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _) =>
       val v1FileFormat = table.fallbackFileFormat.newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
