From c45ded7109474fcb40f03c772192eb38398f328a Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 13 Oct 2016 23:23:02 -0500 Subject: [PATCH 1/9] [SPARK-17812][SQL][KAFKA] parse json for topicpartitions and offsets --- .../apache/spark/sql/kafka010/JsonUtils.scala | 71 +++++++++++++++++++ .../spark/sql/kafka010/JsonUtilsSuite.scala | 45 ++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala new file mode 100644 index 0000000000000..913cd640eee06 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import scala.collection.mutable.{ ArrayBuffer, HashMap } +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.common.TopicPartition + +private object JsonUtils { + private val mapper = new ObjectMapper() + + def partitions(str: String): Array[TopicPartition] = { + try { + val res = new ArrayBuffer[TopicPartition]() + val topics = mapper.readTree(str).fields + while (topics.hasNext) { + val node = topics.next + val topic = node.getKey + val parts = node.getValue.elements + while (parts.hasNext) { + res.append(new TopicPartition(topic, parts.next().asInt)) + } + } + res.toArray + } catch { + case NonFatal(x) => + throw new IllegalArgumentException( + s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""") + } + } + + def partitionOffsets(str: String): Map[TopicPartition, Long] = { + try { + val res = new HashMap[TopicPartition, Long] + val topics = mapper.readTree(str).fields + while (topics.hasNext) { + val node = topics.next + val topic = node.getKey + val parts = node.getValue.fields + while (parts.hasNext) { + val node = parts.next + val part = node.getKey.toInt + val offset = node.getValue.asLong + res += new TopicPartition(topic, part) -> offset + } + } + res.toMap + } catch { + case NonFatal(x) => + throw new IllegalArgumentException( + s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""") + } + } +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala new file mode 100644 index 0000000000000..54b980049d1a2 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite + +class JsonUtilsSuite extends SparkFunSuite { + + test("parsing partitions") { + val parsed = JsonUtils.partitions("""{"topicA":[0,1],"topicB":[4,6]}""") + val expected = Array( + new TopicPartition("topicA", 0), + new TopicPartition("topicA", 1), + new TopicPartition("topicB", 4), + new TopicPartition("topicB", 6) + ) + assert(parsed.toSeq === expected.toSeq) + } + + test("parsing partitionOffsets") { + val parsed = JsonUtils.partitionOffsets( + """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""") + + assert(parsed(new TopicPartition("topicA", 0)) === 23) + assert(parsed(new TopicPartition("topicA", 1)) === -1) + assert(parsed(new TopicPartition("topicB", 0)) === -2) + } +} From 3120fd8ade24140777c29fc1487aa3f6e76152fb Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 14 Oct 2016 16:37:35 -0500 Subject: [PATCH 2/9] [SPARK-17812][SQL][KAFKA] implement specified offsets and assign --- .../spark/sql/kafka010/KafkaSource.scala | 54 ++++++++++++++++--- .../sql/kafka010/KafkaSourceProvider.scala | 45 +++++++--------- .../spark/sql/kafka010/StartingOffsets.scala | 32 +++++++++++ .../spark/sql/kafka010/KafkaSourceSuite.scala | 6 +-- 4 files changed, 101 insertions(+), 36 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 4b0bb0a0f725c..a4237feb5fd5f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -22,7 +22,7 @@ import java.{util => ju} import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer} +import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition @@ -82,7 +82,7 @@ private[kafka010] case class KafkaSource( executorKafkaParams: ju.Map[String, Object], sourceOptions: Map[String, String], metadataPath: String, - startFromEarliestOffset: Boolean, + startingOffsets: StartingOffsets, failOnDataLoss: Boolean) extends Source with Logging { @@ -110,10 +110,10 @@ private[kafka010] case class KafkaSource( private lazy val initialPartitionOffsets = { val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) metadataLog.get(0).getOrElse { - val offsets = if (startFromEarliestOffset) { - KafkaSourceOffset(fetchEarliestOffsets()) - } else { - KafkaSourceOffset(fetchLatestOffsets()) + val offsets = startingOffsets match { + case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets()) + case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets()) + case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p)) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") @@ -231,6 +231,33 @@ private[kafka010] case class KafkaSource( override def toString(): String = s"KafkaSource[$consumerStrategy]" + /** + * Set consumer position to specified offsets, making sure all assignments are set. + */ + private def fetchSpecificStartingOffsets( + partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = + withRetriesWithoutInterrupt { + // Poll to get the latest assigned partitions + consumer.poll(0) + val partitions = consumer.assignment() + consumer.pause(partitions) + assert(partitions.asScala == partitionOffsets.keySet, + "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + + // These offsets may be out of range, but there isn't a good way of determining that here, + // even poll(0) afterwards may not throw immediately. + // The executor should throw if it is assigned out of range offsets. + partitionOffsets.foreach { + case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp)) + case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp)) + case (tp, off) => consumer.seek(tp, off) + } + partitionOffsets + } + /** * Fetch the earliest offsets of partitions. */ @@ -273,7 +300,7 @@ private[kafka010] case class KafkaSource( consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) - logDebug(s"\tPartitioned assigned to consumer: $partitions") + logDebug(s"\tPartitions assigned to consumer: $partitions") // Get the earliest offset of each partition consumer.seekToBeginning(partitions) @@ -317,6 +344,8 @@ private[kafka010] case class KafkaSource( try { result = Some(body) } catch { + case x: OffsetOutOfRangeException => + reportDataLoss(x.getMessage) case NonFatal(e) => lastException = e logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e) @@ -373,6 +402,17 @@ private[kafka010] object KafkaSource { def createConsumer(): Consumer[Array[Byte], Array[Byte]] } + case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object]) + extends ConsumerStrategy { + override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = { + val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams) + consumer.assign(ju.Arrays.asList(partitions: _*)) + consumer + } + + override def toString: String = s"Assign[${partitions.mkString(", ")}]" + } + case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object]) extends ConsumerStrategy { override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 23b1b60f3bcaa..33c42a6451fcc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -77,14 +77,12 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider // id. Hence, we should generate a unique id for each query. val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - val startFromEarliestOffset = - caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match { - case Some("latest") => false - case Some("earliest") => true - case Some(pos) => - // This should not happen since we have already checked the options. - throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos") - case None => false + val startingOffsets = + caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match { + case Some("latest") => LatestOffsets + case Some("earliest") => EarliestOffsets + case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json)) + case None => LatestOffsets } val kafkaParamsForStrategy = @@ -95,9 +93,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider // So that consumers in Kafka source do not mess with any existing group id .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver") - // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets - // by itself instead of counting on KafkaConsumer. - .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") + // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial + // offsets by itself instead of counting on KafkaConsumer. + .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // So that consumers in the driver does not commit offsets unnecessarily .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") @@ -130,6 +128,10 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider .build() val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match { + case ("assign", value) => + AssignStrategy( + JsonUtils.partitions(value), + kafkaParamsForStrategy) case ("subscribe", value) => SubscribeStrategy( value.split(",").map(_.trim()).filter(_.nonEmpty), @@ -153,7 +155,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider kafkaParamsForExecutors, parameters, metadataPath, - startFromEarliestOffset, + startingOffsets, failOnDataLoss) } @@ -195,14 +197,6 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider throw new IllegalArgumentException("Unknown option") } - caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match { - case Some(pos) if !STARTING_OFFSET_OPTION_VALUES.contains(pos.trim.toLowerCase) => - throw new IllegalArgumentException( - s"Illegal value '$pos' for option '$STARTING_OFFSET_OPTION_KEY', " + - s"acceptable values are: ${STARTING_OFFSET_OPTION_VALUES.mkString(", ")}") - case _ => - } - // Validate user-specified Kafka options if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) { @@ -215,11 +209,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider throw new IllegalArgumentException( s""" |Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported. - |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to - |specify where to start. Structured Streaming manages which offsets are consumed + |Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest' + |to specify where to start. Structured Streaming manages which offsets are consumed |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no |data is missed when when new topics/partitions are dynamically subscribed. Note that - |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and + |'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and |that resuming will always pick up from where the query left off. See the docs for more |details. """.stripMargin) @@ -282,8 +276,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider } private[kafka010] object KafkaSourceProvider { - private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern") - private val STARTING_OFFSET_OPTION_KEY = "startingoffset" - private val STARTING_OFFSET_OPTION_VALUES = Set("earliest", "latest") + private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign") + private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala new file mode 100644 index 0000000000000..83959e597171a --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +/* + * Values that can be specified for config startingOffsets + */ +private[kafka010] sealed trait StartingOffsets + +private[kafka010] case object EarliestOffsets extends StartingOffsets + +private[kafka010] case object LatestOffsets extends StartingOffsets + +private[kafka010] case class SpecificOffsets( + partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 8b5296ea135c7..544400b35732c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -52,7 +52,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { protected def makeSureGetOffsetCalled = AssertOnQuery { q => // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure // its "getOffset" is called before pushing any data. Otherwise, because of the race contion, - // we don't know which data should be fetched when `startingOffset` is latest. + // we don't know which data should be fetched when `startingOffsets` is latest. q.processAllAvailable() true } @@ -301,7 +301,7 @@ class KafkaSourceSuite extends KafkaSourceTest { val reader = spark .readStream .format("kafka") - .option("startingOffset", s"latest") + .option("startingOffsets", s"latest") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") options.foreach { case (k, v) => reader.option(k, v) } @@ -340,7 +340,7 @@ class KafkaSourceSuite extends KafkaSourceTest { val reader = spark.readStream reader .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$")) - .option("startingOffset", s"earliest") + .option("startingOffsets", s"earliest") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") options.foreach { case (k, v) => reader.option(k, v) } From 35bb8c3cfe77f2cb3d26f4afd3364caa6d0ec4cf Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 15 Oct 2016 22:00:20 -0500 Subject: [PATCH 3/9] [SPARK-17812][SQL][KAFKA] doc and test updates --- .../structured-streaming-kafka-integration.md | 38 ++++-- .../apache/spark/sql/kafka010/JsonUtils.scala | 36 ++++++ .../spark/sql/kafka010/KafkaSource.scala | 4 +- .../sql/kafka010/KafkaSourceProvider.scala | 7 ++ .../spark/sql/kafka010/KafkaSourceSuite.scala | 115 ++++++++++++++++-- .../spark/sql/kafka010/KafkaTestUtils.scala | 14 ++- 6 files changed, 192 insertions(+), 22 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 668489addf82c..c9402173a0e75 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -150,16 +150,25 @@ The following options must be set for the Kafka source. + + + + + - + - @@ -174,16 +183,21 @@ The following configurations are optional:
Optionvaluemeaning
assignjson string {"topicA":[0,1],"topicB":[2,4]}Specific TopicPartitions to consume. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source.
subscribe A comma-separated list of topicsThe topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be - specified for Kafka source.The topic list to subscribe. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source.
subscribePattern Java regex stringThe pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern" + The pattern used to subscribe to topic(s). + Only one of "assign, "subscribe" or "subscribePattern" options can be specified for Kafka source.
- - - - + + + + - + diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index fff57b484d4ac..6e91f4f69b81c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -26,13 +26,15 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node._ import org.apache.kafka.common.TopicPartition -/* +/** * Utilities for converting Kafka related objects to and from json. */ private object JsonUtils { private val mapper = new ObjectMapper() - /* Read TopicPartitions from json string */ + /** + * Read TopicPartitions from json string + */ def partitions(str: String): Array[TopicPartition] = { try { val res = new ArrayBuffer[TopicPartition]() @@ -53,7 +55,9 @@ private object JsonUtils { } } - /* Write TopicPartitions as json */ + /** + * Write TopicPartitions as json + */ def partitions(partitions: Iterable[TopicPartition], writer: Writer): Unit = { val root = mapper.createObjectNode() partitions.foreach { tp => @@ -67,7 +71,9 @@ private object JsonUtils { mapper.writeValue(writer, root) } - /* Read per-TopicPartition offsets from json string */ + /** + * Read per-TopicPartition offsets from json string + */ def partitionOffsets(str: String): Map[TopicPartition, Long] = { try { val res = new HashMap[TopicPartition, Long] @@ -91,7 +97,9 @@ private object JsonUtils { } } - /* Write per-TopicPartition offsets as json */ + /** + * Write per-TopicPartition offsets as json + */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long], writer: Writer): Unit = { val root = mapper.createObjectNode() partitionOffsets.foreach { case (tp, off) => diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index acb1e4f76642b..e2467d88a8f32 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.kafka010 -import java.util.concurrent.atomic.AtomicInteger import java.io.StringWriter +import java.util.concurrent.atomic.AtomicInteger import scala.util.Random @@ -376,7 +376,7 @@ class KafkaSourceSuite extends KafkaSourceTest { StopStream, StartStream(), CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery - AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition=true), + AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true), CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34), StopStream ) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 1b89dad86c67d..9b24ccdd560e8 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -213,7 +213,7 @@ class KafkaTestUtils extends Logging { val offsets = try { messages.map { m => val record = partition match { - case Some(p) => new ProducerRecord[String, String](topic, p, null, m) + case Some(p) => new ProducerRecord[String, String](topic, p, null, m) case None => new ProducerRecord[String, String](topic, m) } val metadata = From 5e4511f0c7e84d15011a7eb8d208be13ed672b49 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 15 Oct 2016 22:52:39 -0500 Subject: [PATCH 5/9] [SPARK-17812][SQL][KAFKA] additional paranoia on reset of starting offsets --- .../apache/spark/sql/kafka010/KafkaSource.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index cf6ccdb9f4182..f5c42947fccf9 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -247,17 +247,24 @@ private[kafka010] case class KafkaSource( s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") - // These offsets may be out of range, but there isn't a good way of determining that here, - // even poll(0) afterwards may not throw immediately. - // The executor should throw if it is assigned out of range offsets. partitionOffsets.foreach { case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp)) case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp)) case (tp, off) => consumer.seek(tp, off) } - partitionOffsets.map { + val result = partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) } + partitionOffsets.foreach { + case (tp, off) if off != -1 && off != -2 => + if (result(tp) != off) { + reportDataLoss( + s"startingOffsets for $tp was $off but consumer reset to earliest ${result(tp)}") + } + case _ => + // no real way to check that beginning or end is reasonable + } + result } /** From 6c8d459f9795c6ff32e8bf78f8796869ca722ee3 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 18 Oct 2016 00:20:53 -0500 Subject: [PATCH 6/9] [SPARK-17813][SQL][KAFKA] maxOffsetsPerTrigger proportional implementation --- .../structured-streaming-kafka-integration.md | 6 ++ .../spark/sql/kafka010/KafkaSource.scala | 99 ++++++++++++++----- .../spark/sql/kafka010/KafkaSourceSuite.scala | 39 +++++++- 3 files changed, 117 insertions(+), 27 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index e851f210c92c4..a6c3b3a9024d8 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -221,6 +221,12 @@ The following configurations are optional: + + + + + +
Optionvaluedefaultmeaning
startingOffset["earliest", "latest"]"latest"The start point when a query is started, either "earliest" which is from the earliest offset, - or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q - uery is started, and that resuming will always pick up from where the query left off.startingOffsetsearliest, latest, or json string + {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} + latestThe start point when a query is started, either "earliest" which is from the earliest offsets, + "latest" which is just from the latest offsets, or a json string specifying a starting offset for + each TopicPartition. In the json, -2 as an offset can be used refer to earliest, -1 to latest. + Note: This only applies when a new Streaming query is started, and that resuming will always pick + up from where the query left off. Newly discovered partitions during a query will start at + earliest.
failOnDataLoss[true, false]true or false true Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work @@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka. Note that the following Kafka params cannot be set and the Kafka source will throw an exception: - **group.id**: Kafka source will create a unique group id for each query automatically. -- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify +- **auto.offset.reset**: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new - topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new + topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. - **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 913cd640eee06..fff57b484d4ac 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -17,15 +17,22 @@ package org.apache.spark.sql.kafka010 +import java.io.Writer + import scala.collection.mutable.{ ArrayBuffer, HashMap } import scala.util.control.NonFatal import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node._ import org.apache.kafka.common.TopicPartition +/* + * Utilities for converting Kafka related objects to and from json. + */ private object JsonUtils { private val mapper = new ObjectMapper() + /* Read TopicPartitions from json string */ def partitions(str: String): Array[TopicPartition] = { try { val res = new ArrayBuffer[TopicPartition]() @@ -46,6 +53,21 @@ private object JsonUtils { } } + /* Write TopicPartitions as json */ + def partitions(partitions: Iterable[TopicPartition], writer: Writer): Unit = { + val root = mapper.createObjectNode() + partitions.foreach { tp => + var topic = root.get(tp.topic) + if (null == topic) { + root.set(tp.topic, mapper.createArrayNode()) + topic = root.get(tp.topic) + } + topic.asInstanceOf[ArrayNode].add(tp.partition) + } + mapper.writeValue(writer, root) + } + + /* Read per-TopicPartition offsets from json string */ def partitionOffsets(str: String): Map[TopicPartition, Long] = { try { val res = new HashMap[TopicPartition, Long] @@ -68,4 +90,18 @@ private object JsonUtils { s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""") } } + + /* Write per-TopicPartition offsets as json */ + def partitionOffsets(partitionOffsets: Map[TopicPartition, Long], writer: Writer): Unit = { + val root = mapper.createObjectNode() + partitionOffsets.foreach { case (tp, off) => + var topic = root.get(tp.topic) + if (null == topic) { + root.set(tp.topic, mapper.createObjectNode()) + topic = root.get(tp.topic) + } + topic.asInstanceOf[ObjectNode].set(tp.partition.toString, new LongNode(off)) + } + mapper.writeValue(writer, root) + } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index a4237feb5fd5f..cf6ccdb9f4182 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -255,7 +255,9 @@ private[kafka010] case class KafkaSource( case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp)) case (tp, off) => consumer.seek(tp, off) } - partitionOffsets + partitionOffsets.map { + case (tp, _) => tp -> consumer.position(tp) + } } /** diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 33c42a6451fcc..585ced875caa7 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -177,6 +177,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider } val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match { + case ("assign", value) => + if (!value.trim.startsWith("{")) { + throw new IllegalArgumentException( + "No topicpartitions to assign as specified value for option " + + s"'assign' is '$value'") + } + case ("subscribe", value) => val topics = value.split(",").map(_.trim).filter(_.nonEmpty) if (topics.isEmpty) { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 544400b35732c..acb1e4f76642b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.kafka010 import java.util.concurrent.atomic.AtomicInteger +import java.io.StringWriter import scala.util.Random import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming._ @@ -155,26 +157,52 @@ class KafkaSourceSuite extends KafkaSourceTest { ) } + test("assign from latest offsets") { + val topic = newTopic() + testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4)) + } + + test("assign from earliest offsets") { + val topic = newTopic() + testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4)) + } + + test("assign from specific offsets") { + val topic = newTopic() + testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4)) + } + test("subscribing topic by name from latest offsets") { val topic = newTopic() - testFromLatestOffsets(topic, "subscribe" -> topic) + testFromLatestOffsets(topic, true, "subscribe" -> topic) } test("subscribing topic by name from earliest offsets") { val topic = newTopic() - testFromEarliestOffsets(topic, "subscribe" -> topic) + testFromEarliestOffsets(topic, true, "subscribe" -> topic) + } + + test("subscribing topic by name from specific offsets") { + val topic = newTopic() + testFromSpecificOffsets(topic, "subscribe" -> topic) } test("subscribing topic by pattern from latest offsets") { val topicPrefix = newTopic() val topic = topicPrefix + "-suffix" - testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*") + testFromLatestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*") } test("subscribing topic by pattern from earliest offsets") { val topicPrefix = newTopic() val topic = topicPrefix + "-suffix" - testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*") + testFromEarliestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*") + } + + test("subscribing topic by pattern from specific offsets") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromSpecificOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*") } test("subscribing topic by pattern with topic deletions") { @@ -233,6 +261,10 @@ class KafkaSourceSuite extends KafkaSourceTest { testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")( "only one", "options can be specified") + testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")( + "only one", "options can be specified") + + testBadOptions("assign" -> "")("no topicpartitions to assign") testBadOptions("subscribe" -> "")("no topics to subscribe") testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") } @@ -293,7 +325,67 @@ class KafkaSourceSuite extends KafkaSourceTest { private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" - private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = { + private def assignString(topic: String, partitions: Iterable[Int]): String = { + val writer = new StringWriter + JsonUtils.partitions( + partitions.map(p => new TopicPartition(topic, p)), + writer) + writer.toString + } + + private def testFromSpecificOffsets(topic: String, options: (String, String)*): Unit = { + val writer = new StringWriter + val partitionOffsets = Map( + new TopicPartition(topic, 0) -> -2L, + new TopicPartition(topic, 1) -> -1L, + new TopicPartition(topic, 2) -> 0L, + new TopicPartition(topic, 3) -> 1L, + new TopicPartition(topic, 4) -> 2L + ) + JsonUtils.partitionOffsets(partitionOffsets, writer) + val startingOffsets = writer.toString + + testUtils.createTopic(topic, partitions = 5) + // part 0 starts at earliest, these should all be seen + testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0)) + // part 1 starts at latest, these should all be skipped + testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1)) + // part 2 starts at 0, these should all be seen + testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2)) + // part 3 starts at 1, first should be skipped + testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3)) + // part 4 starts at 2, first and second should be skipped + testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4)) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("startingOffsets", startingOffsets) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), + StopStream, + StartStream(), + CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery + AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition=true), + CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34), + StopStream + ) + } + + private def testFromLatestOffsets( + topic: String, + addPartitions: Boolean, + options: (String, String)*): Unit = { testUtils.createTopic(topic, partitions = 5) testUtils.sendMessages(topic, Array("-1")) require(testUtils.getLatestOffsets(Set(topic)).size === 5) @@ -324,7 +416,9 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 7, 8), CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), AssertOnQuery("Add partitions") { query: StreamExecution => - testUtils.addPartitions(topic, 10) + if (addPartitions) { + testUtils.addPartitions(topic, 10) + } true }, AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), @@ -332,7 +426,10 @@ class KafkaSourceSuite extends KafkaSourceTest { ) } - private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = { + private def testFromEarliestOffsets( + topic: String, + addPartitions: Boolean, + options: (String, String)*): Unit = { testUtils.createTopic(topic, partitions = 5) testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray) require(testUtils.getLatestOffsets(Set(topic)).size === 5) @@ -360,7 +457,9 @@ class KafkaSourceSuite extends KafkaSourceTest { StartStream(), CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), AssertOnQuery("Add partitions") { query: StreamExecution => - testUtils.addPartitions(topic, 10) + if (addPartitions) { + testUtils.addPartitions(topic, 10) + } true }, AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 3eb8a737ba4c8..1b89dad86c67d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -201,11 +201,23 @@ class KafkaTestUtils extends Logging { /** Send the array of messages to the Kafka broker */ def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = { + sendMessages(topic, messages, None) + } + + /** Send the array of messages to the Kafka broker using specified partition */ + def sendMessages( + topic: String, + messages: Array[String], + partition: Option[Int]): Seq[(String, RecordMetadata)] = { producer = new KafkaProducer[String, String](producerConfiguration) val offsets = try { messages.map { m => + val record = partition match { + case Some(p) => new ProducerRecord[String, String](topic, p, null, m) + case None => new ProducerRecord[String, String](topic, m) + } val metadata = - producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + producer.send(record).get(10, TimeUnit.SECONDS) logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}") (m, metadata) } From 2e53e5a3904305cb1d1b0f2325e31c9c434d16ec Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 15 Oct 2016 22:16:11 -0500 Subject: [PATCH 4/9] [SPARK-17812][SQL][KAFKA] style fixes --- docs/structured-streaming-kafka-integration.md | 2 +- .../apache/spark/sql/kafka010/JsonUtils.scala | 18 +++++++++++++----- .../spark/sql/kafka010/KafkaSourceSuite.scala | 4 ++-- .../spark/sql/kafka010/KafkaTestUtils.scala | 2 +- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index c9402173a0e75..e851f210c92c4 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -190,7 +190,7 @@ The following configurations are optional: latest The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for - each TopicPartition. In the json, -2 as an offset can be used refer to earliest, -1 to latest. + each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.10 milliseconds to wait before retrying to fetch Kafka offsets
maxOffsetsPerTriggerlongnoneRate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index f5c42947fccf9..0758e2b77929d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource( private val offsetFetchAttemptIntervalMs = sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong + private val maxOffsetsPerTrigger = + sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + /** * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the * offsets and never commits them. @@ -121,6 +124,8 @@ private[kafka010] case class KafkaSource( }.partitionToOffsets } + private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None + override def schema: StructType = KafkaSource.kafkaSchema /** Returns the maximum available offset for this source. */ @@ -128,9 +133,52 @@ private[kafka010] case class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets - val offset = KafkaSourceOffset(fetchLatestOffsets()) - logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}") - Some(offset) + val latest = fetchLatestOffsets() + val offsets = maxOffsetsPerTrigger match { + case None => + latest + case Some(limit) if !currentPartitionOffsets.isDefined => + rateLimit(limit, initialPartitionOffsets, latest) + case Some(limit) => + rateLimit(limit, currentPartitionOffsets.get, latest) + } + + currentPartitionOffsets = Some(offsets) + logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}") + Some(KafkaSourceOffset(offsets)) + } + + /** Proportionally distribute limit number of offsets among topicpartitions */ + private def rateLimit( + limit: Long, + from: Map[TopicPartition, Long], + until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { + val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq) + val sizes = until.flatMap { case (tp, end) => + // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it + from.get(tp).orElse(fromNew.get(tp)).flatMap { begin => + val size = end - begin + logDebug(s"rateLimit $tp size is $size") + if (size > 0) Some(tp -> size) else None + } + } + val total = sizes.values.sum.toDouble + if (total < 1) { + until + } else { + until.map { case (tp, end) => + tp -> sizes.get(tp).map { size => + val begin = from.get(tp).getOrElse(fromNew(tp)) + val prorate = limit * (size / total) + logDebug(s"rateLimit $tp prorated amount is $prorate") + // Don't completely starve small topicpartitions + val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong + logDebug(s"rateLimit $tp new offset is $off") + // Paranoia, make sure not to return an offset that's past end + Math.min(end, off) + }.getOrElse(end) + } + } } /** @@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource( // Find the new partitions, and get their earliest offsets val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet) - val newPartitionOffsets = if (newPartitions.nonEmpty) { - fetchNewPartitionEarliestOffsets(newPartitions.toSeq) - } else { - Map.empty[TopicPartition, Long] - } + val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq) if (newPartitionOffsets.keySet != newPartitions) { // We cannot get from offsets for some partitions. It means they got deleted. val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet) @@ -304,23 +348,28 @@ private[kafka010] case class KafkaSource( * some partitions if they are deleted. */ private def fetchNewPartitionEarliestOffsets( - newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() - consumer.pause(partitions) - logDebug(s"\tPartitions assigned to consumer: $partitions") - - // Get the earliest offset of each partition - consumer.seekToBeginning(partitions) - val partitionOffsets = newPartitions.filter { p => - // When deleting topics happen at the same time, some partitions may not be in `partitions`. - // So we need to ignore them - partitions.contains(p) - }.map(p => p -> consumer.position(p)).toMap - logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") - partitionOffsets - } + newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = + if (newPartitions.isEmpty) { + Map.empty[TopicPartition, Long] + } else { + withRetriesWithoutInterrupt { + // Poll to get the latest assigned partitions + consumer.poll(0) + val partitions = consumer.assignment() + consumer.pause(partitions) + logDebug(s"\tPartitions assigned to consumer: $partitions") + + // Get the earliest offset of each partition + consumer.seekToBeginning(partitions) + val partitionOffsets = newPartitions.filter { p => + // When deleting topics happen at the same time, some partitions may not be in + // `partitions`. So we need to ignore them + partitions.contains(p) + }.map(p => p -> consumer.position(p)).toMap + logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") + partitionOffsets + } + } /** * Helper function that does multiple retries on the a body of code that returns offsets. diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e2467d88a8f32..3eaaa5eeafc35 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -27,9 +27,9 @@ import org.apache.kafka.common.TopicPartition import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest } import org.apache.spark.sql.test.SharedSQLContext - +import org.apache.spark.util.ManualClock abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { @@ -134,6 +134,41 @@ class KafkaSourceSuite extends KafkaSourceTest { private val topicId = new AtomicInteger(0) + test("maxOffsetsPerTrigger") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) + testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) + testUtils.sendMessages(topic, Array("1"), Some(2)) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 10) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new ManualClock + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + AdvanceManualClock(100), + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), + AdvanceManualClock(100), + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 + ), + StopStream + ) + } + test("cannot stop Kafka stream") { val topic = newTopic() testUtils.createTopic(newTopic(), partitions = 5) From fde4e33f71572d15801ed1deafe996e866e49abd Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Mon, 24 Oct 2016 12:24:20 -0500 Subject: [PATCH 7/9] [SPARK-17813][SQL][KAFKA] fix test for upstream changes to StreamManualClock --- .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index a42d0112d8252..5896454a620d5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -28,7 +28,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest } import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.util.ManualClock abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { @@ -153,7 +152,7 @@ class KafkaSourceSuite extends KafkaSourceTest { .as[(String, String)] val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - val clock = new ManualClock + val clock = new StreamManualClock testStream(mapped)( StartStream(ProcessingTime(100), clock), AdvanceManualClock(100), From 6a7ff24dbec5e195d2664bc8a86a70762de5554b Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 26 Oct 2016 22:28:04 -0500 Subject: [PATCH 8/9] [SPARK-17813][SQL][KAFKA] set current offsets on recovery --- .../spark/sql/kafka010/KafkaSource.scala | 8 +++- .../spark/sql/kafka010/KafkaSourceSuite.scala | 37 ++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index c12ca866b4e24..15c34ffa935cf 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -137,7 +137,7 @@ private[kafka010] case class KafkaSource( val offsets = maxOffsetsPerTrigger match { case None => latest - case Some(limit) if !currentPartitionOffsets.isDefined => + case Some(limit) if currentPartitionOffsets.isEmpty => rateLimit(limit, initialPartitionOffsets, latest) case Some(limit) => rateLimit(limit, currentPartitionOffsets.get, latest) @@ -265,6 +265,12 @@ private[kafka010] case class KafkaSource( logInfo("GetBatch generating RDD of offset range: " + offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + + // On recovery, getBatch will get called before getOffset + if (currentPartitionOffsets.isEmpty) { + currentPartitionOffsets = Some(untilPartitionOffsets) + } + sqlContext.createDataFrame(rdd, schema) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 5896454a620d5..ed4cc75920e8e 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -23,6 +23,8 @@ import scala.util.Random import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming._ @@ -153,17 +155,48 @@ class KafkaSourceSuite extends KafkaSourceTest { val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + testStream(mapped)( StartStream(ProcessingTime(100), clock), - AdvanceManualClock(100), + waitUntilBatchProcessed, // 1 from smallest, 1 from middle, 8 from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), AdvanceManualClock(100), + waitUntilBatchProcessed, // smallest now empty, 1 more from middle, 9 more from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 ), - StopStream + StopStream, + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125 + ), + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125, + 13, 126, 127, 128, 129, 130, 131, 132, 133, 134 + ) ) } From 5e4b468111ec20f11fc352de4075637f12e3a499 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 27 Oct 2016 09:16:02 -0500 Subject: [PATCH 9/9] [SPARK-17813][SQL][KAFKA] spacing --- .../scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 15c34ffa935cf..61cba737d148a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -154,7 +154,8 @@ private[kafka010] case class KafkaSource( from: Map[TopicPartition, Long], until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq) - val sizes = until.flatMap { case (tp, end) => + val sizes = until.flatMap { + case (tp, end) => // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it from.get(tp).orElse(fromNew.get(tp)).flatMap { begin => val size = end - begin @@ -166,7 +167,8 @@ private[kafka010] case class KafkaSource( if (total < 1) { until } else { - until.map { case (tp, end) => + until.map { + case (tp, end) => tp -> sizes.get(tp).map { size => val begin = from.get(tp).getOrElse(fromNew(tp)) val prorate = limit * (size / total)