
Commit 6ef4530

[SPARK-27579][SQL] remove BaseStreamingSource and BaseStreamingSink
## What changes were proposed in this pull request?

`BaseStreamingSource` and `BaseStreamingSink` are used to unify the v1 and v2 streaming data source APIs in some code paths. This PR removes these two interfaces and lets the v1 APIs extend the v2 APIs to keep API compatibility. The motivation is #24416: we want to move data source v2 to the catalyst module, but `BaseStreamingSource` and `BaseStreamingSink` live in sql/core.

## How was this patch tested?

Existing tests.

Closes #24471 from cloud-fan/streaming.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
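Editor's note: to make the approach concrete, below is a minimal sketch of the compatibility pattern this commit applies, written against simplified stand-in types rather than the real Spark interfaces (the `Table`, `StructType`, and `TableCapability` shapes are only mimicked here). The v1 sink trait extends the v2 table abstraction and stubs out the table-only methods, so existing v1 implementations keep compiling while shared code paths can be typed against the v2 interface.

// Stand-ins for the real Spark v2 interfaces, simplified for illustration only.
trait Table {
  def name(): String
  def schema(): String            // the real API uses StructType
  def capabilities(): Set[String] // the real API uses java.util.Set[TableCapability]
}

// After this commit, the v1 Sink extends the v2 Table directly. The table-only
// methods are stubbed with exceptions; the throwing overrides encode the
// expectation that these methods are never called on a v1 sink.
trait Sink extends Table {
  def addBatch(batchId: Long, data: Seq[String]): Unit

  override def name(): String = throw new IllegalStateException("should not be called.")
  override def schema(): String = throw new IllegalStateException("should not be called.")
  override def capabilities(): Set[String] = throw new IllegalStateException("should not be called.")
}

// An existing v1-style sink keeps compiling unchanged, and can now be passed to
// code that is typed against the v2 Table (e.g. a `sink: Table` parameter).
object PrintlnSink extends Sink {
  override def addBatch(batchId: Long, data: Seq[String]): Unit =
    println(s"batch $batchId: ${data.mkString(", ")}")
}

This mirrors what the Sink.scala change further down does with the real Table interface.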
1 parent 4b725e5 · commit 6ef4530

25 files changed: +118 -127 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
@@ -354,7 +354,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsRead with SupportsWrite with BaseStreamingSink {
+    with SupportsRead with SupportsWrite {

     override def name(): String = s"Kafka $strategy"


external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
@@ -94,7 +95,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
       message: String = "",
       topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {

-    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
       query match {
         // Make sure no Spark job is running when deleting a topic
         case Some(m: MicroBatchExecution) => m.processAllAvailable()
@@ -114,7 +115,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
       query.nonEmpty,
       "Cannot add data when there is no query for finding the active kafka source")

-    val sources: Seq[BaseStreamingSource] = {
+    val sources: Seq[SparkDataStream] = {
       query.get.logicalPlan.collect {
         case StreamingExecutionRelation(source: KafkaSource, _) => source
         case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||

sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java

Lines changed: 6 additions & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.sources.v2.reader.streaming;

 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;

 /**
  * The base interface representing a readable data stream in a Spark streaming query. It's
@@ -28,7 +27,7 @@
  * {@link MicroBatchStream} and {@link ContinuousStream}.
  */
 @Evolving
-public interface SparkDataStream extends BaseStreamingSource {
+public interface SparkDataStream {

   /**
    * Returns the initial offset for a streaming query to start reading from. Note that the
@@ -50,4 +49,9 @@ public interface SparkDataStream extends BaseStreamingSource {
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   void commit(Offset end);
+
+  /**
+   * Stop this source and free any resources it has allocated.
+   */
+  void stop();
 }
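Editor's note: since `stop()` (previously declared on `BaseStreamingSource`) is now part of `SparkDataStream` itself, a v2 stream owns its lifecycle end-to-end. The toy sketch below uses simplified stand-in types, not the real Spark interfaces, to show where an implementer releases resources when the newly required method is called.

// Simplified stand-ins (not the real Spark types) showing the lifecycle that
// SparkDataStream carries after absorbing stop() from BaseStreamingSource.
trait Offset { def json(): String }

trait DataStream {
  def initialOffset(): Offset
  def deserializeOffset(json: String): Offset
  def commit(end: Offset): Unit
  def stop(): Unit // free any resources the stream has allocated
}

// A toy in-memory stream: stop() is where a real source would close clients,
// consumers, or thread pools it opened while reading.
class CounterStream extends DataStream {
  private case class LongOffset(value: Long) extends Offset {
    override def json(): String = value.toString
  }
  private var open = true

  override def initialOffset(): Offset = LongOffset(0L)
  override def deserializeOffset(json: String): Offset = LongOffset(json.toLong)
  override def commit(end: Offset): Unit = () // nothing to release per batch here
  override def stop(): Unit = { open = false }
}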

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 1 addition & 2 deletions
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._

 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
@@ -40,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
   override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
 }

-private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
+private[noop] object NoopTable extends Table with SupportsWrite {
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 4 additions & 4 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat
 import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock

@@ -38,7 +38,7 @@ class MicroBatchExecution(
     name: String,
     checkpointRoot: String,
     analyzedPlan: LogicalPlan,
-    sink: BaseStreamingSink,
+    sink: Table,
     trigger: Trigger,
     triggerClock: Clock,
     outputMode: OutputMode,
@@ -48,7 +48,7 @@ class MicroBatchExecution(
     sparkSession, name, checkpointRoot, analyzedPlan, sink,
     trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

-  @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
+  @volatile protected var sources: Seq[SparkDataStream] = Seq.empty

   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -354,7 +354,7 @@ class MicroBatchExecution(
     if (isCurrentBatchConstructed) return true

     // Generate a map from each unique source to the next available offset.
-    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
+    val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map {
       case s: Source =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("getOffset") {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
 import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}
+import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream

 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
@@ -39,7 +40,7 @@ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMet
    * This method is typically used to associate a serialized offset with actual sources (which
    * cannot be serialized).
    */
-  def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = {
+  def toStreamProgress(sources: Seq[SparkDataStream]): StreamProgress = {
     assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
       s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
       s"Cannot continue.")

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 11 additions & 10 deletions
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, SparkDataStream}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -44,7 +45,7 @@ import org.apache.spark.util.Clock
 trait ProgressReporter extends Logging {

   case class ExecutionStats(
-    inputRows: Map[BaseStreamingSource, Long],
+    inputRows: Map[SparkDataStream, Long],
     stateOperators: Seq[StateOperatorProgress],
     eventTimeStats: Map[String, String])

@@ -55,10 +56,10 @@ trait ProgressReporter extends Logging {
   protected def triggerClock: Clock
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
-  protected def newData: Map[BaseStreamingSource, LogicalPlan]
+  protected def newData: Map[SparkDataStream, LogicalPlan]
   protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
-  protected def sources: Seq[BaseStreamingSource]
-  protected def sink: BaseStreamingSink
+  protected def sources: Seq[SparkDataStream]
+  protected def sink: Table
   protected def offsetSeqMetadata: OffsetSeqMetadata
   protected def currentBatchId: Long
   protected def sparkSession: SparkSession
@@ -67,8 +68,8 @@ trait ProgressReporter extends Logging {
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
   private var currentTriggerEndTimestamp = -1L
-  private var currentTriggerStartOffsets: Map[BaseStreamingSource, String] = _
-  private var currentTriggerEndOffsets: Map[BaseStreamingSource, String] = _
+  private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _
+  private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _
   // TODO: Restore this from the checkpoint when possible.
   private var lastTriggerStartTimestamp = -1L

@@ -240,9 +241,9 @@ trait ProgressReporter extends Logging {
   }

   /** Extract number of input sources for each streaming source in plan */
-  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+  private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {

-    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+    def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = {
       tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }

@@ -262,7 +263,7 @@ trait ProgressReporter extends Logging {
     val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
       case s: MicroBatchScanExec =>
         val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
-        val source = s.stream.asInstanceOf[BaseStreamingSource]
+        val source = s.stream
         source -> numRows
     }
     logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala

Lines changed: 20 additions & 1 deletion
@@ -17,14 +17,21 @@

 package org.apache.spark.sql.execution.streaming

+import java.util
+
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.sources.v2.{Table, TableCapability}
+import org.apache.spark.sql.types.StructType

 /**
  * An interface for systems that can collect the results of a streaming query. In order to preserve
  * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
  * batch.
+ *
+ * Note that, we extends `Table` here, to make the v1 streaming sink API be compatible with
+ * data source v2.
  */
-trait Sink extends BaseStreamingSink {
+trait Sink extends Table {

   /**
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
@@ -38,4 +45,16 @@ trait Sink extends BaseStreamingSink {
    * after data is consumed by sink successfully.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
+
+  override def name: String = {
+    throw new IllegalStateException("should not be called.")
+  }
+
+  override def schema: StructType = {
+    throw new IllegalStateException("should not be called.")
+  }
+
+  override def capabilities: util.Set[TableCapability] = {
+    throw new IllegalStateException("should not be called.")
+  }
 }
