From 42b9dcbbe0b893d209167529c64ec69587ef7836 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 26 Apr 2019 18:43:53 +0800 Subject: [PATCH] remove BaseStreamingSource and BaseStreamingSink --- .../sql/kafka010/KafkaSourceProvider.scala | 4 +-- .../kafka010/KafkaMicroBatchSourceSuite.scala | 5 ++-- .../streaming/BaseStreamingSink.java | 27 ----------------- .../streaming/BaseStreamingSource.java | 29 ------------------- .../v2/reader/streaming/SparkDataStream.java | 8 +++-- .../datasources/noop/NoopDataSource.scala | 3 +- .../streaming/MicroBatchExecution.scala | 8 ++--- .../sql/execution/streaming/OffsetSeq.scala | 3 +- .../streaming/ProgressReporter.scala | 21 +++++++------- .../spark/sql/execution/streaming/Sink.scala | 21 +++++++++++++- .../sql/execution/streaming/Source.scala | 20 +++++++++++-- .../execution/streaming/StreamExecution.scala | 9 +++--- .../execution/streaming/StreamProgress.scala | 20 +++++++------ .../streaming/StreamingRelation.scala | 3 +- .../sql/execution/streaming/console.scala | 2 +- .../continuous/ContinuousExecution.scala | 2 +- .../sql/execution/streaming/memory.scala | 24 ++++++++------- .../sources/ForeachWriterTable.scala | 3 +- .../execution/streaming/sources/memory.scala | 4 +-- .../sql/streaming/DataStreamWriter.scala | 4 +-- .../sql/streaming/StreamingQueryManager.scala | 6 ++-- .../sources/RateStreamProviderSuite.scala | 4 +-- .../sources/TextSocketStreamSuite.scala | 4 +-- .../spark/sql/streaming/StreamTest.scala | 5 ++-- .../sources/StreamingDataSourceV2Suite.scala | 6 ++-- 25 files changed, 118 insertions(+), 127 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index bb76a30b3881..c27382de2477 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.internal.Logging import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source} +import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.TableCapability._ @@ -354,7 +354,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } class KafkaTable(strategy: => ConsumerStrategy) extends Table - with SupportsRead with SupportsWrite with BaseStreamingSink { + with SupportsRead with SupportsWrite { override def name(): String = s"Kafka $strategy" diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index b98f8e97db2e..672c3b556336 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.kafka010.KafkaSourceProvider._ +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext @@ -94,7 +95,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf message: String = "", topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData { - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = { query match { // Make sure no Spark job is running when deleting a topic case Some(m: MicroBatchExecution) => m.processAllAvailable() @@ -114,7 +115,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf query.nonEmpty, "Cannot add data when there is no query for finding the active kafka source") - val sources: Seq[BaseStreamingSource] = { + val sources: Seq[SparkDataStream] = { query.get.logicalPlan.collect { case StreamingExecutionRelation(source: KafkaSource, _) => source case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] || diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java deleted file mode 100644 index ac96c2765368..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming; - -/** - * The shared interface between V1 and V2 streaming sinks. - * - * This is a temporary interface for compatibility during migration. It should not be implemented - * directly, and will be removed in future versions. 
- */ -public interface BaseStreamingSink { -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java deleted file mode 100644 index c44b8af2552f..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming; - -/** - * The shared interface between V1 streaming sources and V2 streaming readers. - * - * This is a temporary interface for compatibility during migration. It should not be implemented - * directly, and will be removed in future versions. - */ -public interface BaseStreamingSource { - /** Stop this source and free any resources it has allocated. */ - void stop(); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java index 30f38ce37c40..2068a84fc6bb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources.v2.reader.streaming; import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; /** * The base interface representing a readable data stream in a Spark streaming query. It's @@ -28,7 +27,7 @@ * {@link MicroBatchStream} and {@link ContinuousStream}. */ @Evolving -public interface SparkDataStream extends BaseStreamingSource { +public interface SparkDataStream { /** * Returns the initial offset for a streaming query to start reading from. Note that the @@ -50,4 +49,9 @@ public interface SparkDataStream extends BaseStreamingSource { * equal to `end` and will only request offsets greater than `end` in the future. */ void commit(Offset end); + + /** + * Stop this source and free any resources it has allocated. 
+ */ + void stop(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index c8b2f65ca62e..e91e2b48db48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.streaming.BaseStreamingSink import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer._ @@ -40,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister { override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable } -private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink { +private[noop] object NoopTable extends Table with SupportsWrite { override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder override def name(): String = "noop-table" override def schema(): StructType = new StructType() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index d9fe836b1c49..58c265d0a850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -38,7 +38,7 @@ class MicroBatchExecution( name: String, checkpointRoot: String, analyzedPlan: LogicalPlan, - sink: BaseStreamingSink, + sink: Table, trigger: Trigger, triggerClock: Clock, outputMode: OutputMode, @@ -48,7 +48,7 @@ class MicroBatchExecution( sparkSession, name, checkpointRoot, analyzedPlan, sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop) { - @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty + @volatile protected var sources: Seq[SparkDataStream] = Seq.empty private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -354,7 +354,7 @@ class MicroBatchExecution( if (isCurrentBatchConstructed) return true // Generate a map from each unique source to the next available offset. 
- val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { + val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map { case s: Source => updateStatusMessage(s"Getting offsets from $s") reportTimeTaken("getOffset") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 73cf355dbe75..0f7ad7517e8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager} import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream /** * An ordered collection of offsets, used to track the progress of processing data from one or more @@ -39,7 +40,7 @@ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMet * This method is typically used to associate a serialized offset with actual sources (which * cannot be serialized). */ - def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = { + def toStreamProgress(sources: Seq[SparkDataStream]): StreamProgress = { assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " + s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " + s"Cannot continue.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 859c327d757d..6cb75083d0c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress} -import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream +import org.apache.spark.sql.sources.v2.Table +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, SparkDataStream} import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent import org.apache.spark.util.Clock @@ -44,7 +45,7 @@ import org.apache.spark.util.Clock trait ProgressReporter extends Logging { case class ExecutionStats( - inputRows: Map[BaseStreamingSource, Long], + inputRows: Map[SparkDataStream, Long], stateOperators: Seq[StateOperatorProgress], eventTimeStats: Map[String, String]) @@ -55,10 +56,10 @@ trait ProgressReporter extends Logging { protected def triggerClock: Clock protected def logicalPlan: LogicalPlan protected def lastExecution: QueryExecution - protected def newData: Map[BaseStreamingSource, LogicalPlan] + protected def newData: Map[SparkDataStream, LogicalPlan] protected def sinkCommitProgress: Option[StreamWriterCommitProgress] - protected def sources: Seq[BaseStreamingSource] 
- protected def sink: BaseStreamingSink + protected def sources: Seq[SparkDataStream] + protected def sink: Table protected def offsetSeqMetadata: OffsetSeqMetadata protected def currentBatchId: Long protected def sparkSession: SparkSession @@ -67,8 +68,8 @@ trait ProgressReporter extends Logging { // Local timestamps and counters. private var currentTriggerStartTimestamp = -1L private var currentTriggerEndTimestamp = -1L - private var currentTriggerStartOffsets: Map[BaseStreamingSource, String] = _ - private var currentTriggerEndOffsets: Map[BaseStreamingSource, String] = _ + private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _ + private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _ // TODO: Restore this from the checkpoint when possible. private var lastTriggerStartTimestamp = -1L @@ -240,9 +241,9 @@ trait ProgressReporter extends Logging { } /** Extract number of input sources for each streaming source in plan */ - private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = { - def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = { + private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = { + def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = { tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source } @@ -262,7 +263,7 @@ trait ProgressReporter extends Logging { val sourceToInputRowsTuples = lastExecution.executedPlan.collect { case s: MicroBatchScanExec => val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) - val source = s.stream.asInstanceOf[BaseStreamingSource] + val source = s.stream source -> numRows } logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala index 34bc085d920c..190325fb7ec2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala @@ -17,14 +17,21 @@ package org.apache.spark.sql.execution.streaming +import java.util + import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.sources.v2.{Table, TableCapability} +import org.apache.spark.sql.types.StructType /** * An interface for systems that can collect the results of a streaming query. In order to preserve * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same * batch. + * + * Note that we extend `Table` here to make the v1 streaming sink API compatible with + * data source v2. */ -trait Sink extends BaseStreamingSink { +trait Sink extends Table { /** * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if @@ -38,4 +45,16 @@ trait Sink extends BaseStreamingSink { * after data is consumed by sink successfully.
*/ def addBatch(batchId: Long, data: DataFrame): Unit + + override def name: String = { + throw new IllegalStateException("should not be called.") + } + + override def schema: StructType = { + throw new IllegalStateException("should not be called.") + } + + override def capabilities: util.Set[TableCapability] = { + throw new IllegalStateException("should not be called.") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index dbbd59e06909..7f66d0b055cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -18,14 +18,19 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.types.StructType /** * A source of continually arriving data for a streaming query. A [[Source]] must have a * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark * will regularly query each [[Source]] to see if any more data is available. + * + * Note that we extend `SparkDataStream` here to make the v1 streaming source API compatible + * with data source v2. */ -trait Source extends BaseStreamingSource { +trait Source extends SparkDataStream { /** Returns the schema of the data from this source */ def schema: StructType @@ -62,6 +67,15 @@ trait Source extends BaseStreamingSource { */ def commit(end: Offset) : Unit = {} - /** Stop this source and free any resources it has allocated.
*/ - def stop(): Unit + override def initialOffset(): OffsetV2 = { + throw new IllegalStateException("should not be called.") + } + + override def deserializeOffset(json: String): OffsetV2 = { + throw new IllegalStateException("should not be called.") + } + + override def commit(end: OffsetV2): Unit = { + throw new IllegalStateException("should not be called.") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index fd959619650e..5d66b61ae711 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.SupportsWrite +import org.apache.spark.sql.sources.v2.{SupportsWrite, Table} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.sources.v2.writer.SupportsTruncate import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.sql.streaming._ @@ -69,7 +70,7 @@ abstract class StreamExecution( override val name: String, private val checkpointRoot: String, analyzedPlan: LogicalPlan, - val sink: BaseStreamingSink, + val sink: Table, val trigger: Trigger, val triggerClock: Clock, val outputMode: OutputMode, @@ -205,7 +206,7 @@ abstract class StreamExecution( /** * A list of unique sources in the query plan. This will be set when generating logical plan. */ - @volatile protected var uniqueSources: Seq[BaseStreamingSource] = Seq.empty + @volatile protected var uniqueSources: Seq[SparkDataStream] = Seq.empty /** Defines the internal state of execution */ protected val state = new AtomicReference[State](INITIALIZING) @@ -214,7 +215,7 @@ abstract class StreamExecution( var lastExecution: IncrementalExecution = _ /** Holds the most recent input data for each source. */ - protected var newData: Map[BaseStreamingSource, LogicalPlan] = _ + protected var newData: Map[SparkDataStream, LogicalPlan] = _ @volatile protected var streamDeathCause: StreamingQueryException = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 8531070b1bc4..8a1d064f49d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -19,32 +19,34 @@ package org.apache.spark.sql.execution.streaming import scala.collection.{immutable, GenTraversableOnce} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream + /** * A helper class that looks like a Map[Source, Offset]. 
*/ class StreamProgress( - val baseMap: immutable.Map[BaseStreamingSource, Offset] = - new immutable.HashMap[BaseStreamingSource, Offset]) - extends scala.collection.immutable.Map[BaseStreamingSource, Offset] { + val baseMap: immutable.Map[SparkDataStream, Offset] = + new immutable.HashMap[SparkDataStream, Offset]) + extends scala.collection.immutable.Map[SparkDataStream, Offset] { - def toOffsetSeq(source: Seq[BaseStreamingSource], metadata: OffsetSeqMetadata): OffsetSeq = { + def toOffsetSeq(source: Seq[SparkDataStream], metadata: OffsetSeqMetadata): OffsetSeq = { OffsetSeq(source.map(get), Some(metadata)) } override def toString: String = baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}") - override def +[B1 >: Offset](kv: (BaseStreamingSource, B1)): Map[BaseStreamingSource, B1] = { + override def +[B1 >: Offset](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = { baseMap + kv } - override def get(key: BaseStreamingSource): Option[Offset] = baseMap.get(key) + override def get(key: SparkDataStream): Option[Offset] = baseMap.get(key) - override def iterator: Iterator[(BaseStreamingSource, Offset)] = baseMap.iterator + override def iterator: Iterator[(SparkDataStream, Offset)] = baseMap.iterator - override def -(key: BaseStreamingSource): Map[BaseStreamingSource, Offset] = baseMap - key + override def -(key: SparkDataStream): Map[SparkDataStream, Offset] = baseMap - key - def ++(updates: GenTraversableOnce[(BaseStreamingSource, Offset)]): StreamProgress = { + def ++(updates: GenTraversableOnce[(SparkDataStream, Offset)]): StreamProgress = { new StreamProgress(baseMap ++ updates) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 0d7e9ba363d0..142b6e7d1806 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.sources.v2.{Table, TableProvider} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.util.CaseInsensitiveStringMap object StreamingRelation { @@ -63,7 +64,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output: * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. 
*/ case class StreamingExecutionRelation( - source: BaseStreamingSource, + source: SparkDataStream, output: Seq[Attribute])(session: SparkSession) extends LeafNode with MultiInstanceRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index c7161d311c02..9ae39c79c515 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -61,7 +61,7 @@ class ConsoleSinkProvider extends TableProvider def shortName(): String = "console" } -object ConsoleTable extends Table with SupportsWrite with BaseStreamingSink { +object ConsoleTable extends Table with SupportsWrite { override def name(): String = "console" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index a1fb212fd433..82708a331b0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -49,7 +49,7 @@ class ContinuousExecution( extraOptions: Map[String, String], deleteCheckpointOnStop: Boolean) extends StreamExecution( - sparkSession, name, checkpointRoot, analyzedPlan, sink.asInstanceOf[BaseStreamingSink], + sparkSession, name, checkpointRoot, analyzedPlan, sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop) { @volatile protected var sources: Seq[ContinuousStream] = Seq() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 6efde0a27efe..022c8da0c074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2, SparkDataStream} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -49,7 +49,7 @@ object MemoryStream { /** * A base class for memory stream implementations. Supports adding data and resetting. 
*/ -abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends BaseStreamingSource { +abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends SparkDataStream { val encoder = encoderFor[A] protected val attributes = encoder.schema.toAttributes @@ -78,6 +78,18 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas } def addData(data: TraversableOnce[A]): Offset + + override def initialOffset(): OffsetV2 = { + throw new IllegalStateException("should not be called.") + } + + override def deserializeOffset(json: String): OffsetV2 = { + throw new IllegalStateException("should not be called.") + } + + override def commit(end: OffsetV2): Unit = { + throw new IllegalStateException("should not be called.") + } } // This class is used to indicate the memory stream data source. We don't actually use it, as @@ -264,11 +276,3 @@ object MemoryStreamReaderFactory extends PartitionReaderFactory { } } } - -/** A common trait for MemorySinks with methods used for testing */ -trait MemorySinkBase extends BaseStreamingSink { - def allData: Seq[Row] - def latestBatchData: Seq[Row] - def dataSinceBatch(sinceBatchId: Long): Seq[Row] - def latestBatchId: Option[Long] -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index 838ede6c563f..6da1b3a49c44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.python.PythonForeachWriter -import org.apache.spark.sql.execution.streaming.BaseStreamingSink import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability} import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} @@ -44,7 +43,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class ForeachWriterTable[T]( writer: ForeachWriter[T], converter: Either[ExpressionEncoder[T], InternalRow => T]) - extends Table with SupportsWrite with BaseStreamingSink { + extends Table with SupportsWrite { override def name(): String = "ForeachSink" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala index 9008c63491cb..de8d00d4ac34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink} +import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability} import 
org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} @@ -43,7 +43,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit * tests and does not provide durability. */ -class MemorySink extends Table with SupportsWrite with MemorySinkBase with Logging { +class MemorySink extends Table with SupportsWrite with Logging { override def name(): String = "MemorySink" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 2f12efe04c50..d051cf9c1d4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -311,7 +311,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ provider.getTable(dsOptions) match { case table: SupportsWrite if table.supports(STREAMING_WRITE) => - table.asInstanceOf[BaseStreamingSink] + table case _ => createV1Sink() } } else { @@ -331,7 +331,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } } - private def createV1Sink(): BaseStreamingSink = { + private def createV1Sink(): Sink = { val ds = DataSource( df.sparkSession, className = source, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 5a08049ab55c..1705d5624409 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS -import org.apache.spark.sql.sources.v2.SupportsWrite +import org.apache.spark.sql.sources.v2.{SupportsWrite, Table} import org.apache.spark.util.{Clock, SystemClock, Utils} /** @@ -209,7 +209,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo userSpecifiedCheckpointLocation: Option[String], df: DataFrame, extraOptions: Map[String, String], - sink: BaseStreamingSink, + sink: Table, outputMode: OutputMode, useTempCheckpointLocation: Boolean, recoverFromCheckpointLocation: Boolean, @@ -315,7 +315,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo userSpecifiedCheckpointLocation: Option[String], df: DataFrame, extraOptions: Map[String, String], - sink: BaseStreamingSink, + sink: Table, outputMode: OutputMode, useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index c04f6e3f255c..883201b5fb47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.sources.v2.reader.streaming.Offset +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ManualClock @@ -39,7 +39,7 @@ class RateStreamProviderSuite extends StreamTest { import testImplicits._ case class AdvanceRateManualClock(seconds: Long) extends AddData { - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = { assert(query.nonEmpty) val rateSource = query.get.logicalPlan.collect { case r: StreamingDataSourceV2Relation diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala index 22f84376ecb5..fd3c31fbbacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.streaming.Offset +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -55,7 +55,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before private var serverThread: ServerThread = null case class AddSocketData(data: String*) extends AddData { - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = { require( query.nonEmpty, "Cannot add data when there is no query for finding the active socket source") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 900098a5ef61..89c62ba9fc82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch} import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.execution.streaming.state.StateStore +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -123,7 +124,7 @@ 
trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be * the active query, and then return the source object the data was added, as well as the * offset of added data. */ - def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) + def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) } /** A trait that can be extended when testing a source. */ @@ -134,7 +135,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be case class AddDataMemory[A](source: MemoryStreamBase[A], data: Seq[A]) extends AddData { override def toString: String = s"AddData to $source: ${data.mkString(",")}" - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = { (source, source.addData(data)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 25a68e4f9a57..7b2c1a56e8ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -22,9 +22,9 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, RateStreamOffset, Sink, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper} import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} @@ -80,7 +80,7 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite { } } -trait FakeStreamingWriteTable extends Table with SupportsWrite with BaseStreamingSink { +trait FakeStreamingWriteTable extends Table with SupportsWrite { override def name(): String = "fake" override def schema(): StructType = StructType(Seq()) override def capabilities(): util.Set[TableCapability] = {