
Commit 03a91f8

address comments
1 parent 9004fba commit 03a91f8

11 files changed, +119 −35 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 32 additions & 17 deletions

@@ -847,9 +847,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident, _, _) =>
-        lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident, _, _), _, _, _, _) =>
+      case u @ UnresolvedRelation(ident, _, isStreaming) =>
+        lookupTempView(ident, isStreaming).getOrElse(u)
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _) =>
         lookupTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)

@@ -862,15 +862,22 @@ class Analyzer(
         lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
     }

-    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+    def lookupTempView(
+        identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView) return None

-      identifier match {
+      val tmpView = identifier match {
         case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
         case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
         case _ => None
       }
+
+      if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+        throw new AnalysisException("The temp view related to non-streaming relation is " +
+          "not supported in readStream.table().")
+      }
+      tmpView
     }
   }

@@ -897,11 +904,12 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
         lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming)
-          .map {
-            case rel: DataSourceV2Relation =>
-              val ident = rel.identifier.get
-              SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
-            case o => o
+          .map { relation =>
+            val (catalog, ident) = relation match {
+              case ds: DataSourceV2Relation => (ds.catalog, ds.identifier.get)
+              case s: StreamingRelationV2 => (s.catalog, s.identifier.get)
+            }
+            SubqueryAlias(catalog.get.name +: ident.namespace :+ ident.name, relation)
           }.getOrElse(u)

       case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>

@@ -941,8 +949,8 @@ class Analyzer(
       CatalogV2Util.loadTable(catalog, ident) match {
         case Some(table) =>
           if (isStreaming) {
-            Some(StreamingRelationV2(
-              None, table.name, table, options, table.schema.toAttributes, None))
+            Some(StreamingRelationV2(None, table.name, table, options,
+              table.schema.toAttributes, Some(catalog), Some(ident), None))
           } else {
             Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
           }

@@ -1038,16 +1046,23 @@ class Analyzer(
       lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
         case v1Table: V1Table =>
           if (isStreaming) {
-            UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true)
+            SubqueryAlias(
+              catalog.name +: ident.asMultipartIdentifier,
+              UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
           } else {
             v1SessionCatalog.getRelation(v1Table.v1Table, options)
           }
         case table =>
           if (isStreaming) {
-            val tableMeta = v1SessionCatalog.getTableMetadata(ident.asTableIdentifier)
-            StreamingRelationV2(
-              None, table.name, table, options, table.schema.toAttributes,
-              Some(UnresolvedCatalogRelation(tableMeta, isStreaming = true)))
+            val v1Fallback = table match {
+              case withFallback: V2TableWithV1Fallback =>
+                Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
+              case _ => None
+            }
+            SubqueryAlias(
+              catalog.name +: ident.asMultipartIdentifier,
+              StreamingRelationV2(None, table.name, table, options, table.schema.toAttributes,
+                Some(catalog), Some(ident), v1Fallback))
           } else {
             SubqueryAlias(
               catalog.name +: ident.asMultipartIdentifier,
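
A minimal usage sketch of the new guard in lookupTempView, assuming a hypothetical SparkSession named spark (the view name is illustrative, not part of this commit):

// A temp view backed by a batch (non-streaming) relation.
spark.range(10).createOrReplaceTempView("batch_view")

// Reading it through the streaming path now fails at analysis time with:
//   org.apache.spark.sql.AnalysisException: The temp view related to
//   non-streaming relation is not supported in readStream.table().
spark.readStream.table("batch_view")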

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 9 additions & 1 deletion

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Table}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -695,3 +695,11 @@ case class HiveTableRelation(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }
+
+/**
+ * A V2 table with V1 fallback support. This is used to fall back to the V1 table when the
+ * V2 one doesn't implement specific capabilities that V1 already has.
+ */
+trait V2TableWithV1Fallback extends Table {
+  def v1Table: CatalogTable
+}
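
A minimal implementation sketch of the new trait; the class name and the empty V2 capability set are assumptions for illustration, not part of this commit:

import java.util

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.types.StructType

// Hypothetical connector table: it advertises no V2 capabilities of its own,
// but exposes the V1 CatalogTable so the analyzer can build the
// UnresolvedCatalogRelation fallback shown in Analyzer.scala above.
class ExampleV1BackedTable(override val v1Table: CatalogTable)
  extends V2TableWithV1Fallback {
  override def name(): String = v1Table.identifier.unquotedString
  override def schema(): StructType = v1Table.schema
  override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()
}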

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala

Lines changed: 3 additions & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.streaming
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -37,6 +37,8 @@ case class StreamingRelationV2(
     table: Table,
     extraOptions: CaseInsensitiveStringMap,
     output: Seq[Attribute],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
   extends LeafNode with MultiInstanceRelation {
   override lazy val resolved = v1Relation.forall(_.resolved)
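
With the two extra fields, every extractor of StreamingRelationV2 gains two slots (as the call-site updates below show), and rules can read the resolved catalog path straight off the relation. A small hypothetical helper, just to illustrate the shape:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2

// Hypothetical helper: recover the fully qualified table name, mirroring
// the SubqueryAlias the analyzer builds in Analyzer.scala above.
def qualifiedName(plan: LogicalPlan): Option[String] = plan.collectFirst {
  case StreamingRelationV2(_, _, _, _, _, Some(catalog), Some(ident), _) =>
    (catalog.name +: ident.namespace :+ ident.name).mkString(".")
}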

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion

@@ -296,7 +296,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       getStreamingRelation(tableMeta, extraOptions)

     case s @ StreamingRelationV2(
-        _, _, table, extraOptions, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       if (table.isInstanceOf[SupportsRead]
           && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala

Lines changed: 2 additions & 1 deletion

@@ -43,7 +43,8 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
     case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
       failAnalysis(s"Table ${r.table.name()} does not support batch scan.")

-    case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
+    case r: StreamingRelationV2
+        if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) && r.v1Relation.isEmpty =>
       throw new AnalysisException(s"Table ${r.table.name()} does not support either " +
         "micro-batch or continuous scan.")
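
Design note: a streaming relation that supports neither MICRO_BATCH_READ nor CONTINUOUS_READ is now allowed through this check as long as it carries a v1Relation fallback, so the V1 streaming path (see the FindDataSourceTable change above) can take over instead of failing analysis.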

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 1 addition & 1 deletion

@@ -90,7 +90,7 @@ class MicroBatchExecution(
         StreamingExecutionRelation(source, output)(sparkSession)
       })

-    case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+    case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, _, _, v1) =>
       val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
       val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
       if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 1 addition & 1 deletion

@@ -65,7 +65,7 @@ class ContinuousExecution(
     var nextSourceId = 0
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) =>
+      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _, _, _) =>
        val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
        if (!table.supports(TableCapability.CONTINUOUS_READ)) {
          throw new UnsupportedOperationException(

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 2 additions & 0 deletions

@@ -83,6 +83,8 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
       new MemoryStreamTable(this),
       CaseInsensitiveStringMap.empty(),
       attributes,
+      None,
+      None,
       None)
   }

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 2 additions & 1 deletion

@@ -232,7 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     Dataset.ofRows(
       sparkSession,
       StreamingRelationV2(
-        Some(provider), source, table, dsOptions, table.schema.toAttributes, v1Relation))
+        Some(provider), source, table, dsOptions,
+        table.schema.toAttributes, None, None, v1Relation))

     // fallback to v1
     // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.

sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala

Lines changed: 2 additions & 0 deletions

@@ -46,6 +46,8 @@ class TableCapabilityCheckSuite extends AnalysisSuite with SharedSparkSession {
       table,
       CaseInsensitiveStringMap.empty(),
       TableCapabilityCheckSuite.schema.toAttributes,
+      None,
+      None,
       v1Relation)
   }
