diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index d225c1ea6b7f1..6558d60345fef 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -231,7 +231,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { - val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim) + // Picks the defaulttopicname from "path" key, an entry in "parameters" Map, + // if no topic key is present in the "parameters" Map and is provided with key "path". + val defaultTopic = parameters.get(TOPIC_OPTION_KEY) match { + case None => parameters.get(PATH_OPTION_KEY) match { + case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim) case _ => None} + case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim) + } val specifiedKafkaParams = kafkaParamsForProducer(parameters) new KafkaSink(sqlContext, new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic) @@ -249,7 +255,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister s"${SaveMode.ErrorIfExists} (default).") case _ => // good } - val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim) + val topic = parameters.get(TOPIC_OPTION_KEY) match { + case None => parameters.get(PATH_OPTION_KEY) match { + case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim) case _ => None} + case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim) + }// parameters.get(TOPIC_OPTION_KEY).map(_.trim) + val specifiedKafkaParams = kafkaParamsForProducer(parameters) KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic) @@ -465,6 +476,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { private val MIN_PARTITIONS_OPTION_KEY = "minpartitions" val TOPIC_OPTION_KEY = "topic" + val PATH_OPTION_KEY = "path" val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = """