From 7b31dbcc1bfb9e0d67f0e49f418a6eed043e8018 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 7 Dec 2016 19:22:16 -0800 Subject: [PATCH 1/3] Make FileStreamSourceOffset, and fix FileStreamSourceLog bug --- .../streaming/FileStreamSource.scala | 14 +++--- .../streaming/FileStreamSourceLog.scala | 2 +- .../streaming/FileStreamSourceOffset.scala | 43 +++++++++++++++++++ .../file-source-offset-version-2.1.0.txt | 2 +- .../streaming/FileStreamSourceSuite.scala | 2 +- .../sql/streaming/FileStreamSourceSuite.scala | 23 ++++++---- 6 files changed, 66 insertions(+), 20 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 8494aef004bb..fce063587df6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -79,7 +79,7 @@ class FileStreamSource( * `synchronized` on this method is for solving race conditions in tests. In the normal usage, * there is no race here, so the cost of `synchronized` should be rare. */ - private def fetchMaxOffset(): LongOffset = synchronized { + private def fetchMaxOffset(): FileStreamSourceOffset = synchronized { // All the new files found - ignore aged files and files that we have seen. val newFiles = fetchAllFiles().filter { case (path, timestamp) => seenFiles.isNewFile(path, timestamp) @@ -111,7 +111,7 @@ class FileStreamSource( logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } - new LongOffset(maxBatchId) + FileStreamSourceOffset(maxBatchId) } /** @@ -123,16 +123,14 @@ class FileStreamSource( } /** Return the latest offset in the source */ - def currentOffset: LongOffset = synchronized { - new LongOffset(maxBatchId) - } + def currentBatchId: Long = synchronized { maxBatchId } /** * Returns the data that is between the offsets (`start`, `end`]. 
*/ override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset - val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset + val startId = start.map(FileStreamSourceOffset(_).batchId).getOrElse(-1L) + val endId = FileStreamSourceOffset(end).batchId assert(startId <= endId) val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) @@ -172,7 +170,7 @@ class FileStreamSource( files } - override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) + override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.batchId == -1) override def toString: String = s"FileStreamSource[$qualifiedBasePath]" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 327b3ac26776..81908c0cefdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -78,7 +78,7 @@ class FileStreamSourceLog( override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { val startBatchId = startId.getOrElse(0L) - val endBatchId = getLatest().map(_._1).getOrElse(0L) + val endBatchId = endId.orElse(getLatest().map(_._1)).getOrElse(0L) val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala new file mode 100644 index 000000000000..82f2b1ff6168 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +case class FileStreamSourceOffset(batchId: Long) extends Offset { + override def json: String = { + Serialization.write(this)(FileStreamSourceOffset.format) + } +} + +object FileStreamSourceOffset { + implicit val format = Serialization.formats(NoTypeHints) + + def apply(offset: Offset): FileStreamSourceOffset = { + offset match { + case f: FileStreamSourceOffset => f + case s: SerializedOffset => + Serialization.read[FileStreamSourceOffset](s.json) + case _ => + throw new IllegalArgumentException( + s"Invalid conversion from offset of ${offset.getClass} to FileStreamSourceOffset") + } + } +} + diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt index 51b4008129ff..d8c72ad27966 100644 --- a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt +++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt @@ -1 +1 @@ -345 +{ "batchId": 345 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index 4a47c04d3f08..40d0643ba877 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -97,7 +97,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil, dir.getAbsolutePath, Map.empty) // this method should throw an exception if `fs.exists` is called during resolveRelation - newSource.getBatch(None, LongOffset(1)) + newSource.getBatch(None, FileStreamSourceOffset(1)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 7b6fe83b9a59..d9977b263b0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -60,7 +60,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private val source = sources.head val newOffset = source.withBatchingLocked { addData(source) - source.currentOffset + 1 + new FileStreamSourceOffset(source.currentBatchId + 1) } logInfo(s"Added file to $source at offset $newOffset") (source, newOffset) @@ -986,12 +986,17 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val _sources = PrivateMethod[Seq[Source]]('sources) val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] - assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() === - List("keep1", "keep2", "keep3")) - assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() === - List("keep2", "keep3")) - assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() === - List("keep3")) + + def verify(startId: Option[Int], endId: Int, expected: String*): Unit = { + val start = startId.map(new FileStreamSourceOffset(_)) + val end = FileStreamSourceOffset(endId) + assert(fileSource.getBatch(start, 
end).as[String].collect().toSeq === expected) + } + + verify(startId = None, endId = 2, "keep1", "keep2", "keep3") + verify(startId = Some(0), endId = 1, "keep2") + verify(startId = Some(0), endId = 2, "keep2", "keep3") + verify(startId = Some(1), endId = 2, "keep3") true } ) @@ -1024,7 +1029,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { test("FileStreamSource offset - read Spark 2.1.0 log format") { val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt") - assert(LongOffset.convert(offset) === Some(LongOffset(345))) + assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345)) } test("FileStreamSourceLog - read Spark 2.1.0 log format") { @@ -1046,7 +1051,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { private def readOffsetFromResource(file: String): SerializedOffset = { import scala.io.Source val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString - SerializedOffset(str.trim) + SerializedOffset(str) } } From 5dda0f3b18ed52b2cd89b52d3427bae63cdc866b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 7 Dec 2016 19:41:23 -0800 Subject: [PATCH 2/3] Renamed stuff --- .../streaming/FileStreamSource.scala | 28 +++++++++---------- .../streaming/FileStreamSourceOffset.scala | 6 +++- .../file-source-offset-version-2.1.0.txt | 2 +- .../offset-log-version-2.1.0/0 | 4 +-- .../streaming/OffsetSeqLogSuite.scala | 2 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- 6 files changed, 24 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index fce063587df6..20e0dcef8ffd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -57,7 +57,7 @@ class FileStreamSource( private val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath) - private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger @@ -104,14 +104,14 @@ class FileStreamSource( """.stripMargin) if (batchFiles.nonEmpty) { - maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) => - FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId) + metadataLogCurrentOffset += 1 + metadataLog.add(metadataLogCurrentOffset, batchFiles.map { case (p, timestamp) => + FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset) }.toArray) - logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") + logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files") } - FileStreamSourceOffset(maxBatchId) + FileStreamSourceOffset(metadataLogCurrentOffset) } /** @@ -122,19 +122,19 @@ class FileStreamSource( func } - /** Return the latest offset in the source */ - def currentBatchId: Long = synchronized { maxBatchId } + /** Return the latest offset in the [[FileStreamSourceLog]] */ + def currentLogOffset: Long = synchronized { metadataLogCurrentOffset } /** * Returns the data that is between the offsets (`start`, `end`]. 
*/ override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - val startId = start.map(FileStreamSourceOffset(_).batchId).getOrElse(-1L) - val endId = FileStreamSourceOffset(end).batchId + val startOffset = start.map(FileStreamSourceOffset(_).logOffset).getOrElse(-1L) + val endOffset = FileStreamSourceOffset(end).logOffset - assert(startId <= endId) - val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) - logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId") + assert(startOffset <= endOffset) + val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2) + logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset") logTrace(s"Files are:\n\t" + files.mkString("\n\t")) val newDataSource = DataSource( @@ -170,7 +170,7 @@ class FileStreamSource( files } - override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.batchId == -1) + override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1) override def toString: String = s"FileStreamSource[$qualifiedBasePath]" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index 82f2b1ff6168..f28e1ce24a04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -20,7 +20,11 @@ package org.apache.spark.sql.execution.streaming import org.json4s.NoTypeHints import org.json4s.jackson.Serialization -case class FileStreamSourceOffset(batchId: Long) extends Offset { +/** + * Offset for the [[FileStreamSource]]. 
+ * @param logOffset Position in the [[FileStreamSourceLog]] + */ +case class FileStreamSourceOffset(logOffset: Long) extends Offset { override def json: String = { Serialization.write(this)(FileStreamSourceOffset.format) } diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt index d8c72ad27966..e266a47368e1 100644 --- a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt +++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt @@ -1 +1 @@ -{ "batchId": 345 } +{"logOffset":345} diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 index fe5c1d44a6e2..988a98a7587d 100644 --- a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 +++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 @@ -1,4 +1,4 @@ v1 {"batchWatermarkMs":0,"batchTimestampMs":1480981499528} -0 -{"topic-0":{"0":1}} \ No newline at end of file +{"logOffset":345} +{"topic-0":{"0":1}} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index d139efaaf824..bb4274a162e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -74,7 +74,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0") assert(batchId === 0) assert(offsetSeq.offsets === Seq( - Some(SerializedOffset("0")), + Some(SerializedOffset("""{"logOffset":345}""")), Some(SerializedOffset("""{"topic-0":{"0":1}}""")) )) assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d9977b263b0f..af051e2b364a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -60,7 +60,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private val source = sources.head val newOffset = source.withBatchingLocked { addData(source) - new FileStreamSourceOffset(source.currentBatchId + 1) + new FileStreamSourceOffset(source.currentLogOffset + 1) } logInfo(s"Added file to $source at offset $newOffset") (source, newOffset) From 05f8b56318f1c157ad39eac80081c8952cf20ffd Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 8 Dec 2016 15:06:00 -0800 Subject: [PATCH 3/3] Addressed comments --- .../spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 2 +- .../execution/streaming/FileStreamSourceOffset.scala | 10 ++++++++-- ....txt => file-source-offset-version-2.1.0-json.txt} | 0 .../file-source-offset-version-2.1.0-long.txt | 1 + .../spark/sql/streaming/FileStreamSourceSuite.scala | 11 ++++++++--- 5 files changed, 18 insertions(+), 6 deletions(-) rename sql/core/src/test/resources/structured-streaming/{file-source-offset-version-2.1.0.txt => file-source-offset-version-2.1.0-json.txt} (100%) create mode 100644 
sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 22668fd6faaa..10b35c74f473 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -90,7 +90,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext { } } - test("read Spark 2.1.0 log format") { + test("read Spark 2.1.0 offset format") { val offset = readFromResource("kafka-source-offset-version-2.1.0.txt") assert(KafkaSourceOffset(offset) === KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index f28e1ce24a04..06d0fe6c18c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import scala.util.control.Exception._ + import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -36,8 +38,12 @@ object FileStreamSourceOffset { def apply(offset: Offset): FileStreamSourceOffset = { offset match { case f: FileStreamSourceOffset => f - case s: SerializedOffset => - Serialization.read[FileStreamSourceOffset](s.json) + case SerializedOffset(str) => + catching(classOf[NumberFormatException]).opt { + FileStreamSourceOffset(str.toLong) + }.getOrElse { + Serialization.read[FileStreamSourceOffset](str) + } case _ => throw new IllegalArgumentException( s"Invalid conversion from offset of ${offset.getClass} to FileStreamSourceOffset") diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt similarity index 100% rename from sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt rename to sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt new file mode 100644 index 000000000000..51b4008129ff --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt @@ -0,0 +1 @@ +345 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index af051e2b364a..8cb398979d14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1027,8 +1027,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest { assert(options.maxFilesPerTrigger == Some(1)) } - test("FileStreamSource offset - read Spark 2.1.0 log format") { - val offset = 
readOffsetFromResource("file-source-offset-version-2.1.0.txt") + test("FileStreamSource offset - read Spark 2.1.0 offset json format") { + val offset = readOffsetFromResource("file-source-offset-version-2.1.0-json.txt") + assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345)) + } + + test("FileStreamSource offset - read Spark 2.1.0 offset long format") { + val offset = readOffsetFromResource("file-source-offset-version-2.1.0-long.txt") assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345)) } @@ -1051,7 +1056,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { private def readOffsetFromResource(file: String): SerializedOffset = { import scala.io.Source val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString - SerializedOffset(str) + SerializedOffset(str.trim) } }