Skip to content

Commit 603f7e0

Browse files
vishchenkomaprEgor Krivokon
authored andcommitted
MapR [SPARK-795] - extended logging for Spark Streaming and Structured Str… (apache#729)
1 parent bca8d03 commit 603f7e0

File tree

5 files changed

+54
-27
lines changed

5 files changed

+54
-27
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import org.apache.kafka.clients.admin.Admin
2626
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
2727
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
2828
import org.apache.kafka.common.TopicPartition
29-
3029
import org.apache.spark.internal.Logging
30+
3131
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
3232

3333
/**
@@ -106,6 +106,7 @@ private[kafka010] case class SubscribeStrategy(topics: Seq[String])
106106
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
107107
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
108108
consumer.subscribe(topics.asJava)
109+
logDebug(s"The consumer has been subscribed to topics: ${topics.mkString(", ")}")
109110
consumer
110111
}
111112

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ private case class Subscribe[K, V](
9191
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
9292
val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
9393
consumer.subscribe(topics)
94-
val toSeek = if (currentOffsets.isEmpty) {
95-
offsets
96-
} else {
97-
currentOffsets
98-
}
94+
logDebug(s"The consumer has been subscribed to topics: ${String.join(", ", topics)}")
95+
96+
val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
97+
logDebug(s"TopicPartition-Offset map for seeking: $toSeek")
98+
9999
if (!toSeek.isEmpty) {
100100
// work around KAFKA-3370 when reset is none
101101
// poll will throw if no position, i.e. auto offset reset none and no explicit position
@@ -106,6 +106,7 @@ private case class Subscribe[K, V](
106106
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
107107
try {
108108
consumer.poll(0)
109+
logDebug("Data fetched for the topics.")
109110

110111
if (KafkaUtils.isStreams(toSeek.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
111112
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
@@ -115,7 +116,10 @@ private case class Subscribe[K, V](
115116
logWarning("Catching NoOffsetForPartitionException since " +
116117
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
117118
}
118-
toSeek.asScala.foreach { case (topicPartition, offset) =>
119+
120+
toSeek.asScala.foreach {
121+
case (topicPartition, offset) =>
122+
logDebug(s"Seeking for topicPartition=$topicPartition with offset=$offset")
119123
consumer.seek(topicPartition, offset)
120124
}
121125
// we've called poll, we must pause or next poll may consume messages and set position
@@ -152,11 +156,11 @@ private case class SubscribePattern[K, V](
152156
val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
153157
val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
154158
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
155-
val toSeek = if (currentOffsets.isEmpty) {
156-
offsets
157-
} else {
158-
currentOffsets
159-
}
159+
logDebug(s"The consumer has been subscribed to topics matching a pattern: $pattern")
160+
161+
val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
162+
logDebug(s"TopicPartition-Offset map for seeking: $toSeek")
163+
160164
if (!toSeek.isEmpty) {
161165
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
162166
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
@@ -167,13 +171,17 @@ private case class SubscribePattern[K, V](
167171
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
168172
} else {
169173
consumer.poll(0)
174+
logDebug("Data fetched for the topics.")
170175
}
171176
} catch {
172177
case x: NoOffsetForPartitionException if shouldSuppress =>
173178
logWarning("Catching NoOffsetForPartitionException since " +
174179
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
175180
}
176-
toSeek.asScala.foreach { case (topicPartition, offset) =>
181+
182+
toSeek.asScala.foreach {
183+
case (topicPartition, offset) =>
184+
logDebug(s"Seeking for topicPartition=$topicPartition with offset=$offset")
177185
consumer.seek(topicPartition, offset)
178186
}
179187
// we've called poll, we must pause or next poll may consume messages and set position

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,17 @@ object KafkaUtils extends Logging {
227227
Thread.sleep(500)
228228
timeout += 500
229229
}
230+
231+
if (timeout >= waitingForAssigmentTimeout) {
232+
logError(
233+
s"""Consumer assignment wasn't completed within the timeout $waitingForAssigmentTimeout.
234+
|Assigned partitions: ${consumer.assignment()}.""".stripMargin)
235+
}
230236
}
231237

232238
// Determine if Apache Kafka is used instead of MapR Streams
233239
def isStreams(currentOffsets: Map[TopicPartition, Long]): Boolean =
234240
currentOffsets.keys.map(_.topic()).exists(topic => topic.startsWith("/") && topic.contains(":"))
235-
236-
237241
}
238242

239243
object KafkaUtilsPythonHelper {

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/ConsumerStrategy.scala

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ private case class Subscribe[K, V](
9292
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
9393
val consumer = new KafkaConsumer[K, V](kafkaParams)
9494
consumer.subscribe(topics)
95-
val toSeek = if (currentOffsets.isEmpty) {
96-
offsets
97-
} else {
98-
currentOffsets
99-
}
95+
logDebug(s"The consumer has been subscribed to topics: ${String.join(", ", topics)}")
96+
97+
val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
98+
logDebug(s"TopicPartition-Offset map for seeking: $toSeek")
99+
100100
if (!toSeek.isEmpty) {
101101
// work around KAFKA-3370 when reset is none
102102
// poll will throw if no position, i.e. auto offset reset none and no explicit position
@@ -107,6 +107,7 @@ private case class Subscribe[K, V](
107107
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
108108
try {
109109
consumer.poll(0)
110+
logDebug("Data fetched for the topics.")
110111

111112
if (KafkaUtils.isStreams(toSeek.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
112113
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
@@ -116,7 +117,10 @@ private case class Subscribe[K, V](
116117
logWarning("Catching NoOffsetForPartitionException since " +
117118
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
118119
}
119-
toSeek.asScala.foreach { case (topicPartition, offset) =>
120+
121+
toSeek.asScala.foreach {
122+
case (topicPartition, offset) =>
123+
logDebug(s"Seeking for topicPartition=$topicPartition with offset=$offset")
120124
consumer.seek(topicPartition, offset)
121125
}
122126
// we've called poll, we must pause or next poll may consume messages and set position
@@ -152,11 +156,11 @@ private case class SubscribePattern[K, V](
152156
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
153157
val consumer = new KafkaConsumer[K, V](kafkaParams)
154158
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
155-
val toSeek = if (currentOffsets.isEmpty) {
156-
offsets
157-
} else {
158-
currentOffsets
159-
}
159+
logDebug(s"The consumer has been subscribed to topics matching a pattern: $pattern")
160+
161+
val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
162+
logDebug(s"TopicPartition-Offset map for seeking: $toSeek")
163+
160164
if (!toSeek.isEmpty) {
161165
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
162166
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
@@ -167,13 +171,17 @@ private case class SubscribePattern[K, V](
167171
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
168172
} else {
169173
consumer.poll(0)
174+
logDebug("Data fetched for the topics.")
170175
}
171176
} catch {
172177
case x: NoOffsetForPartitionException if shouldSuppress =>
173178
logWarning("Catching NoOffsetForPartitionException since " +
174179
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
175180
}
176-
toSeek.asScala.foreach { case (topicPartition, offset) =>
181+
182+
toSeek.asScala.foreach {
183+
case (topicPartition, offset) =>
184+
logDebug(s"Seeking for topicPartition=$topicPartition with offset=$offset")
177185
consumer.seek(topicPartition, offset)
178186
}
179187
// we've called poll, we must pause or next poll may consume messages and set position

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaUtils.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ object KafkaUtils extends Logging {
243243
Thread.sleep(500)
244244
timeout += 500
245245
}
246+
247+
if (timeout >= waitingForAssigmentTimeout) {
248+
logError(
249+
s"""Consumer assignment wasn't completed within the timeout $waitingForAssigmentTimeout.
250+
|Assigned partitions: ${consumer.assignment()}.""".stripMargin)
251+
}
246252
}
247253

248254
// Determine if Apache Kafka is used instead of MapR Streams

0 commit comments

Comments
 (0)