From d4e1ed0c25121ad5bf24cfe137e2ee1bff430c94 Mon Sep 17 00:00:00 2001
From: Satyajit Vegesna
Date: Wed, 1 Aug 2018 22:12:57 -0700
Subject: [PATCH 1/3] SPARK-20597 KafkaSourceProvider falls back on path as synonym for topic

---
 .../sql/kafka010/KafkaSourceProvider.scala | 34 ++++++++-----------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index d225c1ea6b7f1..b1e8b49f31cb2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -231,7 +231,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    // Picks the default topic name from the "path" key, an entry in the "parameters" Map,
+    // when no "topic" key is present in the "parameters" Map and "path" is provided.
+    val defaultTopic = parameters.get(TOPIC_OPTION_KEY) match {
+      case None => parameters.get(PATH_OPTION_KEY) match {
+        case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim) case _ => None}
+      case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    }
     val specifiedKafkaParams = kafkaParamsForProducer(parameters)
     new KafkaSink(sqlContext,
       new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
@@ -249,7 +255,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           s"${SaveMode.ErrorIfExists} (default).")
       case _ => // good
     }
-    val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    val topic = parameters.get(TOPIC_OPTION_KEY) match {
+      case None => parameters.get(PATH_OPTION_KEY) match {
+        case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim) case _ => None}
+      case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    }// parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+
     val specifiedKafkaParams = kafkaParamsForProducer(parameters)
     KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
       new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
@@ -465,24 +476,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
 
   val TOPIC_OPTION_KEY = "topic"
-
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you want your streaming query to fail on such cases, set the source
-      | option "failOnDataLoss" to "true".
-    """.stripMargin
-
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you don't want your streaming query to fail on such cases, set the
-      | source option "failOnDataLoss" to "false".
- """.stripMargin - - + val PATH_OPTION_KEY = "path" private val deserClassName = classOf[ByteArrayDeserializer].getName From 381e66fa0bdd14b5754d8d81710021714e5fc031 Mon Sep 17 00:00:00 2001 From: Satyajit Vegesna Date: Wed, 1 Aug 2018 22:22:07 -0700 Subject: [PATCH 2/3] Added parameters that were mistakenly taken out in previous commit --- .../spark/sql/kafka010/KafkaSourceProvider.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index b1e8b49f31cb2..e641ecafaaf62 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -478,6 +478,21 @@ private[kafka010] object KafkaSourceProvider extends Logging { val TOPIC_OPTION_KEY = "topic" val PATH_OPTION_KEY = "path" + val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = + """ + |Some data may have been lost because they are not available in Kafka any more; either the + | data was aged out by Kafka or the topic may have been deleted before all the data in the + | topic was processed. If you want your streaming query to fail on such cases, set the source + | option "failOnDataLoss" to "true". + """.stripMargin + val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE = + """ + |Some data may have been lost because they are not available in Kafka any more; either the + | data was aged out by Kafka or the topic may have been deleted before all the data in the + | topic was processed. If you don't want your streaming query to fail on such cases, set the + | source option "failOnDataLoss" to "false". + """.stripMargin + private val deserClassName = classOf[ByteArrayDeserializer].getName def getKafkaOffsetRangeLimit( From 6dc893a681721b51e61a9df099ae8f2c865c38c1 Mon Sep 17 00:00:00 2001 From: Satyajit Vegesna Date: Wed, 1 Aug 2018 22:24:13 -0700 Subject: [PATCH 3/3] Added parameters that were mistakenly taken out in previous commit --- .../org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index e641ecafaaf62..6558d60345fef 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -485,6 +485,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { | topic was processed. If you want your streaming query to fail on such cases, set the source | option "failOnDataLoss" to "true". """.stripMargin + val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE = """ |Some data may have been lost because they are not available in Kafka any more; either the @@ -493,6 +494,8 @@ private[kafka010] object KafkaSourceProvider extends Logging { | source option "failOnDataLoss" to "false". """.stripMargin + + private val deserClassName = classOf[ByteArrayDeserializer].getName def getKafkaOffsetRangeLimit(