File tree Expand file tree Collapse file tree 1 file changed +4
-2
lines changed
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010 Expand file tree Collapse file tree 1 file changed +4
-2
lines changed Original file line number Diff line number Diff line change @@ -154,7 +154,8 @@ private[kafka010] case class KafkaSource(
154154 from : Map [TopicPartition , Long ],
155155 until : Map [TopicPartition , Long ]): Map [TopicPartition , Long ] = {
156156 val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
157- val sizes = until.flatMap { case (tp, end) =>
157+ val sizes = until.flatMap {
158+ case (tp, end) =>
158159 // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
159160 from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
160161 val size = end - begin
@@ -166,7 +167,8 @@ private[kafka010] case class KafkaSource(
166167 if (total < 1 ) {
167168 until
168169 } else {
169- until.map { case (tp, end) =>
170+ until.map {
171+ case (tp, end) =>
170172 tp -> sizes.get(tp).map { size =>
171173 val begin = from.get(tp).getOrElse(fromNew(tp))
172174 val prorate = limit * (size / total)
You can’t perform that action at this time.
0 commit comments