From fccfee1b59392b2f11e529a65c8272f2c24d58e6 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 12 Dec 2018 11:32:41 -0800 Subject: [PATCH 1/5] allow overriding group id --- .../structured-streaming-kafka-integration.md | 25 ++++++++++++++--- .../kafka010/KafkaContinuousReadSupport.scala | 18 +++++++++++-- .../kafka010/KafkaMicroBatchReadSupport.scala | 17 +++++++++++- .../sql/kafka010/KafkaOffsetReader.scala | 6 +++-- .../spark/sql/kafka010/KafkaSource.scala | 16 ++++++++++- .../sql/kafka010/KafkaSourceProvider.scala | 21 +++++++++++---- .../kafka010/KafkaMicroBatchSourceSuite.scala | 27 +++++++++++++++++++ .../sql/kafka010/KafkaRelationSuite.scala | 12 +++++++++ 8 files changed, 128 insertions(+), 14 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 7040f8da2c614..b7a2040778215 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -379,7 +379,25 @@ The following configurations are optional: string spark-kafka-source streaming and batch - Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries + Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming + queries. If "kafka.group.id" is set, this option will be ignored. + + + kafka.group.id + string + none + streaming and batch + The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. + By default, each query generates a unique group id for reading data. This ensures that each Kafka + source has its own consumer group that does not face interference from any other consumer, and + therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, + Kafka group-based authorization), you may want to use a specific authorized group id to read data. + You can optionally set the group ID. However, do this with extreme caution as it can cause + unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the + same group id are likely to interfere with each other, causing each query to read only part of the + data. This may also occur when queries are started/restarted in quick succession. To minimize such + issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to + be very small. When this is set, option "groupIdPrefix" will be ignored. @@ -592,8 +610,9 @@ for parameters related to writing data. Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception: - **group.id**: Kafka source will create a unique group id for each query automatically. The user can -set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value -is "spark-kafka-source". +set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, +default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special +group id; however, please read the warnings for this option and use it with caution. - **auto.offset.reset**: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it.
This will ensure that no data is missed when new diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index 1753a28fba2fb..8c10515c06fd5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} import java.util.concurrent.TimeoutException -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetOutOfRangeException} import org.apache.kafka.common.TopicPartition import org.apache.spark.TaskContext @@ -167,7 +167,21 @@ class KafkaContinuousScanConfigBuilder( val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) if (deletedPartitions.nonEmpty) { - reportDataLoss(s"Some partitions were deleted: $deletedPartitions") + val message = if ( + offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + + "has been set on this query, it is not recommended to set this option. This option is " + + "unsafe to use since multiple concurrent queries or sources using the same group id " + + "will interfere with each other as they are part of the same consumer group. Restarted " + + "queries may also suffer interference from the previous run having the same group id. " + + "The user should have only one query per group id, and/or set " + + "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + + "previous query are marked dead by the Kafka group coordinator before the restarted " + + "query starts running." + } else { + s"$deletedPartitions are gone. Some data may have been missed" + } + reportDataLoss(message) } val startOffsets = newPartitionOffsets ++ diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index bb4de674c3c72..9722e82e83e03 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -22,6 +22,7 @@ import java.io._ import java.nio.charset.StandardCharsets import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging @@ -122,7 +123,21 @@ private[kafka010] class KafkaMicroBatchReadSupport( // Find deleted partitions, and report data loss if required val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { - reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed") + val message = + if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + + "has been set on this query, it is not recommended to set this option. 
This option is " + + "unsafe to use since multiple concurrent queries or sources using the same group id " + + "will interfere with each other as they are part of the same consumer group. Restarted " + + "queries may also suffer interference from the previous run having the same group id. " + + "The user should have only one query per group id, and/or set " + + "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + + "previous query are marked dead by the Kafka group coordinator before the restarted " + + "query starts running." + } else { + s"$deletedPartitions are gone. Some data may have been missed" + } + reportDataLoss(message) } // Use the end partitions to calculate offset ranges to ignore partitions that have diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 82066697cb95a..88b7ef3cd27d5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -45,7 +45,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread} */ private[kafka010] class KafkaOffsetReader( consumerStrategy: ConsumerStrategy, - driverKafkaParams: ju.Map[String, Object], + val driverKafkaParams: ju.Map[String, Object], readerOptions: Map[String, String], driverGroupIdPrefix: String) extends Logging { /** @@ -81,7 +81,9 @@ private[kafka010] class KafkaOffsetReader( assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) if (_consumer == null) { val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams) - newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId()) + if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId()) + } _consumer = consumerStrategy.createConsumer(newKafkaParams) } _consumer diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 66ec7e0cd084a..5b14b3064b31a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -22,6 +22,7 @@ import java.io._ import java.nio.charset.StandardCharsets import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext @@ -241,7 +242,20 @@ private[kafka010] class KafkaSource( val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { - reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed") + val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + + "has been set on this query, it is not recommended to set this option. This option is " + + "unsafe to use since multiple concurrent queries or sources using the same group id " + + "will interfere with each other as they are part of the same consumer group. Restarted " + + "queries may also suffer interference from the previous run having the same group id. 
" + + "The user should have only one query per group id, and/or set " + + "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + + "previous query are marked dead by the Kafka group coordinator before the restarted " + + "query starts running." + } else { + s"$deletedPartitions are gone. Some data may have been missed" + } + reportDataLoss(message) } // Use the until partitions to calculate offset ranges to ignore partitions that have diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 0ac330435e5c5..ec2252bba076c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -340,9 +340,19 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // Validate user-specified Kafka options if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) { - throw new IllegalArgumentException( - s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " + - s"user-specified consumer groups are not used to track offsets.") + logWarning( + s"It is not recommended to set Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}'. " + + "This option is unsafe to use since multiple concurrent queries or sources using the " + + "same group id will interfere with each other as they are part of the same consumer " + + "group. Restarted queries may also suffer interference from the previous run having the " + + "same group id. The user should have only one query per group id, and/or set " + + "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + + "previous query are marked dead by the Kafka group coordinator before the restarted " + + "query starts running.") + if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) { + logWarning("Option 'groupIdPrefix' will be ignored as " + + s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.") + } } if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}")) { @@ -445,6 +455,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" private val MIN_PARTITIONS_OPTION_KEY = "minpartitions" + private val GROUP_ID_PREFIX = "groupidprefix" val TOPIC_OPTION_KEY = "topic" @@ -515,7 +526,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") // So that consumers in executors do not mess with any existing group id - .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor") + .setIfUnset(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor") // So that consumers in executors does not commit offsets unnecessarily .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") @@ -534,7 +545,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { parameters: Map[String, String], metadataPath: String): String = { val groupIdPrefix = parameters - .getOrElse("groupIdPrefix", "spark-kafka-source") + .getOrElse(GROUP_ID_PREFIX, "spark-kafka-source") s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}" } diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5ee76990b54f4..59e0a40b0bc74 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -581,6 +581,33 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } + test("allow group.id override") { + // Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)} + // as well as KafkaOffsetReader.createConsumer(.) + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0)) + testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1)) + testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2)) + + val dsKafka = spark + .readStream + .format("kafka") + .option("kafka.group.id", "id-" + Random.nextInt()) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .selectExpr("CAST(value AS STRING)") + .as[String] + .map(_.toInt) + + testStream(dsKafka)( + makeSureGetOffsetCalled, + CheckAnswer(1 to 30: _*) + ) + } + test("ensure stream-stream self-join generates only one offset in log and correct metrics") { val topic = newTopic() testUtils.createTopic(topic, partitions = 2) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 8cfca56433f5d..efe7385ed16bc 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -239,6 +239,18 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") } + test("allow group.id overriding") { + // Tests code path KafkaSourceProvider.createRelation(.) 
+ val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0)) + testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1)) + testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2)) + + val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom")) + checkAnswer(df, (1 to 30).map(_.toString).toDF()) + } + test("read Kafka transactional messages: read_committed") { val topic = newTopic() testUtils.createTopic(topic) From bd1c0b8aa4b7206922ac602b0e21af883f9d27a2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 12 Dec 2018 15:14:52 -0800 Subject: [PATCH 2/5] fix test --- .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 59e0a40b0bc74..4d53d9c2c2600 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1212,7 +1212,6 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported")) } - testUnsupportedConfig("kafka.group.id") testUnsupportedConfig("kafka.auto.offset.reset") testUnsupportedConfig("kafka.enable.auto.commit") testUnsupportedConfig("kafka.interceptor.classes") From c279fd513c46babef1e71d55c64795f2feebd16b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 13 Dec 2018 15:51:18 -0800 Subject: [PATCH 3/5] address --- .../kafka010/KafkaContinuousReadSupport.scala | 10 +-------- .../kafka010/KafkaMicroBatchReadSupport.scala | 10 +-------- .../spark/sql/kafka010/KafkaSource.scala | 10 +-------- .../sql/kafka010/KafkaSourceProvider.scala | 21 ++++++++++--------- 4 files changed, 14 insertions(+), 37 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index 8c10515c06fd5..fc93595924bc5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -169,15 +169,7 @@ class KafkaContinuousScanConfigBuilder( if (deletedPartitions.nonEmpty) { val message = if ( offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + - "has been set on this query, it is not recommended to set this option. This option is " + - "unsafe to use since multiple concurrent queries or sources using the same group id " + - "will interfere with each other as they are part of the same consumer group. Restarted " + - "queries may also suffer interference from the previous run having the same group id. " + - "The user should have only one query per group id, and/or set " + - "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + - "previous query are marked dead by the Kafka group coordinator before the restarted " + - "query starts running." + s"$deletedPartitions are gone. 
" + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE } else { s"$deletedPartitions are gone. Some data may have been missed" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index 9722e82e83e03..1f31adee4c9a9 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -125,15 +125,7 @@ private[kafka010] class KafkaMicroBatchReadSupport( if (deletedPartitions.nonEmpty) { val message = if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + - "has been set on this query, it is not recommended to set this option. This option is " + - "unsafe to use since multiple concurrent queries or sources using the same group id " + - "will interfere with each other as they are part of the same consumer group. Restarted " + - "queries may also suffer interference from the previous run having the same group id. " + - "The user should have only one query per group id, and/or set " + - "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + - "previous query are marked dead by the Kafka group coordinator before the restarted " + - "query starts running." + s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE } else { s"$deletedPartitions are gone. Some data may have been missed" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 5b14b3064b31a..5965787518575 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -243,15 +243,7 @@ private[kafka010] class KafkaSource( val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " + - "has been set on this query, it is not recommended to set this option. This option is " + - "unsafe to use since multiple concurrent queries or sources using the same group id " + - "will interfere with each other as they are part of the same consumer group. Restarted " + - "queries may also suffer interference from the previous run having the same group id. " + - "The user should have only one query per group id, and/or set " + - "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + - "previous query are marked dead by the Kafka group coordinator before the restarted " + - "query starts running." + s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE } else { s"$deletedPartitions are gone. 
Some data may have been missed" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index ec2252bba076c..90d9ba8a840d2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -340,15 +340,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // Validate user-specified Kafka options if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) { - logWarning( - s"It is not recommended to set Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}'. " + - "This option is unsafe to use since multiple concurrent queries or sources using the " + - "same group id will interfere with each other as they are part of the same consumer " + - "group. Restarted queries may also suffer interference from the previous run having the " + - "same group id. The user should have only one query per group id, and/or set " + - "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " + - "previous query are marked dead by the Kafka group coordinator before the restarted " + - "query starts running.") + logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE) if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) { logWarning("Option 'groupIdPrefix' will be ignored as " + s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.") @@ -475,7 +467,16 @@ private[kafka010] object KafkaSourceProvider extends Logging { | source option "failOnDataLoss" to "false". """.stripMargin - + val CUSTOM_GROUP_ID_ERROR_MESSAGE = + s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is + | not recommended to set this option. This option is unsafe to use since multiple concurrent + | queries or sources using the same group id will interfere with each other as they are part + | of the same consumer group. Restarted queries may also suffer interference from the + | previous run having the same group id. The user should have only one query per group id, + | and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka + | consumers from the previous query are marked dead by the Kafka group coordinator before the + | restarted query starts running. 
+ """.stripMargin private val serClassName = classOf[ByteArraySerializer].getName private val deserClassName = classOf[ByteArrayDeserializer].getName From d9d3065b1ff09ee8a0a7affa447f37a04f15a982 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 17 Dec 2018 10:58:49 -0800 Subject: [PATCH 4/5] address --- .../apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala | 2 +- .../apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala | 2 +- .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index fc93595924bc5..a2b4332d2691f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -169,7 +169,7 @@ class KafkaContinuousScanConfigBuilder( if (deletedPartitions.nonEmpty) { val message = if ( offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE + s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { s"$deletedPartitions are gone. Some data may have been missed" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index 1f31adee4c9a9..18cf15a60535b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -125,7 +125,7 @@ private[kafka010] class KafkaMicroBatchReadSupport( if (deletedPartitions.nonEmpty) { val message = if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE + s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { s"$deletedPartitions are gone. Some data may have been missed" } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 5965787518575..231b2c736f176 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -243,7 +243,7 @@ private[kafka010] class KafkaSource( val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE + s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { s"$deletedPartitions are gone. 
Some data may have been missed" } From 0e2ca251583a4ef497a564487f0007bf4710a9ac Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 11 Jan 2019 10:51:56 -0800 Subject: [PATCH 5/5] Address TD's comment --- docs/structured-streaming-kafka-integration.md | 2 +- .../apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala | 2 +- .../apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala | 2 +- .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index b7a2040778215..88a772610578d 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -392,7 +392,7 @@ The following configurations are optional: source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. - You can optionally set the group ID. However, do this with extreme caution as it can cause + You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both, batch and streaming) or sources with the same group id are likely interfere with each other causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index a2b4332d2691f..e2f476bd81eb8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -171,7 +171,7 @@ class KafkaContinuousScanConfigBuilder( offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { - s"$deletedPartitions are gone. Some data may have been missed" + s"$deletedPartitions are gone. Some data may have been missed." } reportDataLoss(message) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index 18cf15a60535b..a5b5690e5be15 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -127,7 +127,7 @@ private[kafka010] class KafkaMicroBatchReadSupport( if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { - s"$deletedPartitions are gone. Some data may have been missed" + s"$deletedPartitions are gone. Some data may have been missed." 
} reportDataLoss(message) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 231b2c736f176..c1731ea2ee1fd 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -245,7 +245,7 @@ private[kafka010] class KafkaSource( val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { - s"$deletedPartitions are gone. Some data may have been missed" + s"$deletedPartitions are gone. Some data may have been missed." } reportDataLoss(message) }
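
The option this series adds can be exercised from user code. Below is a minimal illustrative sketch, not part of the patches themselves: it assumes a spark-shell session (where `spark` is the pre-built SparkSession), a broker at `broker1:9092`, a topic `events`, and an authorized group id `authorized-group` — all placeholder names — with `kafka.session.timeout.ms` lowered as the new documentation advises.

    // Streaming read pinned to a specific consumer group. With "kafka.group.id" set,
    // the generated "spark-kafka-source-<uuid>" id is skipped and the "groupIdPrefix"
    // option is ignored (KafkaSourceProvider logs a warning for both).
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // assumed broker address
      .option("subscribe", "events")                     // assumed topic
      .option("kafka.group.id", "authorized-group")      // assumed authorized group id
      .option("kafka.session.timeout.ms", "6000")        // small timeout, so consumers left
                                                         // over from a restarted query are
                                                         // marked dead quickly
      .option("startingOffsets", "earliest")
      .load()

    val query = stream.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()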
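The same option applies to batch queries, the path the new `KafkaRelationSuite` test exercises through `KafkaSourceProvider.createRelation`. A sketch under the same assumed names:

    // Bounded batch read over an explicit offset range. Running this concurrently with the
    // streaming query above under the same group id is exactly the interference scenario
    // the new warning text describes, so in practice keep one query per group id.
    val batch = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .option("kafka.group.id", "authorized-group")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    batch.selectExpr("CAST(value AS STRING)").show()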