Commit e686f10

committed
address
1 parent ba02dd3 commit e686f10

File tree: 1 file changed, +38 −32 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 38 additions & 32 deletions
@@ -138,29 +138,29 @@ private[kafka010] class KafkaOffsetReader(
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
           "Use -1 for latest, -2 for earliest, if you don't care.\n" +
           s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
 
-      def _fetchOffsets(): PartitionOffsetMap = {
-        partitionOffsets.foreach {
-          case (tp, KafkaOffsetRangeLimit.LATEST) =>
-            consumer.seekToEnd(ju.Arrays.asList(tp))
-          case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
-            consumer.seekToBeginning(ju.Arrays.asList(tp))
-          case (tp, off) => consumer.seek(tp, off)
-        }
-        partitionOffsets.map {
-          case (tp, _) => tp -> consumer.position(tp)
-        }
+      partitionOffsets.foreach {
+        case (tp, KafkaOffsetRangeLimit.LATEST) =>
+          consumer.seekToEnd(ju.Arrays.asList(tp))
+        case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
+          consumer.seekToBeginning(ju.Arrays.asList(tp))
+        case (tp, off) => consumer.seek(tp, off)
+      }
+      partitionOffsets.map {
+        case (tp, _) => tp -> consumer.position(tp)
       }
-
-      // Fetch the offsets twice to reduce the chance to hit KAFKA-7703.
-      _fetchOffsets()
-      _fetchOffsets()
     }
   }

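The added block relies on `KafkaConsumer.position` being a blocking call: it does not return until any offset lookup started by `poll(0)` has completed. A minimal sketch of the pattern, assuming a plain byte-array consumer (the method name below is illustrative, not part of the commit):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

def awaitPollPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
  // poll(0) may kick off an async seekToBeginning internally (KAFKA-7703)
  consumer.poll(0)
  // position() blocks until that in-flight offset request is done, so seeks
  // issued afterwards can no longer be clobbered by a stale offset reset
  consumer.assignment().asScala.foreach(tp => consumer.position(tp))
}

Only after this barrier does the committed code seek to the caller-specified offsets.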
@@ -200,30 +200,32 @@ private[kafka010] class KafkaOffsetReader(
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
    *
-   * Kafka may return earliest offsets when we are requesting latest offsets (KAFKA-7703). To avoid
-   * hitting this issue, we will use the given `knownOffsets` to audit the latest offsets returned
-   * by Kafka, if we find some incorrect offsets (a latest offset is less than an offset in
-   * `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. If `knownOffsets` is not
-   * provided, we simply fetch the latest offsets twice and use the second result which is more
-   * likely correct.
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
    *
-   * When a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We
-   * cannot distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after
-   * retrying.
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this from KAFKA-7703, so we just return whatever we get from Kafka after retrying.
    */
   def fetchLatestOffsets(
       knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
       if (knownOffsets.isEmpty) {
-        // Fetch the latest offsets twice and use the second result which is more likely correct.
-        consumer.seekToEnd(partitions)
-        partitions.asScala.map(p => p -> consumer.position(p)).toMap
         consumer.seekToEnd(partitions)
         partitions.asScala.map(p => p -> consumer.position(p)).toMap
       } else {
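With the `position` barrier in place, the earlier "fetch twice and keep the second result" trick becomes unnecessary, which is why one of the two seekToEnd/position rounds is deleted above. A sketch of the simplified path for the `knownOffsets.isEmpty` case, under the same assumptions as the previous example (`TopicPartition` comes from org.apache.kafka.common):

import org.apache.kafka.common.TopicPartition

def latestOffsets(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
  val partitions = consumer.assignment()
  // seekToEnd is lazy; the position() calls below force its evaluation
  consumer.seekToEnd(partitions)
  partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
}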
@@ -233,7 +235,7 @@ private[kafka010] class KafkaOffsetReader(
          * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
          * latest offset (offset in `knownOffsets` is greater than the one in `partitionOffsets`).
          */
-        def findIncorrectOffsets: Seq[(TopicPartition, Long, Long)] = {
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
           var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
           partitionOffsets.foreach { case (tp, offset) =>
             knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
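The audit described in the doc comment is a plain comparison. Conceptually (a sketch, not the committed helper; `known` and `fetched` stand in for `knownOffsets` and `partitionOffsets`):

def incorrect(
    known: Map[TopicPartition, Long],
    fetched: Map[TopicPartition, Long]): Seq[(TopicPartition, Long, Long)] = {
  fetched.toSeq.collect {
    // a "latest" offset that is behind a previously known offset is suspect
    case (tp, offset) if known.get(tp).exists(_ > offset) => (tp, known(tp), offset)
  }
}

A non-empty result means some latest offset went backwards, caused either by KAFKA-7703 or by a recreated topic; the two cases cannot be told apart here.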
@@ -259,10 +261,14 @@ private[kafka010] class KafkaOffsetReader(
           partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
           attempt += 1
 
-          incorrectOffsets = findIncorrectOffsets
-          if (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) {
-            logWarning("Retrying to fetch latest offsets because of incorrect offsets " +
-              "(partition, previous offset, fetched offset): " + incorrectOffsets)
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+          }
+          if (attempt < maxOffsetFetchAttempts) {
+            logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+            Thread.sleep(offsetFetchAttemptIntervalMs)
           }
         } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
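Putting the pieces together, the retry policy amounts to: re-fetch, audit, sleep, for at most `maxOffsetFetchAttempts` rounds. A condensed sketch of the loop, with `partitions` as the consumer's current assignment and `maxAttempts`, `intervalMs`, `known`, and `incorrect` as illustrative stand-ins for the commit's names:

var attempt = 0
var fetched = Map.empty[TopicPartition, Long]
var bad = Seq.empty[(TopicPartition, Long, Long)]
do {
  consumer.seekToEnd(partitions)
  fetched = partitions.asScala.map(p => p -> consumer.position(p)).toMap
  attempt += 1
  bad = incorrect(known, fetched)
  if (bad.nonEmpty && attempt < maxAttempts) Thread.sleep(intervalMs)
} while (bad.nonEmpty && attempt < maxAttempts)
// fetched is returned even if bad is still non-empty: per the doc comment,
// a recreated topic is indistinguishable from KAFKA-7703 at this point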
