
Commit 7dbde3d

zsxwing authored and Jackey Lee committed
[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we request a latest offset. This will cause Spark to reprocess data. As per the suggestion in KAFKA-7703, we put a `position` call between `poll` and `seekToEnd` to block the fetch request triggered by `poll` before calling `seekToEnd`. In addition, to avoid other unknown issues, we also use the previously known offsets to audit the latest offsets returned by Kafka. If we find incorrect offsets (a latest offset that is less than an offset in `knownOffsets`), we retry at most `maxOffsetFetchAttempts` times.

## How was this patch tested?

Jenkins

Closes apache#23324 from zsxwing/SPARK-26267.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent cda3654 commit 7dbde3d
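
To make the fix easier to follow before reading the diffs, here is a minimal, self-contained sketch of the two ideas described above: the `position` call between `poll` and `seekToEnd`, and the audit of fetched offsets against previously known offsets with a bounded retry. It is written against the plain Kafka consumer API rather than Spark's `KafkaOffsetReader`; the function name `fetchLatestWithAudit` is hypothetical, and `maxOffsetFetchAttempts` / `offsetFetchAttemptIntervalMs` only mirror the configuration names used in this patch.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// A sketch only, assuming `consumer` already has partitions assigned via its
// ConsumerStrategy. Not the actual Spark implementation (see the diff below).
def fetchLatestWithAudit(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    knownOffsets: Map[TopicPartition, Long],
    maxOffsetFetchAttempts: Int,
    offsetFetchAttemptIntervalMs: Long): Map[TopicPartition, Long] = {
  // Poll to pick up the latest partition assignment.
  consumer.poll(0)
  val partitions = consumer.assignment()

  // KAFKA-7703 workaround: `position` blocks until the offset request that
  // `poll(0)` may have triggered is finished, so the upcoming `seekToEnd`
  // is not clobbered by a late async `seekToBeginning`.
  partitions.asScala.foreach(p => consumer.position(p))

  var attempt = 0
  var latest = Map.empty[TopicPartition, Long]
  var incorrect = Seq.empty[(TopicPartition, Long, Long)]
  do {
    consumer.seekToEnd(partitions)
    latest = partitions.asScala.map(p => p -> consumer.position(p)).toMap
    attempt += 1

    // Audit: a "latest" offset smaller than a previously known offset is suspect.
    incorrect = latest.toSeq.collect {
      case (tp, offset) if knownOffsets.get(tp).exists(_ > offset) =>
        (tp, knownOffsets(tp), offset)
    }
    if (incorrect.nonEmpty && attempt < maxOffsetFetchAttempts) {
      Thread.sleep(offsetFetchAttemptIntervalMs)
    }
  } while (incorrect.nonEmpty && attempt < maxOffsetFetchAttempts)

  latest
}
```

In the actual patch this logic lives in `KafkaOffsetReader.fetchLatestOffsets`, shown in full in the `KafkaOffsetReader.scala` diff below; note that a topic recreation also makes the latest offsets go backwards, which is why the real code returns whatever Kafka reports after the retries are exhausted.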

File tree

6 files changed: +145 −13 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
     logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
     val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 15 additions & 4 deletions
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
       case EarliestOffsetRangeLimit =>
         KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
       case LatestOffsetRangeLimit =>
-        KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) =>
         kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * the read tasks of the skewed partitions to multiple Spark tasks.
    * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   *
+   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
       fromOffsets: PartitionOffsetMap,

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 75 additions & 5 deletions
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.util.concurrent.{Executors, ThreadFactory}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
+      // `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+   *
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
+      // `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
+         * latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`).
+         */
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+          var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
+        //   consumer has a much bigger chance to hit KAFKA-7703.
+        // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            if (attempt < maxOffsetFetchAttempts) {
+              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+              Thread.sleep(offsetFetchAttemptIntervalMs)
+            }
+          }
+        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+        logDebug(s"Got latest offsets for partition : $partitionOffsets")
+        partitionOffsets
+      }
     }
   }

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 3 additions & 2 deletions
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val latest = kafkaReader.fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets(
+      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -329,6 +329,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to make the test stable. When recreating a topic when a
+      // consumer is alive, it may not be able to see the recreated topic even if a fresh consumer
+      // has seen it.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should be changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"
