Commit dd7c387

[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we request the latest offset, which causes Spark to reprocess data. As suggested in KAFKA-7703, we put a `position` call between `poll` and `seekToEnd` so that the offset request triggered by `poll` completes before `seekToEnd` is called. In addition, to guard against other unknown issues, we also use the previously known offsets to audit the latest offsets returned by Kafka. If we find incorrect offsets (a latest offset that is less than the corresponding offset in `knownOffsets`), we retry at most `maxOffsetFetchAttempts` times.

Tested with Jenkins.

Closes #23324 from zsxwing/SPARK-26267.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 90a14d5 commit dd7c387
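To make the workaround easier to follow outside of the diffs below, here is a minimal standalone sketch of the same idea against a plain KafkaConsumer: call `position` right after `poll(0)` so the pending offset request cannot clobber a later `seekToEnd`, then audit the fetched end offsets against previously known offsets and retry a bounded number of times. This is an illustration only, not the committed code; `maxAttempts` and `retryIntervalMs` are stand-ins for Spark's `maxOffsetFetchAttempts` and `offsetFetchAttemptIntervalMs` settings. In the commit itself this logic lives in KafkaOffsetReader.fetchLatestOffsets, shown in the fourth diff below.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object FetchLatestOffsetsSketch {
  val maxAttempts = 3          // stand-in for maxOffsetFetchAttempts
  val retryIntervalMs = 1000L  // stand-in for offsetFetchAttemptIntervalMs

  def fetchLatest(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      knownOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    consumer.poll(0)  // assigns partitions, but may also start an async seek (KAFKA-7703)
    val partitions = consumer.assignment()
    // Block until the offset request started by poll(0) has finished, so it cannot
    // overwrite the seekToEnd below.
    partitions.asScala.foreach(p => consumer.position(p))
    consumer.pause(partitions)

    var latest = Map.empty[TopicPartition, Long]
    var incorrect = Seq.empty[(TopicPartition, Long, Long)]
    var attempt = 0
    do {
      consumer.seekToEnd(partitions)
      latest = partitions.asScala.map(p => p -> consumer.position(p)).toMap
      attempt += 1
      // Audit: a "latest" offset should never be smaller than an offset we already know about.
      incorrect = latest.toSeq.collect {
        case (tp, offset) if knownOffsets.get(tp).exists(_ > offset) =>
          (tp, knownOffsets(tp), offset)
      }
      if (incorrect.nonEmpty && attempt < maxAttempts) Thread.sleep(retryIntervalMs)
    } while (incorrect.nonEmpty && attempt < maxAttempts)
    latest
  }
}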

File tree

6 files changed: +146, -13 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

Lines changed: 2 additions & 2 deletions
@@ -73,7 +73,7 @@ class KafkaContinuousReader(
     offset = start.orElse {
       val offsets = initialOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       logInfo(s"Initial offsets: $offsets")
@@ -128,7 +128,7 @@ class KafkaContinuousReader(
   }

   override def needsReconfiguration(): Boolean = {
-    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }

   override def toString(): String = s"KafkaSource[$offsetReader]"

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

Lines changed: 16 additions & 4 deletions
@@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader(
     endPartitionOffsets = Option(end.orElse(null))
       .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
       .getOrElse {
-        val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+        val latestPartitionOffsets =
+          kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
         maxOffsetsPerTrigger.map { maxOffsets =>
           rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
         }.getOrElse {
@@ -132,10 +133,21 @@
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())

     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -192,7 +204,7 @@
       case EarliestOffsetRangeLimit =>
         KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
       case LatestOffsetRangeLimit =>
-        KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) =>
         kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
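The check added above is what eventually fails the query in the new test at the end of this commit: after a topic is recreated between batches, the end offset Kafka reports can be smaller than the start offset recorded for the previous batch. Below is a toy, runnable illustration of just that comparison with made-up offsets; in Spark the message goes through reportDataLoss, which logs a warning or throws an IllegalStateException depending on failOnDataLoss.

import org.apache.kafka.common.TopicPartition

object OffsetWentBackwardsExample {
  def main(args: Array[String]): Unit = {
    val tp = new TopicPartition("topic-bad", 0)
    val fromOffsets = Map(tp -> 2L)   // committed end of the previous batch
    val untilOffsets = Map(tp -> 1L)  // "latest" offsets after the topic was recreated

    untilOffsets.foreach { case (partition, untilOffset) =>
      fromOffsets.get(partition).foreach { fromOffset =>
        if (untilOffset < fromOffset) {
          // Spark raises this through reportDataLoss instead of printing it.
          println(s"Partition $partition's offset was changed from $fromOffset to $untilOffset, " +
            "some data may have been missed")
        }
      }
    }
  }
}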

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * the read tasks of the skewed partitions to multiple Spark tasks.
    * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   *
+   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
       fromOffsets: PartitionOffsetMap,
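The new doc sentence describes behavior that matters once end offsets can go backwards: a range whose size is zero or negative is dropped instead of being turned into a read task. Here is a toy illustration under a simplified KafkaOffsetRange stand-in (the real class is private to the Kafka source and also tracks a preferred executor location):

import org.apache.kafka.common.TopicPartition

// Simplified stand-in for the private KafkaOffsetRange used by the calculator.
case class KafkaOffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

object DropEmptyRangesExample {
  def main(args: Array[String]): Unit = {
    val ranges = Seq(
      KafkaOffsetRange(new TopicPartition("t", 0), fromOffset = 0L, untilOffset = 5L),  // 5 records
      KafkaOffsetRange(new TopicPartition("t", 1), fromOffset = 7L, untilOffset = 7L),  // no new data
      KafkaOffsetRange(new TopicPartition("t", 2), fromOffset = 4L, untilOffset = 2L))  // went backwards

    // Mirrors the documented rule: ranges with size <= 0 never become Spark tasks.
    val kept = ranges.filter(_.size > 0)
    println(kept)  // only the partition 0 range remains
  }
}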

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 75 additions & 5 deletions
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.util.concurrent.{Executors, ThreadFactory}

 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+   *
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this from KAFKA-7703, so we just return whatever we get from Kafka after retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")

-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
+         * latest offsets (the offset in `knownOffsets` is greater than the one in `partitionOffsets`).
+         */
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+          var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
+        //   consumer has a much bigger chance to hit KAFKA-7703.
+        // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            if (attempt < maxOffsetFetchAttempts) {
+              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+              Thread.sleep(offsetFetchAttemptIntervalMs)
+            }
+          }
+        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+        logDebug(s"Got latest offsets for partition : $partitionOffsets")
+        partitionOffsets
+      }
     }
   }

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 3 additions & 2 deletions
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets

-    val latest = kafkaReader.fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets(
+      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest
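Taken together with the reader change above, the call sites now follow one rule: when resolving offsets for the very first batch nothing is known yet, so fetchLatestOffsets is called with None and no audit happens; on every subsequent batch the previous batch's offsets (currentPartitionOffsets, falling back to initialPartitionOffsets) are passed in so the freshly fetched latest offsets can be checked against them.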

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -327,6 +327,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }

+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to make the test stable. When recreating a topic when a
+      // consumer is alive, it may not be able to see the recreated topic even if a fresh consumer
+      // has seen it.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should be changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"
