Skip to content

Commit b06c23d

Browse files
tdaszsxwing
authored andcommitted
[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset in latest
## What changes were proposed in this pull request? Added test to check whether default starting offset in latest ## How was this patch tested? new unit test Author: Tathagata Das <[email protected]> Closes #15778 from tdas/SPARK-18283.
1 parent daa975f commit b06c23d

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
306306
)
307307
}
308308

309+
test("starting offset is latest by default") {
310+
val topic = newTopic()
311+
testUtils.createTopic(topic, partitions = 5)
312+
testUtils.sendMessages(topic, Array("0"))
313+
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
314+
315+
val reader = spark
316+
.readStream
317+
.format("kafka")
318+
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
319+
.option("subscribe", topic)
320+
321+
val kafka = reader.load()
322+
.selectExpr("CAST(value AS STRING)")
323+
.as[String]
324+
val mapped = kafka.map(_.toInt)
325+
326+
testStream(mapped)(
327+
makeSureGetOffsetCalled,
328+
AddKafkaData(Set(topic), 1, 2, 3),
329+
CheckAnswer(1, 2, 3) // should not have 0
330+
)
331+
}
332+
309333
test("bad source options") {
310334
def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
311335
val ex = intercept[IllegalArgumentException] {

0 commit comments

Comments
 (0)