@@ -231,7 +231,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
// Falls back to the "path" entry in the parameters Map for the default topic
// name when no "topic" key is present.
val defaultTopic = parameters.get(TOPIC_OPTION_KEY) match {
Member
Isn't this simpler as something like

val defaultTopic = parameters.get(TOPIC_OPTION_KEY).orElse(parameters.get(PATH_OPTION_KEY)).map(_.trim)

  case None => parameters.get(PATH_OPTION_KEY).map(_.trim)
  case Some(topic) => Some(topic.trim)
}
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
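With this fallback in place, the default topic can also be supplied through the writer's path argument. A minimal usage sketch, assuming a local broker at localhost:9092 and an illustrative checkpoint directory (DataStreamWriter.start(path) stores its argument under the "path" option):

// Hypothetical streaming usage: start(path) fills the "path" option,
// which the sink above now reads as the default topic.
df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")         // illustrative address
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // illustrative path
  .start("my-topic")  // behaves like .option("topic", "my-topic")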
@@ -249,7 +255,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val topic = parameters.get(TOPIC_OPTION_KEY) match {
  case None => parameters.get(PATH_OPTION_KEY).map(_.trim)
  case Some(t) => Some(t.trim)
}

val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
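The batch path gains the same fallback, so DataFrameWriter.save(path) can name the topic as well. A sketch under the same assumptions (save(path) stores its argument under the "path" option):

// Hypothetical batch usage: the topic is taken from the "path" option
// when no explicit "topic" option is set.
df.selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // illustrative address
  .save("my-topic")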
@@ -465,6 +476,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"

val TOPIC_OPTION_KEY = "topic"
val PATH_OPTION_KEY = "path"

val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
"""