From 7d1df9855dee8b6bce5ebed136b8da4275f178c7 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 18 Dec 2017 13:34:50 -0800 Subject: [PATCH 1/5] Restore old offset for binary compatibility --- .../spark/sql/sources/v2/reader/Offset.java | 34 +---------------- .../streaming/FileStreamSource.scala | 1 - .../streaming/FileStreamSourceOffset.scala | 2 - .../sql/execution/streaming/LongOffset.scala | 2 - .../streaming/MicroBatchExecution.scala | 1 - .../spark/sql/execution/streaming/Offset.java | 37 +++++++++++++++++++ .../sql/execution/streaming/OffsetSeq.scala | 1 - .../execution/streaming/OffsetSeqLog.scala | 1 - .../streaming/RateSourceProvider.scala | 3 +- .../streaming/RateStreamOffset.scala | 4 +- .../{Offset.scala => SerializedOffset.scala} | 3 -- .../sql/execution/streaming/Source.scala | 1 - .../execution/streaming/StreamExecution.scala | 1 - .../execution/streaming/StreamProgress.scala | 2 - .../ContinuousRateStreamSource.scala | 3 +- .../sql/execution/streaming/memory.scala | 1 - .../sql/execution/streaming/socket.scala | 1 - .../{ => sources}/RateStreamSourceV2.scala | 7 +++- .../streaming/{ => sources}/memoryV2.scala | 3 +- .../streaming/MemorySinkV2Suite.scala | 1 + .../streaming/RateSourceV2Suite.scala | 1 + .../sql/streaming/FileStreamSourceSuite.scala | 1 - .../spark/sql/streaming/OffsetSuite.scala | 3 +- .../spark/sql/streaming/StreamSuite.scala | 1 - .../spark/sql/streaming/StreamTest.scala | 1 - .../streaming/StreamingAggregationSuite.scala | 1 - .../StreamingQueryListenerSuite.scala | 1 - .../sql/streaming/StreamingQuerySuite.scala | 2 - .../test/DataStreamReaderWriterSuite.scala | 1 - .../sql/streaming/util/BlockingSource.scala | 3 +- 30 files changed, 55 insertions(+), 69 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{Offset.scala => SerializedOffset.scala} (95%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{ => sources}/RateStreamSourceV2.scala (96%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{ => sources}/memoryV2.scala (98%) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java index 1ebd35356f1a3..62839ec665de0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java @@ -23,38 +23,6 @@ * restart checkpoints. Sources should provide an Offset implementation which they can use to * reconstruct the stream position where the offset was taken. */ -public abstract class Offset { - /** - * A JSON-serialized representation of an Offset that is - * used for saving offsets to the offset log. - * Note: We assume that equivalent/equal offsets serialize to - * identical JSON strings. - * - * @return JSON string encoding - */ - public abstract String json(); +public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset { - /** - * Equality based on JSON string representation. We leverage the - * JSON representation for normalization between the Offset's - * in memory and on disk representations. - */ - @Override - public boolean equals(Object obj) { - if (obj instanceof Offset) { - return this.json().equals(((Offset) obj).json()); - } else { - return false; - } - } - - @Override - public int hashCode() { - return this.json().hashCode(); - } - - @Override - public String toString() { - return this.json(); - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index a33b785126765..0debd7db84757 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -27,7 +27,6 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index 431e5b99e3e98..a2b49d944a688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -22,8 +22,6 @@ import scala.util.control.Exception._ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization -import org.apache.spark.sql.sources.v2.reader.Offset - /** * Offset for the [[FileStreamSource]]. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 7ea31462ca7b0..5f0b195fcfcb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.sources.v2.reader.Offset - /** * A simple offset for sources that produce a single linear stream of data. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index a67dda99dc01b..4a3de8bae4bc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.{Clock, Utils} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java new file mode 100644 index 0000000000000..fa1c129654435 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java @@ -0,0 +1,37 @@ +package org.apache.spark.sql.execution.streaming; + +public abstract class Offset { + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * Note: We assume that equivalent/equal offsets serialize to + * identical JSON strings. + * + * @return JSON string encoding + */ + public abstract String json(); + + /** + * Equality based on JSON string representation. We leverage the + * JSON representation for normalization between the Offset's + * in memory and on disk representations. + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof Offset) { + return this.json().equals(((Offset) obj).json()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.json().hashCode(); + } + + @Override + public String toString() { + return this.json(); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index dcc5935890c8d..4e0a468b962a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -23,7 +23,6 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS} -import org.apache.spark.sql.sources.v2.reader.Offset /** * An ordered collection of offsets, used to track the progress of processing data from one or more diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index bfdbc65296165..e3f4abcf9f1dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets._ import scala.io.{Source => IOSource} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.sources.v2.reader.Offset /** * This class is used to log offsets to persistent files in HDFS. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala index 50671a46599e6..41761324cf6ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala @@ -30,9 +30,10 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader +import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader} import org.apache.spark.sql.types._ import org.apache.spark.util.{ManualClock, SystemClock} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala index 13679dfbe446b..726d8574af52b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.streaming import org.json4s.DefaultFormats import org.json4s.jackson.Serialization -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.sources.v2 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)]) - extends Offset { + extends v2.reader.Offset { implicit val defaultFormats: DefaultFormats = DefaultFormats override val json = Serialization.write(partitionToValueAndRunTimeMs) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala index 73f0c6221c5c1..129cfed860eb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.sources.v2.reader.Offset - - /** * Used when loading a JSON serialized offset from external storage. * We are currently not responsible for converting JSON serialized diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index dbb408ffc98d8..311942f6dbd84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 7946889e85e37..129995dcf3607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 770db401c9fd7..a3f3662e6f4c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming import scala.collection.{immutable, GenTraversableOnce} -import org.apache.spark.sql.sources.v2.reader.Offset - /** * A helper class that looks like a Map[Source, Offset]. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index 77fc26730e52c..4c3a1ee201ac1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -25,7 +25,8 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset} +import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index db0717510a2cb..3041d4d703cb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 440cae016a173..0b22cbc46e6bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala index 102551c238bfb..45dc7d75cbc8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import java.util.Optional @@ -27,6 +27,7 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.RateStreamOffset import org.apache.spark.sql.sources.v2.DataSourceV2Options import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} @@ -59,7 +60,9 @@ class RateStreamV2Reader(options: DataSourceV2Options) private var start: RateStreamOffset = _ private var end: RateStreamOffset = _ - override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = { + override def setOffsetRange( + start: Optional[Offset], + end: Optional[Offset]): Unit = { this.start = start.orElse( RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs)) .asInstanceOf[RateStreamOffset] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala similarity index 98% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala index 437040cc12472..94c5dd63089b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import javax.annotation.concurrent.GuardedBy @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.streaming.OutputMode diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala index be4b490754986..00d4f0b8503d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import org.scalatest.BeforeAndAfter import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.streaming.{OutputMode, StreamTest} class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala index ef801ceb1310c..6514c5f0fdfeb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader} import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport} import org.apache.spark.sql.streaming.StreamTest diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index c5b57bca18313..a2ade1b517624 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index 429748261f1ea..f208f9bd9b6e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset} -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset} trait OffsetSuite extends SparkFunSuite { /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 9e696b2236b68..77bcb9e10202a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index fb88c5d327043..71a474ef63e84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, SystemClock, Utils} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 38aa5171314f2..97e065193fd05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.OutputMode._ import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock} import org.apache.spark.sql.types.StructType diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index fc9ac2a56c4e5..9ff02dee288fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.scheduler._ import org.apache.spark.sql.{Encoder, SparkSession} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.util.JsonProtocol diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index ad4d3abd01aa5..2fa4595dab376 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -33,12 +33,10 @@ import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} import org.apache.spark.sql.types.StructType import org.apache.spark.util.ManualClock - class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar { import AwaitTerminationTester._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 952908f21ca60..aa163d2211c38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _} import org.apache.spark.sql.streaming.Trigger._ import org.apache.spark.sql.types._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala index 9a35f097e6e40..19ab2ff13e14e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.streaming.util import java.util.concurrent.CountDownLatch import org.apache.spark.sql.{SQLContext, _} -import org.apache.spark.sql.execution.streaming.{LongOffset, Sink, Source} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, StructField, StructType} From 949f5b49fc7b0f1de53acdfdac942d02eb2c2651 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 18 Dec 2017 14:02:18 -0800 Subject: [PATCH 2/5] add comment for deprecated class --- .../org/apache/spark/sql/execution/streaming/Offset.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java index fa1c129654435..a2ffc9709f5df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java @@ -1,5 +1,12 @@ package org.apache.spark.sql.execution.streaming; +/** + * This is an internal, deprecated interface. New source implementations should use the + * org.apache.spark.sql.sources.v2.reader.Offset class, which is the one that will be supported + * in the long term. + * + * This class will be removed in a future release. + */ public abstract class Offset { /** * A JSON-serialized representation of an Offset that is From c94699e95e8013929bd03aeeb3505865ace96d75 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 18 Dec 2017 14:11:08 -0800 Subject: [PATCH 3/5] copyright header --- .../spark/sql/execution/streaming/Offset.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java index a2ffc9709f5df..80aa5505db991 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution.streaming; /** From ace608db502d10c89f0a65811bff1822f7a8e06c Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 18 Dec 2017 14:39:38 -0800 Subject: [PATCH 4/5] fix kafka --- .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 1 - .../org/apache/spark/sql/kafka010/KafkaSourceOffset.scala | 3 +-- .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 87f31fcc20ae6..e9cff04ba5f2e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.kafka010.KafkaSource._ -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index 6e24423df4abc..b5da415b3097e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.SerializedOffset -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} /** * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 9cac0e5ae7117..2034b9be07f24 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} From 97fd2f968d417bd7e690cf961efaac77bc2ef08d Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 18 Dec 2017 19:16:14 -0800 Subject: [PATCH 5/5] duplicate methods and docs --- .../spark/sql/sources/v2/reader/Offset.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java index 62839ec665de0..ce1c489742054 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java @@ -24,5 +24,37 @@ * reconstruct the stream position where the offset was taken. */ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset { + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * Note: We assume that equivalent/equal offsets serialize to + * identical JSON strings. + * + * @return JSON string encoding + */ + public abstract String json(); + /** + * Equality based on JSON string representation. We leverage the + * JSON representation for normalization between the Offset's + * in memory and on disk representations. + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) { + return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.json().hashCode(); + } + + @Override + public String toString() { + return this.json(); + } }