From 84efce97313c5244dbeb30db6ff70f70b48f1555 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 2 Dec 2016 17:48:22 -0800
Subject: [PATCH 1/8] Added test for FileStreamSinkLog

---
 .../file-sink-log-version-2.1.0/7.compact     |  9 ++++++++
 .../file-sink-log-version-2.1.0/8             |  3 +++
 .../file-sink-log-version-2.1.0/9             |  2 ++
 .../streaming/FileStreamSinkLogSuite.scala    | 21 +++++++++++++++++++
 4 files changed, 35 insertions(+)
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9

diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
new file mode 100644
index 0000000000000..e1ec8a74f052c
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
new file mode 100644
index 0000000000000..e7989804e8886
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
@@ -0,0 +1,3 @@
+v1
+{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
new file mode 100644
index 0000000000000..42fb0ee416922
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e046fee0c04d3..3ba3549562adf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("read Spark 2.1.0 format") {
+    assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+      // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+    ))
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       f(sinkLog)
     }
   }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    sinkLog.allFiles()
+  }
 }
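Note on the format the test above pins down: each sink metadata log file starts with a version line ("v1") followed by one JSON-serialized SinkFileStatus per line, and a "<batchId>.compact" file appears to fold every earlier batch into one file, which is why 7.compact lists entries for /a/b/0 through /a/b/7. A minimal standalone sketch of a reader for that layout, assuming json4s (which Spark itself uses for these records) is on the classpath; LogFileStatus is an illustrative stand-in, not Spark's SinkFileStatus:

    import scala.io.Source
    import org.json4s.{Formats, NoTypeHints}
    import org.json4s.jackson.Serialization

    // Illustrative stand-in mirroring the JSON fields in the resource files.
    case class LogFileStatus(
        path: String,
        size: Long,
        isDir: Boolean,
        modificationTime: Long,
        blockReplication: Int,
        blockSize: Long,
        action: String)

    object SinkLogReader {
      implicit val formats: Formats = Serialization.formats(NoTypeHints)

      // Reads one log file: a "v1" header, then one JSON record per line.
      // A record with action "delete" cancels an earlier "add" for the same path.
      def read(file: String): Seq[LogFileStatus] = {
        val lines = Source.fromFile(file).getLines().toList
        require(lines.headOption.contains("v1"), s"unexpected log version: ${lines.headOption}")
        val entries = lines.tail.map(l => Serialization.read[LogFileStatus](l))
        val deleted = entries.collect { case e if e.action == "delete" => e.path }.toSet
        entries.filter(e => e.action == "add" && !deleted.contains(e.path))
      }
    }

This also explains the commented-out /a/b/0 entry in the test: batch 8 records a delete for /a/b/0, so allFiles() no longer returns it.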
From 3d5449418da6453b608e39ab19d32caa3ffe9d7b Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 2 Dec 2016 18:23:28 -0800
Subject: [PATCH 2/8] Added test for FileStreamSourceLog

---
 .../file-source-log-version-2.1.0/2.compact   |  4 ++++
 .../file-source-log-version-2.1.0/3           |  2 ++
 .../file-source-log-version-2.1.0/4           |  2 ++
 .../streaming/FileStreamSinkLogSuite.scala    |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala | 19 +++++++++++++++++--
 5 files changed, 26 insertions(+), 3 deletions(-)
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4

diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
new file mode 100644
index 0000000000000..95f78bb2620d4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
@@ -0,0 +1,4 @@
+v1
+{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
+{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
+{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
new file mode 100644
index 0000000000000..2caa5972e42eb
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
new file mode 100644
index 0000000000000..e54b943229880
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 3ba3549562adf..6ab8e5f9520a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -185,7 +185,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("read Spark 2.1.0 format") {
+  test("read Spark 2.1.0 log format") {
     assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
       // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
       SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8256c63d87090..59ab3c5fabb19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
-import scala.collection.mutable
-
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -1022,6 +1021,22 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
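The resource layout above (2.compact, then plain files 3 and 4) reflects how these compactible logs appear to name their batch files: every compact-interval-th batch is written as "<batchId>.compact" and folds in all earlier entries. A small sketch of that naming rule, assuming the (batchId + 1) % compactInterval == 0 convention; the interval is per-log configuration, and the value 3 below is an illustrative assumption that happens to match this resource directory:

    // Sketch: compaction batches get a ".compact" suffix and hold the union of
    // all entries up to that batch; other batches get a plain numeric name.
    def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
      (batchId + 1) % compactInterval == 0

    def batchFileName(batchId: Long, compactInterval: Int): String =
      if (isCompactionBatch(batchId, compactInterval)) s"$batchId.compact"
      else batchId.toString

    // With an assumed interval of 3, batches 2, 5, 8, ... compact, so the
    // directory holds "2.compact", "3" and "4", exactly as above.
    assert(batchFileName(2, 3) == "2.compact")
    assert(batchFileName(3, 3) == "3")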
From 49e940b3bd566d0502d52c0a974ddd75650d9f7a Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 2 Dec 2016 18:53:13 -0800
Subject: [PATCH 3/8] Added test for OffsetSeqLog

---
 .../offset-log-version-2.1.0/15               |  3 ++
 .../offset-log-version-2.1.0/16               |  3 ++
 .../streaming/FileStreamSinkLogSuite.scala    |  6 ++--
 .../streaming/OffsetSeqLogSuite.scala         | 35 +++++++++++++++++++
 4 files changed, 44 insertions(+), 3 deletions(-)
 create mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
 create mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16

diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
new file mode 100644
index 0000000000000..7f7923fa9bb08
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480732569701}
+{"stress5":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":3,"13":0,"22":0,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0,"15":0,"24":0,"6":0,"0":4},"stress8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0,"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"stress16":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"stress10":{"8":1,"2":1,"5":1,"4":1,"7":0,"1":0,"3":0,"6":0,"0":0},"stress1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2,"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0,"6":1,"0":5},"stress4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0,"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0,"21":0,"15":0,"6":0,"24":0,"0":9},"stress3":{"23":0,"17":0,"8":0,"26":0,"11":0,"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0,"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2},"stress18":{"1":0,"0":0},"stress6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0,"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"stress12":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"stress14":{"2":0,"5":0,"4":0,"1":0,"3":0,"0":0},"stress2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1,"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1,"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16
new file mode 100644
index 0000000000000..fd3e0da7813d5
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480732570644}
+{"kafka-topic":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":4,"13":1,"22":1,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0,"15":0,"24":0,"6":0,"0":4},"kafka-topic8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0,"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"kafka-topic16":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic10":{"8":1,"2":1,"5":1,"4":1,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2,"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0,"6":1,"0":5},"kafka-topic4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0,"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0,"21":0,"15":0,"6":0,"24":0,"0":9},"kafka-topic3":{"23":0,"17":0,"8":0,"26":0,"11":0,"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0,"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2},"kafka-topic18":{"1":0,"0":0},"kafka-topic6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0,"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"kafka-topic12":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic14":{"2":0,"5":0,"4":0,"1":0,"3":0,"0":0},"kafka-topic2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1,"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1,"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 6ab8e5f9520a6..8a21b76e8f029 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -187,7 +187,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { test("read Spark 2.1.0 log format") { assert(readFromResource("file-sink-log-version-2.1.0") === Seq( - // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), + // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION), SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION), @@ -224,7 +224,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { private def readFromResource(dir: String): Seq[SinkFileStatus] = { val input = getClass.getResource(s"/structured-streaming/$dir") - val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString) - sinkLog.allFiles() + val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString) + log.allFiles() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 3afd11fa4686d..573d8a069916e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -60,4 +60,39 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { Array(0 -> batch0Serialized, 1 -> batch1Serialized)) } } + + test("read 
Spark 2.1.0 log format") { + val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0") + assert(batchId === 16) + assert(offsetSeq === + OffsetSeq( + offsets = Seq(Some(SerializedOffset( + """ + |{"kafka-topic":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":4,"13":1, + |"22":1,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0, + |"15":0,"24":0,"6":0,"0":4},"kafka-topic8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0, + |"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"kafka-topic16":{"8":0,"2":0,"5":0, + |"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic10":{"8":1,"2":1,"5":1,"4":1,"7":0, + |"1":0,"3":0,"6":0,"0":0},"kafka-topic1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2, + |"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0, + |"6":1,"0":5},"kafka-topic4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0, + |"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0, + |"21":0,"15":0,"6":0,"24":0,"0":9},"kafka-topic3":{"23":0,"17":0,"8":0,"26":0,"11":0, + |"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0, + |"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2}, + |"kafka-topic18":{"1":0,"0":0},"kafka-topic6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0, + |"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"kafka-topic12":{"8":0, + |"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic14":{"2":0,"5":0,"4":0, + |"1":0,"3":0,"0":0},"kafka-topic2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1, + |"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1, + |"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}} + """.stripMargin.trim.split("\n").mkString))), + metadata = Some("""{"batchWatermarkMs":0,"batchTimestampMs":1480732570644}"""))) + } + + private def readFromResource(dir: String): (Long, OffsetSeq) = { + val input = getClass.getResource(s"/structured-streaming/$dir") + val log = new OffsetSeqLog(spark, input.toString) + log.getLatest().get + } } From d9be1c5b78ca7ac9f42f69bb267feea36ab3fb50 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 21:25:15 -0800 Subject: [PATCH 4/8] Added rat excludes --- dev/.rat-excludes | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/.rat-excludes b/dev/.rat-excludes index a3efddeaa515a..6be1c72bc6cfb 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager .Rbuildignore org.apache.spark.deploy.yarn.security.ServiceCredentialProvider spark-warehouse +structured-streaming/* From 4150e56d1750ed238a2401c518761649fdccb3b7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Dec 2016 15:55:12 -0800 Subject: [PATCH 5/8] Improved offset seq test --- .../offset-log-version-2.1.0/0 | 4 +++ .../offset-log-version-2.1.0/15 | 3 -- .../offset-log-version-2.1.0/16 | 3 -- .../streaming/OffsetSeqLogSuite.scala | 32 ++++--------------- 4 files changed, 11 insertions(+), 31 deletions(-) create mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 delete mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15 delete mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16 diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 new file mode 100644 index 
From 4150e56d1750ed238a2401c518761649fdccb3b7 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 5 Dec 2016 15:55:12 -0800
Subject: [PATCH 5/8] Improved offset seq test

---
 .../offset-log-version-2.1.0/0                |  4 +++
 .../offset-log-version-2.1.0/15               |  3 --
 .../offset-log-version-2.1.0/16               |  3 --
 .../streaming/OffsetSeqLogSuite.scala         | 32 ++++---------------
 4 files changed, 11 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
 delete mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
 delete mode 100644 sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/16

diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
new file mode 100644
index 0000000000000..fe5c1d44a6e26
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
+0
+{"topic-0":{"0":1}}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
deleted file mode 100644
index 7f7923fa9bb08..0000000000000
--- a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/15
+++ /dev/null
@@ -1,3 +0,0 @@
-v1
-{"batchWatermarkMs":0,"batchTimestampMs":1480732569701}
-{"stress5":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":3,"13":0,"22":0,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0,"15":0,"24":0,"6":0,"0":4},"stress8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0,"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"stress16":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"stress10":{"8":1,"2":1,"5":1,"4":1,"7":0,"1":0,"3":0,"6":0,"0":0},"stress1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2,"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0,"6":1,"0":5},"stress4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0,"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0,"21":0,"15":0,"6":0,"24":0,"0":9},"stress3":{"23":0,"17":0,"8":0,"26":0,"11":0,"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0,"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2},"stress18":{"1":0,"0":0},"stress6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0,"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"stress12":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"stress14":{"2":0,"5":0,"4":0,"1":0,"3":0,"0":0},"stress2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1,"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1,"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}}
\ No newline at end of file
-{"kafka-topic":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":4,"13":1,"22":1,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0,"15":0,"24":0,"6":0,"0":4},"kafka-topic8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0,"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"kafka-topic16":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic10":{"8":1,"2":1,"5":1,"4":1,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2,"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0,"6":1,"0":5},"kafka-topic4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0,"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0,"21":0,"15":0,"6":0,"24":0,"0":9},"kafka-topic3":{"23":0,"17":0,"8":0,"26":0,"11":0,"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0,"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2},"kafka-topic18":{"1":0,"0":0},"kafka-topic6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0,"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"kafka-topic12":{"8":0,"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic14":{"2":0,"5":0,"4":0,"1":0,"3":0,"0":0},"kafka-topic2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1,"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1,"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 573d8a069916e..19e0ae31339e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -63,31 +63,13 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { test("read Spark 2.1.0 log format") { val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0") - assert(batchId === 16) - assert(offsetSeq === - OffsetSeq( - offsets = Seq(Some(SerializedOffset( - """ - |{"kafka-topic":{"23":0,"8":1,"17":1,"11":1,"20":0,"2":6,"5":2,"14":0,"4":4,"13":1, - |"22":1,"7":1,"16":0,"25":0,"10":0,"1":6,"19":0,"9":0,"18":1,"3":3,"21":0,"12":0, - |"15":0,"24":0,"6":0,"0":4},"kafka-topic8":{"8":0,"11":0,"2":2,"5":2,"4":0,"7":0, - |"10":0,"1":0,"9":0,"12":0,"3":0,"6":1,"0":2},"kafka-topic16":{"8":0,"2":0,"5":0, - |"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic10":{"8":1,"2":1,"5":1,"4":1,"7":0, - |"1":0,"3":0,"6":0,"0":0},"kafka-topic1":{"8":3,"17":0,"11":3,"20":0,"2":5,"5":2,"14":2, - |"13":0,"4":1,"7":0,"16":0,"1":3,"10":0,"19":0,"9":0,"18":0,"21":0,"12":1,"3":0,"15":0, - |"6":1,"0":5},"kafka-topic4":{"23":0,"8":0,"17":0,"11":0,"20":0,"2":0,"5":0,"14":0, - |"4":0,"13":0,"22":0,"16":0,"7":0,"25":0,"10":0,"1":0,"19":0,"9":0,"18":0,"3":0,"12":0, - |"21":0,"15":0,"6":0,"24":0,"0":9},"kafka-topic3":{"23":0,"17":0,"8":0,"26":0,"11":0, - |"2":4,"20":0,"5":1,"14":1,"13":2,"4":1,"22":1,"16":1,"7":2,"25":0,"10":0,"1":4,"19":0, - |"9":1,"18":0,"27":0,"3":4,"12":0,"21":0,"15":0,"6":1,"24":0,"0":2}, - |"kafka-topic18":{"1":0,"0":0},"kafka-topic6":{"8":0,"11":0,"2":0,"5":0,"14":0,"13":0, - |"4":0,"7":0,"1":0,"10":0,"9":0,"12":0,"3":0,"15":0,"6":0,"0":0},"kafka-topic12":{"8":0, - |"2":0,"5":0,"4":0,"7":0,"1":0,"3":0,"6":0,"0":0},"kafka-topic14":{"2":0,"5":0,"4":0, - 
|"1":0,"3":0,"0":0},"kafka-topic2":{"23":0,"17":1,"8":1,"26":0,"11":1,"2":4,"20":1, - |"5":3,"14":1,"4":1,"13":1,"22":0,"16":0,"7":1,"25":0,"10":1,"1":3,"19":0,"9":1, - |"18":0,"12":0,"21":0,"3":2,"15":0,"6":0,"24":0,"0":3}} - """.stripMargin.trim.split("\n").mkString))), - metadata = Some("""{"batchWatermarkMs":0,"batchTimestampMs":1480732570644}"""))) + assert(batchId === 0) + assert(offsetSeq.offsets === Seq( + Some(SerializedOffset("0")), + Some(SerializedOffset("""{"topic-0":{"0":1}}""")) + )) + assert(offsetSeq.metadata === + Some("""{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}""")) } private def readFromResource(dir: String): (Long, OffsetSeq) = { From a7529b3802208bad2994636d1bea6dde5bf3c665 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Dec 2016 16:30:54 -0800 Subject: [PATCH 6/8] Added kafka offset test --- .../org/apache/spark/sql/kafka010/JsonUtils.scala | 9 ++++++++- .../spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 13 +++++++++++++ .../kafka-offset-version-2.1.0.txt | 1 + 3 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/structured-streaming/kafka-offset-version-2.1.0.txt diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 13d717092a898..868edb5dcdc0c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -81,7 +81,14 @@ private object JsonUtils { */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = { val result = new HashMap[String, HashMap[Int, Long]]() - partitionOffsets.foreach { case (tp, off) => + implicit val ordering = new Ordering[TopicPartition] { + override def compare(x: TopicPartition, y: TopicPartition): Int = { + Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) + } + } + val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism + partitions.foreach { tp => + val off = partitionOffsets(tp) val parts = result.getOrElse(tp.topic, new HashMap[Int, Long]) parts += tp.partition -> off result += tp.topic -> parts diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 881018fd95665..09bacff441648 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.kafka010 import java.io.File +import scala.io.Source + import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.OffsetSuite import org.apache.spark.sql.test.SharedSQLContext @@ -89,4 +91,15 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext { Array(0 -> batch0Serialized, 1 -> batch1Serialized)) } } + + test("read Spark 2.1.0 log format") { + val offset = readFromResource("kafka-offset-version-2.1.0.txt") + assert(KafkaSourceOffset(offset) === + KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L))) + } + + private def readFromResource(file: String): SerializedOffset = { + val str = 
From 8d4ca5e5d58c01050ac3ca13e4e9b004f67c3009 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 5 Dec 2016 17:29:03 -0800
Subject: [PATCH 7/8] Added test for file stream source offset

---
 .../spark/sql/kafka010/KafkaSourceOffsetSuite.scala  |  5 ++---
 .../file-source-offset-version-2.1.0.txt             |  1 +
 ....0.txt => kafka-source-offset-version-2.1.0.txt}  |  0
 .../spark/sql/streaming/FileStreamSourceSuite.scala  | 13 ++++++++++++-
 4 files changed, 15 insertions(+), 4 deletions(-)
 create mode 100644 sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
 rename sql/core/src/test/resources/structured-streaming/{kafka-offset-version-2.1.0.txt => kafka-source-offset-version-2.1.0.txt} (100%)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 09bacff441648..c8326ffcc7ad4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.kafka010
 
 import java.io.File
 
-import scala.io.Source
-
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.OffsetSuite
 import org.apache.spark.sql.test.SharedSQLContext
@@ -93,12 +91,13 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
   }
 
   test("read Spark 2.1.0 log format") {
-    val offset = readFromResource("kafka-offset-version-2.1.0.txt")
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
     assert(KafkaSourceOffset(offset) ===
       KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
   }
 
   private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
     val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
     SerializedOffset(str)
   }
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000000000..51b4008129ffe
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+345
diff --git a/sql/core/src/test/resources/structured-streaming/kafka-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
similarity index 100%
rename from sql/core/src/test/resources/structured-streaming/kafka-offset-version-2.1.0.txt
rename to sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
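A small point shared by these suites: golden files are located through the classpath (getClass.getResource) rather than a hardcoded path, so the tests work from SBT, Maven, and IDE runs alike, and the str.trim in readOffsetFromResource guards against a trailing newline in the stored offset. The shared pattern, sketched with an illustrative helper name and assuming it lives inside a suite class with the resources under src/test/resources/structured-streaming/:

    import scala.io.Source

    def resourceText(name: String): String = {
      // Resolved against the test classpath, not the working directory.
      val url = getClass.getResource(s"/structured-streaming/$name")
      require(url != null, s"missing test resource: $name")
      Source.fromFile(url.toURI).mkString.trim
    }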
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 59ab3c5fabb19..ff1f3e26f1593 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1022,7 +1022,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(options.maxFilesPerTrigger == Some(1))
   }
 
-  test("read Spark 2.1.0 log format") {
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
     assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
       FileEntry("/a/b/0", 1480730949000L, 0L),
       FileEntry("/a/b/1", 1480730950000L, 1L),
       FileEntry("/a/b/2", 1480730950000L, 2L),
@@ -1037,6 +1042,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
     log.allFiles()
   }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

From 26a86d64f2f492094960b19332cabd7457f95e61 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 5 Dec 2016 22:08:55 -0800
Subject: [PATCH 8/8] Fix unit test

---
 .../spark/sql/execution/streaming/OffsetSeqLogSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index fc2d1fabd996b..d139efaaf824f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -77,8 +77,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       Some(SerializedOffset("0")),
       Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
     ))
-    assert(offsetSeq.metadata ===
-      Some("""{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}"""))
+    assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
   }
 
   private def readFromResource(dir: String): (Long, OffsetSeq) = {
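The final fix compares the metadata against a typed OffsetSeqMetadata(0L, 1480981499528L) instead of a raw JSON string, so the assertion no longer depends on the exact formatting of the stored line. Roughly how such a line maps onto a two-field case class via json4s, which Spark uses for this serialization; Metadata below is an illustrative mirror of the JSON fields, not Spark's OffsetSeqMetadata:

    import org.json4s.{Formats, NoTypeHints}
    import org.json4s.jackson.Serialization

    case class Metadata(batchWatermarkMs: Long, batchTimestampMs: Long)

    implicit val formats: Formats = Serialization.formats(NoTypeHints)
    val md = Serialization.read[Metadata](
      """{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}""")
    assert(md == Metadata(0L, 1480981499528L))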