Commit 85fd552

[SPARK-27190][SQL] add table capability for streaming
## What changes were proposed in this pull request?

This is a followup of #24012, to add the corresponding capabilities for streaming.

## How was this patch tested?

Existing tests.

Closes #24129 from cloud-fan/capability.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent: 2234667

24 files changed: +389 additions, -231 deletions

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 7 additions & 4 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.{Collections, Locale, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
+    with SupportsRead with SupportsWrite with BaseStreamingSink {
 
     override def name(): String = s"Kafka $strategy"
 
     override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-    override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
+    override def capabilities(): ju.Set[TableCapability] = {
+      Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
+    }
 
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
       () => new KafkaScan(options)
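
From the user's side nothing changes: a streaming read against Kafka still goes through the usual DataFrame APIs, and the planner now consults KafkaTable.capabilities() instead of checking for the removed marker interfaces. A minimal usage sketch, assuming a SparkSession named spark; the broker address and topic name are placeholders:

// Sketch only: "host:9092" and "events" are placeholder values.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()

// Planning this query now relies on MICRO_BATCH_READ / CONTINUOUS_READ being
// present in the table's capability set.
kafkaStream.writeStream
  .format("console")
  .start()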

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
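
What these three deleted marker interfaces used to signal is now expressed through Table#capabilities(). A minimal sketch of a hypothetical connector table after this change; the class name and schema are illustrative and not part of the commit, and, as in the Kafka and noop tables above, a streaming sink still mixes in BaseStreamingSink at this stage:

import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical table: instead of mixing in SupportsMicroBatchRead, SupportsContinuousRead
// and SupportsStreamingWrite, it declares the same support via capabilities().
class ExampleStreamingTable extends Table
  with SupportsRead with SupportsWrite with BaseStreamingSink {

  override def name(): String = "example"

  override def schema(): StructType = new StructType().add("value", "string")

  override def capabilities(): JSet[TableCapability] =
    Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava

  // Builders elided; they would produce the actual streaming Scan and Write implementations.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ???
}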

sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java

Lines changed: 19 additions & 0 deletions
@@ -33,6 +33,16 @@ public enum TableCapability {
   */
  BATCH_READ,
 
+  /**
+   * Signals that the table supports reads in micro-batch streaming execution mode.
+   */
+  MICRO_BATCH_READ,
+
+  /**
+   * Signals that the table supports reads in continuous streaming execution mode.
+   */
+  CONTINUOUS_READ,
+
  /**
   * Signals that the table supports append writes in batch execution mode.
   * <p>
@@ -42,6 +52,15 @@ public enum TableCapability {
   */
  BATCH_WRITE,
 
+  /**
+   * Signals that the table supports append writes in streaming execution mode.
+   * <p>
+   * Tables that return this capability must support appending data and may also support additional
+   * write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
+   * {@link #OVERWRITE_DYNAMIC}.
+   */
+  STREAMING_WRITE,
+
  /**
   * Signals that the table can be truncated in a write operation.
   * <p>
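
Call sites that used to test for the marker interfaces can now inspect the returned capability set directly. A small illustrative helper, not taken from this commit:

import org.apache.spark.sql.sources.v2.{Table, TableCapability}

// Illustrative: true if the table can be read in at least one streaming execution mode.
def supportsStreamingRead(table: Table): Boolean = {
  val caps = table.capabilities()
  caps.contains(TableCapability.MICRO_BATCH_READ) || caps.contains(TableCapability.CONTINUOUS_READ)
}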

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java

Lines changed: 4 additions & 6 deletions
@@ -21,8 +21,6 @@
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
-import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableCapability;
 
@@ -74,8 +72,8 @@ default Batch toBatch() {
  /**
   * Returns the physical representation of this scan for streaming query with micro-batch mode. By
   * default this method throws exception, data sources must overwrite this method to provide an
-  * implementation, if the {@link Table} that creates this scan implements
-  * {@link SupportsMicroBatchRead}.
+  * implementation, if the {@link Table} that creates this scan returns
+  * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
   *
   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
   *                           recovery. Data streams for the same logical source in the same query
@@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
  /**
   * Returns the physical representation of this scan for streaming query with continuous mode. By
   * default this method throws exception, data sources must overwrite this method to provide an
-  * implementation, if the {@link Table} that creates this scan implements
-  * {@link SupportsContinuousRead}.
+  * implementation, if the {@link Table} that creates this scan returns
+  * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
   *
   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
   *                           recovery. Data streams for the same logical source in the same query
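
Per the updated javadoc, a Scan only needs to override the streaming factory methods whose capability its Table actually reports. A hedged sketch of a micro-batch-only scan; the class name and stream factory are illustrative, not part of the commit:

import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Illustrative scan for a table that reports only MICRO_BATCH_READ: it overrides
// toMicroBatchStream and leaves toContinuousStream at its default (throwing) implementation.
class ExampleMicroBatchScan(
    schema: StructType,
    streamFactory: String => MicroBatchStream) extends Scan {

  override def readSchema(): StructType = schema

  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
    streamFactory(checkpointLocation)
}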

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 5 additions & 2 deletions
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
@@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
   override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
 }
 
-private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
+private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
-  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+  }
 }
 
 private[noop] object NoopWriteBuilder extends WriteBuilder
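
Because NoopTable now reports STREAMING_WRITE alongside BATCH_WRITE, the same table serves as a sink for both batch and streaming writes. A minimal usage sketch, assuming a SparkSession named spark and that the source registers the short name "noop":

// Batch write path, enabled by BATCH_WRITE.
spark.range(10).write.format("noop").mode("append").save()

// Streaming write path, enabled by STREAMING_WRITE; "rate" is a built-in test source.
spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("noop")
  .start()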

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala (new file)

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}
+
+/**
+ * This rules adds some basic table capability check for streaming scan, without knowing the actual
+ * streaming execution mode.
+ */
+object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan.foreach {
+      case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
+        throw new AnalysisException(
+          s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
+      case _ =>
+    }
+
+    val streamingSources = plan.collect {
+      case r: StreamingRelationV2 => r.table
+    }
+    val v1StreamingRelations = plan.collect {
+      case r: StreamingRelation => r
+    }
+
+    if (streamingSources.length + v1StreamingRelations.length > 1) {
+      val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
+      // v1 streaming data source only supports micro-batch.
+      val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
+        v1StreamingRelations.isEmpty
+      if (!allSupportsMicroBatch && !allSupportsContinuous) {
+        val microBatchSources =
+          streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
+            v1StreamingRelations.map(_.sourceName)
+        val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
+        throw new AnalysisException(
+          "The streaming sources in a query do not have a common supported execution mode.\n" +
+            "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
+            "Sources support continuous: " + continuousSources.mkString(", "))
+      }
+    }
+  }
+}
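
Since the rule is just a LogicalPlan => Unit, it can be exercised directly against the analyzed plan of a streaming DataFrame. A hedged sketch; streamingDf is a placeholder for any DataFrame built from readStream sources:

import org.apache.spark.sql.execution.datasources.v2.V2StreamingScanSupportCheck

// Passes silently when every v2 source supports at least one streaming mode and all
// sources in the plan share a common mode; otherwise it throws AnalysisException.
V2StreamingScanSupportCheck(streamingDf.queryExecution.analyzed)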
