1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
.Rbuildignore
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
structured-streaming/*
@@ -81,7 +81,14 @@ private object JsonUtils {
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
partitionOffsets.foreach { case (tp, off) =>
implicit val ordering = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
Member:

nit: You can use partitionOffsets.toSeq.sortBy(_._1).foreach { case (tp, off) => to simplify the code.

Contributor Author:

I want to sort by topic and partition together, so that the partitions are ordered when the JSON is generated (currently they are not, which makes the output hard to read).

val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
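For context on the thread above, here is a minimal standalone sketch of the behaviour the author describes: sort the entries by topic and partition before emitting JSON, so both topics and partitions appear in order. This is not the PR's implementation (which builds nested HashMaps as in the diff above); the helper name is hypothetical, and it assumes only kafka-clients' TopicPartition and that topic names need no JSON escaping.

import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not part of the PR): render offsets as JSON with topics
// and partitions in sorted order. Assumes topic names need no JSON escaping.
def partitionOffsetsAsSortedJson(partitionOffsets: Map[TopicPartition, Long]): String = {
  partitionOffsets.toSeq
    .sortBy { case (tp, _) => (tp.topic, tp.partition) }  // topic first, then partition
    .groupBy { case (tp, _) => tp.topic }                 // groups keep the sorted partition order
    .toSeq
    .sortBy { case (topic, _) => topic }                  // groupBy loses topic order, so re-sort
    .map { case (topic, entries) =>
      val parts = entries
        .map { case (tp, off) => s""""${tp.partition}":$off""" }
        .mkString(",")
      s""""$topic":{$parts}"""
    }
    .mkString("{", ",", "}")
}

For the offsets asserted in the Kafka suite below, this renders {"topic1":{"0":456,"1":789},"topic2":{"0":0}}, matching the Spark 2.1.0 resource file added in this PR.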
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
Array(0 -> batch0Serialized, 1 -> batch1Serialized))
}
}

test("read Spark 2.1.0 log format") {
val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
Member:

nit: maybe we don't need to read the JSON from a file, since we never write it into a single file.

Contributor Author:

Yeah, but it's good to have it in a separate file in the same place as the other formats; that will make it easier to track all the things that need compatibility guarantees.

assert(KafkaSourceOffset(offset) ===
KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
}

private def readFromResource(file: String): SerializedOffset = {
import scala.io.Source
val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
SerializedOffset(str)
}
}
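Regarding the thread above: a hypothetical inline variant of the same compatibility check, as it might appear inside KafkaSourceOffsetSuite, using only the KafkaSourceOffset and SerializedOffset calls already present in this diff. It avoids the resource file at the cost of scattering the frozen 2.1.0 format across test code, which is the trade-off the author calls out.

// Hypothetical inline sketch: freeze the Spark 2.1.0 JSON in the test itself
// instead of reading it from a resource file.
test("read Spark 2.1.0 offset format (inline sketch)") {
  val offset = SerializedOffset("""{"topic1":{"0":456,"1":789},"topic2":{"0":0}}""")
  assert(KafkaSourceOffset(offset) ===
    KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
}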
@@ -0,0 +1,9 @@
v1
{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
@@ -0,0 +1,3 @@
v1
{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
@@ -0,0 +1,4 @@
v1
{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
@@ -0,0 +1 @@
345
@@ -0,0 +1 @@
{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
@@ -0,0 +1,4 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
0
{"topic-0":{"0":1}}
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
))
}

/**
* Create a fake SinkFileStatus using path and action. Most tests don't care about the other fields
* in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
f(sinkLog)
}
}

private def readFromResource(dir: String): Seq[SinkFileStatus] = {
val input = getClass.getResource(s"/structured-streaming/$dir")
val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
log.allFiles()
}
}
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
Array(0 -> batch0Serialized, 1 -> batch1Serialized))
}
}

test("read Spark 2.1.0 log format") {
val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
assert(batchId === 0)
assert(offsetSeq.offsets === Seq(
Some(SerializedOffset("0")),
Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
))
assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
}

private def readFromResource(dir: String): (Long, OffsetSeq) = {
val input = getClass.getResource(s"/structured-streaming/$dir")
val log = new OffsetSeqLog(spark, input.toString)
log.getLatest().get
}
}
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming

import java.io.File

import scala.collection.mutable

import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
assert(options.maxFilesPerTrigger == Some(1))
}

test("FileStreamSource offset - read Spark 2.1.0 log format") {
val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
Member:

nit: maybe we don't need to read the JSON from a file, since we never write it into a single file.

Contributor Author:

Same comment as above.

assert(LongOffset.convert(offset) === Some(LongOffset(345)))
}

test("FileStreamSourceLog - read Spark 2.1.0 log format") {
assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
FileEntry("/a/b/0", 1480730949000L, 0L),
FileEntry("/a/b/1", 1480730950000L, 1L),
FileEntry("/a/b/2", 1480730950000L, 2L),
FileEntry("/a/b/3", 1480730950000L, 3L),
FileEntry("/a/b/4", 1480730951000L, 4L)
))
}

private def readLogFromResource(dir: String): Seq[FileEntry] = {
val input = getClass.getResource(s"/structured-streaming/$dir")
val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
log.allFiles()
}

private def readOffsetFromResource(file: String): SerializedOffset = {
import scala.io.Source
val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
SerializedOffset(str.trim)
}
}
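The same trade-off applies to the LongOffset check above. For illustration, a hypothetical inline variant that would drop into FileStreamSourceSuite, relying only on LongOffset.convert and SerializedOffset as used in this PR:

// Hypothetical inline sketch: freeze the 2.1.0 serialized form ("345") in the
// test itself instead of reading it from a resource file.
test("FileStreamSource offset - read Spark 2.1.0 log format (inline sketch)") {
  assert(LongOffset.convert(SerializedOffset("345")) === Some(LongOffset(345)))
}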

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {