
Commit 9e6882f

xuanyuanking authored and cloud-fan committed
[SPARK-32885][SS] Add DataStreamReader.table API
### What changes were proposed in this pull request?

This PR aims to add a new `table` API in DataStreamReader, which is similar to the table API in DataFrameReader.

### Why are the changes needed?

Users can directly use this API to get a streaming DataFrame on a table. Below is a simple example:

Application 1 for initializing and starting the streaming job:

```
val path = "/home/yuanjian.li/runtime/to_be_deleted"
val tblName = "my_table"

// Write some data to `my_table`
spark.range(3).write.format("parquet").option("path", path).saveAsTable(tblName)

// Read the table as a streaming source, write result to destination directory
val table = spark.readStream.table(tblName)
table.writeStream.format("parquet").option("checkpointLocation", "/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2")
```

Application 2 for appending new data:

```
// Append new data into the path
spark.range(5).write.format("parquet").option("path", "/home/yuanjian.li/runtime/to_be_deleted").mode("append").save()
```

Check result:

```
// The destination directory should contain all written data
spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show()
```

### Does this PR introduce _any_ user-facing change?

Yes, a new API is added.

### How was this patch tested?

New unit tests added and integration testing.

Closes #29756 from xuanyuanking/SPARK-32885.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
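For context, a minimal self-contained sketch of the new API (the table name, paths, and the `maxFilesPerTrigger` reader option are illustrative assumptions, not taken from this commit; the sketch assumes a local Spark build that includes this change):

```
import org.apache.spark.sql.SparkSession

// Minimal sketch; paths and names are placeholders.
val spark = SparkSession.builder().master("local[2]").appName("read-stream-table").getOrCreate()

// Create a parquet-backed catalog table with some batch data.
spark.range(3).write.format("parquet")
  .option("path", "/tmp/read_stream_table_demo")
  .saveAsTable("demo_table")

// New in this PR: read the catalog table as a streaming source.
// Reader options are carried into the resolved relation via UnresolvedRelation.options.
val streamingDf = spark.readStream
  .option("maxFilesPerTrigger", "1")
  .table("demo_table")

// Any regular streaming sink works on the result.
val query = streamingDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/read_stream_table_demo_ck")
  .start("/tmp/read_stream_table_demo_out")

query.processAllAvailable()
query.stop()
```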
1 parent f2fc966 · commit 9e6882f

File tree

18 files changed: +391 −38 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 64 additions & 21 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.connector.catalog._
@@ -846,9 +847,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident, _) =>
-        lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
+      case u @ UnresolvedRelation(ident, _, isStreaming) =>
+        lookupTempView(ident, isStreaming).getOrElse(u)
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _) =>
         lookupTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
@@ -861,15 +862,22 @@ class Analyzer(
         lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
     }

-    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+    def lookupTempView(
+        identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView) return None

-      identifier match {
+      val tmpView = identifier match {
         case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
         case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
         case _ => None
       }
+
+      if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+        throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " +
+          s"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
+      }
+      tmpView
     }
   }

@@ -895,10 +903,13 @@ class Analyzer(
   object ResolveTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
-        lookupV2Relation(u.multipartIdentifier, u.options)
-          .map { rel =>
-            val ident = rel.identifier.get
-            SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
+        lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming)
+          .map { relation =>
+            val (catalog, ident) = relation match {
+              case ds: DataSourceV2Relation => (ds.catalog, ds.identifier.get)
+              case s: StreamingRelationV2 => (s.catalog, s.identifier.get)
+            }
+            SubqueryAlias(catalog.get.name +: ident.namespace :+ ident.name, relation)
           }.getOrElse(u)

       case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
@@ -911,8 +922,9 @@ class Analyzer(
           .map(ResolvedTable(catalog.asTableCatalog, ident, _))
           .getOrElse(u)

-      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        lookupV2Relation(u.multipartIdentifier, u.options)
+      case i @ InsertIntoStatement(u @ UnresolvedRelation(_, _, false), _, _, _, _)
+          if i.query.resolved =>
+        lookupV2Relation(u.multipartIdentifier, u.options, false)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)

@@ -930,12 +942,18 @@ class Analyzer(
    */
   private def lookupV2Relation(
       identifier: Seq[String],
-      options: CaseInsensitiveStringMap): Option[DataSourceV2Relation] =
+      options: CaseInsensitiveStringMap,
+      isStreaming: Boolean): Option[LogicalPlan] =
     expandRelationName(identifier) match {
       case NonSessionCatalogAndIdentifier(catalog, ident) =>
         CatalogV2Util.loadTable(catalog, ident) match {
           case Some(table) =>
-            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+            if (isStreaming) {
+              Some(StreamingRelationV2(None, table.name, table, options,
+                table.schema.toAttributes, Some(catalog), Some(ident), None))
+            } else {
+              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+            }
           case None => None
         }
       case _ => None
@@ -976,8 +994,8 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
         val relation = table match {
-          case u: UnresolvedRelation =>
-            lookupRelation(u.multipartIdentifier, u.options).getOrElse(u)
+          case u @ UnresolvedRelation(_, _, false) =>
+            lookupRelation(u.multipartIdentifier, u.options, false).getOrElse(u)
           case other => other
         }

@@ -988,7 +1006,8 @@ class Analyzer(
       }

       case u: UnresolvedRelation =>
-        lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u)
+        lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
+          .map(resolveViews).getOrElse(u)

       case u @ UnresolvedTable(identifier) =>
         lookupTableOrView(identifier).map {
@@ -1020,16 +1039,40 @@ class Analyzer(
     // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
     private def lookupRelation(
         identifier: Seq[String],
-        options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
+        options: CaseInsensitiveStringMap,
+        isStreaming: Boolean): Option[LogicalPlan] = {
       expandRelationName(identifier) match {
         case SessionCatalogAndIdentifier(catalog, ident) =>
           lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
             case v1Table: V1Table =>
-              v1SessionCatalog.getRelation(v1Table.v1Table, options)
+              if (isStreaming) {
+                if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
+                  throw new AnalysisException(s"${identifier.quoted} is a permanent view, " +
+                    "which is not supported by streaming reading API such as " +
+                    "`DataStreamReader.table` yet.")
+                }
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
+              } else {
+                v1SessionCatalog.getRelation(v1Table.v1Table, options)
+              }
             case table =>
-              SubqueryAlias(
-                catalog.name +: ident.asMultipartIdentifier,
-                DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              if (isStreaming) {
+                val v1Fallback = table match {
+                  case withFallback: V2TableWithV1Fallback =>
+                    Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
+                  case _ => None
+                }
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  StreamingRelationV2(None, table.name, table, options, table.schema.toAttributes,
+                    Some(catalog), Some(ident), v1Fallback))
+              } else {
+                SubqueryAlias(
+                  catalog.name +: ident.asMultipartIdentifier,
+                  DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+              }
           }
           val key = catalog.name +: ident.namespace :+ ident.name
           AnalysisContext.get.relationCache.get(key).map(_.transform {
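To illustrate the two new analysis-time error paths added above, a hedged sketch (assumes an active SparkSession `spark`; view and table names are placeholders, and the error texts are abbreviated from the messages in the diff):

```
// Batch temp view: rejected by the ResolveTempViews check above.
spark.range(10).createOrReplaceTempView("batch_view")
// spark.readStream.table("batch_view")
// => AnalysisException: batch_view is not a temp view of streaming logical plan,
//    please use batch API such as `DataFrameReader.table` to read it.

// Streaming temp view: allowed, because the wrapped plan has isStreaming = true.
spark.readStream.format("rate").load().createOrReplaceTempView("rate_view")
val fromTempView = spark.readStream.table("rate_view")

// Permanent view: rejected by the lookupRelation check above.
spark.range(10).write.saveAsTable("some_table")
spark.sql("CREATE VIEW perm_view AS SELECT * FROM some_table")
// spark.readStream.table("perm_view")
// => AnalysisException: ... is a permanent view, which is not supported by
//    streaming reading API such as `DataStreamReader.table` yet.
```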

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
     plan resolveOperatorsUp {
-      case u @ UnresolvedRelation(Seq(table), _) =>
+      case u @ UnresolvedRelation(Seq(table), _, _) =>
         cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)

       case other =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 2 additions & 2 deletions
@@ -105,15 +105,15 @@ object ResolveHints {

     val newNode = CurrentOrigin.withOrigin(plan.origin) {
       plan match {
-        case ResolvedHint(u @ UnresolvedRelation(ident, _), hint)
+        case ResolvedHint(u @ UnresolvedRelation(ident, _, _), hint)
             if matchedIdentifierInHint(ident) =>
           ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))

         case ResolvedHint(r: SubqueryAlias, hint)
             if matchedIdentifierInHint(extractIdentifier(r)) =>
           ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))

-        case UnresolvedRelation(ident, _) if matchedIdentifierInHint(ident) =>
+        case UnresolvedRelation(ident, _, _) if matchedIdentifierInHint(ident) =>
           ResolvedHint(plan, createHintInfo(hintName))

         case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 10 additions & 1 deletion
@@ -45,7 +45,8 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  */
 case class UnresolvedRelation(
     multipartIdentifier: Seq[String],
-    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
+    override val isStreaming: Boolean = false)
   extends LeafNode with NamedRelation {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

@@ -60,6 +61,14 @@ case class UnresolvedRelation(
 }

 object UnresolvedRelation {
+  def apply(
+      tableIdentifier: TableIdentifier,
+      extraOptions: CaseInsensitiveStringMap,
+      isStreaming: Boolean): UnresolvedRelation = {
+    UnresolvedRelation(
+      tableIdentifier.database.toSeq :+ tableIdentifier.table, extraOptions, isStreaming)
+  }
+
   def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
     UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
 }
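For reference, a small sketch of how the streaming flag can be set when building this plan node (identifier names are placeholders; presumably `DataStreamReader.table` constructs a node of this shape under the hood, though that file is not shown in this excerpt):

```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Multipart-identifier form, as produced when parsing "db.tbl".
val rel1 = UnresolvedRelation(Seq("db", "tbl"), CaseInsensitiveStringMap.empty(), isStreaming = true)

// TableIdentifier form, via the new apply overload added above.
val rel2 = UnresolvedRelation(
  TableIdentifier("tbl", Some("db")), CaseInsensitiveStringMap.empty(), isStreaming = true)

assert(rel1.isStreaming && rel2.isStreaming)
```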

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 1 deletion
@@ -643,7 +643,8 @@ object CatalogTypes {
  */
 case class UnresolvedCatalogRelation(
     tableMeta: CatalogTable,
-    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) extends LeafNode {
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
+    override val isStreaming: Boolean = false) extends LeafNode {
   assert(tableMeta.identifier.database.isDefined)
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala

Lines changed: 3 additions & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.streaming
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -37,6 +37,8 @@ case class StreamingRelationV2(
     table: Table,
     extraOptions: CaseInsensitiveStringMap,
     output: Seq[Attribute],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
   extends LeafNode with MultiInstanceRelation {
   override lazy val resolved = v1Relation.forall(_.resolved)

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala

Lines changed: 8 additions & 0 deletions
@@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {

   override def toString: String = s"V1Table($name)"
 }
+
+/**
+ * A V2 table with V1 fallback support. This is used to fallback to V1 table when the V2 one
+ * doesn't implement specific capabilities but V1 already has.
+ */
+private[sql] trait V2TableWithV1Fallback extends Table {
+  def v1Table: CatalogTable
+}
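A hedged sketch of how this trait might be implemented (the class name, its placement in the `org.apache.spark.sql.connector.catalog` package, and the capability set are illustrative assumptions, not part of this PR). A V2 table that exposes a `v1Table` this way is picked up by `lookupRelation` in Analyzer.scala above, which wraps the V1 metadata as the `v1Relation` fallback of the resulting `StreamingRelationV2`; `FindDataSourceTable` later falls back to that V1 relation when the table lacks MICRO_BATCH_READ / CONTINUOUS_READ:

```
package org.apache.spark.sql.connector.catalog  // trait is private[sql]; placement is illustrative

import java.util

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.StructType

// Illustrative only: a V2 table that cannot serve streaming scans itself,
// so it advertises its V1 CatalogTable for fallback.
class ParquetBackedTable(meta: CatalogTable) extends V2TableWithV1Fallback {
  override def v1Table: CatalogTable = meta
  override def name(): String = meta.identifier.unquotedString
  override def schema(): StructType = meta.schema
  // No MICRO_BATCH_READ / CONTINUOUS_READ here, so FindDataSourceTable will
  // replace the StreamingRelationV2 with the V1 StreamingRelation fallback.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)
}
```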

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 1 addition & 1 deletion
@@ -174,7 +174,7 @@ case class CreateViewCommand(
   def verify(child: LogicalPlan) {
     child.collect {
       // Disallow creating permanent views based on temporary views.
-      case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
+      case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
         throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
           s"referencing a temporary view ${nameParts.quoted}. " +
           "Please create a temp view instead by CREATE TEMP VIEW")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 37 additions & 4 deletions
@@ -37,8 +37,12 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.connector.catalog.SupportsRead
+import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
@@ -260,19 +264,48 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }

+  private def getStreamingRelation(
+      table: CatalogTable,
+      extraOptions: CaseInsensitiveStringMap): StreamingRelation = {
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
+    val dataSource = DataSource(
+      sparkSession,
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
+      options = dsOptions)
+    StreamingRelation(dataSource)
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options), _, _, _, _)
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))

-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _), _, _, _, _) =>
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))

-    case UnresolvedCatalogRelation(tableMeta, options) if DDLUtils.isDatasourceTable(tableMeta) =>
+    case UnresolvedCatalogRelation(tableMeta, options, false)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta, options)

-    case UnresolvedCatalogRelation(tableMeta, _) =>
+    case UnresolvedCatalogRelation(tableMeta, _, false) =>
       DDLUtils.readHiveTable(tableMeta)
+
+    case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
+      getStreamingRelation(tableMeta, extraOptions)
+
+    case s @ StreamingRelationV2(
+        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+      val v1Relation = getStreamingRelation(tableMeta, extraOptions)
+      if (table.isInstanceOf[SupportsRead]
+          && table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)) {
+        s.copy(v1Relation = Some(v1Relation))
+      } else {
+        // Fallback to V1 relation
+        v1Relation
+      }
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ class MicroBatchExecution(
           StreamingExecutionRelation(source, output)(sparkSession)
         })

-      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, _, _, v1) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
