Skip to content

Commit c279fd5

Browse files
committed
address
1 parent bd1c0b8 commit c279fd5

File tree

4 files changed

+14
-37
lines changed

4 files changed

+14
-37
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,15 +169,7 @@ class KafkaContinuousScanConfigBuilder(
169169
if (deletedPartitions.nonEmpty) {
170170
val message = if (
171171
offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
172-
s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
173-
"has been set on this query, it is not recommended to set this option. This option is " +
174-
"unsafe to use since multiple concurrent queries or sources using the same group id " +
175-
"will interfere with each other as they are part of the same consumer group. Restarted " +
176-
"queries may also suffer interference from the previous run having the same group id. " +
177-
"The user should have only one query per group id, and/or set " +
178-
"'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
179-
"previous query are marked dead by the Kafka group coordinator before the restarted " +
180-
"query starts running."
172+
s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE
181173
} else {
182174
s"$deletedPartitions are gone. Some data may have been missed"
183175
}

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
125125
if (deletedPartitions.nonEmpty) {
126126
val message =
127127
if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
128-
s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
129-
"has been set on this query, it is not recommended to set this option. This option is " +
130-
"unsafe to use since multiple concurrent queries or sources using the same group id " +
131-
"will interfere with each other as they are part of the same consumer group. Restarted " +
132-
"queries may also suffer interference from the previous run having the same group id. " +
133-
"The user should have only one query per group id, and/or set " +
134-
"'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
135-
"previous query are marked dead by the Kafka group coordinator before the restarted " +
136-
"query starts running."
128+
s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE
137129
} else {
138130
s"$deletedPartitions are gone. Some data may have been missed"
139131
}

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,15 +243,7 @@ private[kafka010] class KafkaSource(
243243
val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
244244
if (deletedPartitions.nonEmpty) {
245245
val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
246-
s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
247-
"has been set on this query, it is not recommended to set this option. This option is " +
248-
"unsafe to use since multiple concurrent queries or sources using the same group id " +
249-
"will interfere with each other as they are part of the same consumer group. Restarted " +
250-
"queries may also suffer interference from the previous run having the same group id. " +
251-
"The user should have only one query per group id, and/or set " +
252-
"'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
253-
"previous query are marked dead by the Kafka group coordinator before the restarted " +
254-
"query starts running."
246+
s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE
255247
} else {
256248
s"$deletedPartitions are gone. Some data may have been missed"
257249
}

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -340,15 +340,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
340340
// Validate user-specified Kafka options
341341

342342
if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
343-
logWarning(
344-
s"It is not recommended to set Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}'. " +
345-
"This option is unsafe to use since multiple concurrent queries or sources using the " +
346-
"same group id will interfere with each other as they are part of the same consumer " +
347-
"group. Restarted queries may also suffer interference from the previous run having the " +
348-
"same group id. The user should have only one query per group id, and/or set " +
349-
"'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
350-
"previous query are marked dead by the Kafka group coordinator before the restarted " +
351-
"query starts running.")
343+
logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE)
352344
if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) {
353345
logWarning("Option 'groupIdPrefix' will be ignored as " +
354346
s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
@@ -475,7 +467,16 @@ private[kafka010] object KafkaSourceProvider extends Logging {
475467
| source option "failOnDataLoss" to "false".
476468
""".stripMargin
477469

478-
470+
/**
 * Shared warning text explaining why setting the Kafka consumer group id option
 * ('kafka.' + ConsumerConfig.GROUP_ID_CONFIG) on a query is discouraged.
 *
 * Within this change it is logged via logWarning during option validation in
 * KafkaSourceProvider, and appended to the "... are gone. " data-loss messages built in
 * KafkaContinuousReadSupport, KafkaMicroBatchReadSupport and KafkaSource when the group id
 * option is present in the driver Kafka params.
 *
 * NOTE(review): stripMargin only removes the leading margin, so the resulting message keeps
 * the line breaks between the source lines of this literal (the inline messages it replaces
 * were single-line concatenations). It presumably also ends with a trailing line break and
 * whatever whitespace precedes the closing quotes -- confirm this is acceptable for log and
 * error output.
 */
val CUSTOM_GROUP_ID_ERROR_MESSAGE =
471+
s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
472+
| not recommended to set this option. This option is unsafe to use since multiple concurrent
473+
| queries or sources using the same group id will interfere with each other as they are part
474+
| of the same consumer group. Restarted queries may also suffer interference from the
475+
| previous run having the same group id. The user should have only one query per group id,
476+
| and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
477+
| consumers from the previous query are marked dead by the Kafka group coordinator before the
478+
| restarted query starts running.
479+
""".stripMargin
479480

480481
private val serClassName = classOf[ByteArraySerializer].getName
481482
private val deserClassName = classOf[ByteArrayDeserializer].getName

0 commit comments

Comments
 (0)