CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }
 
   /**
-   * Get the record at `offset`.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+   * or null.
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
   private def fetchData(
       offset: Long,
-      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
     if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
       // This is the first fetch, or the last pre-fetched data has been drained.
       // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@
     } else {
       val record = fetchedData.next()
       nextOffsetInFetchedData = record.offset + 1
-      // `seek` is always called before "poll". So "record.offset" must be same as "offset".
-      assert(record.offset == offset,
-        s"The fetched data has a different offset: expected $offset but was ${record.offset}")
-      record
+      // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
+      // available offset. Hence we need to handle offset mismatch.
+      if (record.offset > offset) {
+        // This may happen when some records aged out but their offsets already got verified
+        if (failOnDataLoss) {
+          reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+          // Never happen as "reportDataLoss" will throw an exception
+          null
+        } else {
+          if (record.offset >= untilOffset) {
+            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+            null
+          } else {
+            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+            record
+          }
+        }
+      } else if (record.offset < offset) {
+        // This should not happen. If it does happen, then we probably misunderstand Kafka internal
+        // mechanism.
+        throw new IllegalStateException(
+          s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+      } else {
+        record
+      }
     }
   }
 
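The added branch above is the core behavioral change: when Kafka returns a record past the requested offset, the gap is treated as data loss and is either reported fatally (failOnDataLoss = true) or skipped. A minimal sketch of the same decision table; OffsetGapHandling, FetchResult, and decide are hypothetical names used for illustration, not part of the patch:

// Hypothetical model of the offset-mismatch handling above; illustration only.
object OffsetGapHandling {
  sealed trait FetchResult
  case object ThrowDataLoss extends FetchResult   // failOnDataLoss = true: report and fail
  case object SkipBatch extends FetchResult       // the gap covers the rest of [offset, untilOffset)
  final case class UseRecord(offset: Long) extends FetchResult

  def decide(
      requested: Long,
      returned: Long,
      untilOffset: Long,
      failOnDataLoss: Boolean): FetchResult = {
    if (returned > requested) {
      // Records in [requested, returned) are missing, e.g. aged out by retention.
      if (failOnDataLoss) ThrowDataLoss
      else if (returned >= untilOffset) SkipBatch
      else UseRecord(returned)
    } else if (returned < requested) {
      // Kafka should never return an offset earlier than the one we sought to.
      throw new IllegalStateException(s"Tried to fetch $requested but got $returned")
    } else {
      UseRecord(returned)
    }
  }
}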
KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
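Both values above are user-facing source options, so the change only raises the default wait between offset-fetch retries from 10 ms to 1 second. A minimal sketch of a reader setting them explicitly (the bootstrap server and topic names are placeholders):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
  .option("subscribe", "topic1")                        // placeholder
  .option("fetchOffset.numRetries", "3")                // default, as above
  .option("fetchOffset.retryIntervalMs", "1000")        // new default; was 10
  .load()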
KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared

   private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
 
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     testUtils = new KafkaTestUtils {
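A note on the master URL in the override above: in the local[N, F] form, N is the number of worker threads and F is the maximum number of attempts per task, while plain local[N] fails fast after one attempt. That retry budget is what absorbs the transient NullPointerException thrown by poll while a topic is being deleted. A minimal sketch, assuming a sparkConf is in scope:

import org.apache.spark.SparkContext

// "local[2]"   -> two worker threads, tasks fail fast after one attempt
// "local[2,3]" -> two worker threads, up to three attempts per task
val resilientSc = new SparkContext("local[2,3]", "test-sql-context", sparkConf)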
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  ignore("stress test for failOnDataLoss=false") {
+  test("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.option("fetchOffset.retryIntervalMs", "3000")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
   def deleteTopic(topic: String): Unit = {
     val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
     AdminUtils.deleteTopic(zkUtils, topic)
-    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+    verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
   }
 
   /** Add new partitions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
     props
   }
 
   /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
   private def verifyTopicDeletion(
-      zkUtils: ZkUtils,
       topic: String,
       numPartitions: Int,
-      servers: Seq[KafkaServer]) {
-    import ZkUtils._
+      servers: Seq[KafkaServer]): Unit = {
     val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
-    def isDeleted(): Boolean = {
-      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
-      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
-      // ensure that the topic-partition has been deleted from all brokers' replica managers
-      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
-      // ensure that logs from all replicas are deleted if delete topic is marked successful
-      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.getLogManager().getLog(tp).isEmpty))
-      // ensure that topic is removed from all cleaner offsets
-      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
-        val checkpoints = server.getLogManager().logDirs.map { logDir =>
-          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
-        }
-        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
-      })
-      // ensure the topic is gone
-      val deleted = !zkUtils.getAllTopics().contains(topic)
-      deletePath && topicPath && replicaManager && logManager && cleaner && deleted
-    }
-    eventually(timeout(60.seconds)) {
-      assert(isDeleted, s"$topic not deleted after timeout")
+
+    import ZkUtils._
+    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+    assert(
+      !zkUtils.pathExists(getDeleteTopicPath(topic)),
+      s"${getDeleteTopicPath(topic)} still exists")
+    assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      s"topic $topic still exists in the replica manager")
+    // ensure that logs from all replicas are deleted if delete topic is marked successful
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.getLogManager().getLog(tp).isEmpty)),
+      s"topic $topic still exists in log manager")
+    // ensure that topic is removed from all cleaner offsets
+    assert(servers.forall(server => topicAndPartitions.forall { tp =>
+      val checkpoints = server.getLogManager().logDirs.map { logDir =>
+        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+    }), s"checkpoint for topic $topic still exists")
+    // ensure the topic is gone
+    assert(
+      !zkUtils.getAllTopics().contains(topic),
+      s"topic $topic still exists on zookeeper")
+  }
+
+  /** Verify topic is deleted. Retry to delete the topic if not. */
+  private def verifyTopicDeletionWithRetries(
+      zkUtils: ZkUtils,
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]) {
+    eventually(timeout(60.seconds), interval(200.millis)) {
+      try {
+        verifyTopicDeletion(topic, numPartitions, servers)
+      } catch {
+        case e: Throwable =>
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, delete the topic and retry.
+          AdminUtils.deleteTopic(zkUtils, topic)
+          throw e
+      }
     }
   }

[Review comment from zsxwing (Member, Author), Dec 1, 2016]: Renamed this method to assertTopicDeleted and moved it out of this method. I also rewrote it to make it tell us which line fails.
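The retry wrapper above is a reusable pattern: run the assertions inside ScalaTest's eventually, and when they fail, re-issue the corrective action before rethrowing so the next iteration can re-verify. A generic sketch using the same ScalaTest imports the suite already has; verifyWithRetries, verify, and redo are illustrative names, not part of the patch:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object RetryingVerify {
  // Keep checking `verify` for up to 60 seconds; if it throws, run `redo`
  // (e.g. re-issue the topic deletion) and rethrow so `eventually` retries.
  def verifyWithRetries(verify: () => Unit, redo: () => Unit): Unit = {
    eventually(timeout(60.seconds), interval(200.millis)) {
      try {
        verify()
      } catch {
        case e: Throwable =>
          redo()
          throw e
      }
    }
  }
}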

@@ -331,7 +352,7 @@
       case _ =>
         false
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(60.seconds)) {
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }

[Review comment from zsxwing (Member, Author)]: Increase this timeout as it seems 10 seconds is not enough sometimes.
SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
+  protected def createSparkSession: TestSparkSession = {
+    new TestSparkSession(
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+  }
+
   /**
    * Initialize the [[TestSparkSession]].
    */
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(
-        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+      _spark = createSparkSession
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
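With createSparkSession extracted as a hook, a suite that mixes in SharedSQLContext can customize the session without re-implementing beforeAll, which is exactly what the Kafka stress suite above does. A sketch of another overriding suite, assuming it lives in Spark's own test tree where QueryTest, TestSparkSession, and DebugFilesystem are available; the suite name and the extra setting are illustrative:

class MyCustomSessionSuite extends QueryTest with SharedSQLContext {
  // Override the hook instead of duplicating the beforeAll setup.
  override protected def createSparkSession: TestSparkSession = {
    new TestSparkSession(
      sparkConf
        .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
        .set("spark.sql.shuffle.partitions", "4"))
  }
}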