diff --git a/connector/kafka-0-10-sql/pom.xml b/connector/kafka-0-10-sql/pom.xml
index 4ab99f7929591..dffefa89f1cce 100644
--- a/connector/kafka-0-10-sql/pom.xml
+++ b/connector/kafka-0-10-sql/pom.xml
@@ -78,6 +78,16 @@
       <groupId>org.scala-lang.modules</groupId>
       <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.sparkconnector</groupId>
+      <artifactId>SparkConnectorForEventHub</artifactId>
+      <version>1.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>trident</groupId>
+      <artifactId>trident-token-library</artifactId>
+      <version>1</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 82ad75e028afe..9e72373abedf6 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -22,9 +22,9 @@ import java.util.{Locale, UUID}
 
 import scala.jdk.CollectionConverters._
 
+import EventStreamsKafkaConnector.KafkaOptionsUtils
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
@@ -58,7 +58,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with Logging {
   import KafkaSourceProvider._
 
-  override def shortName(): String = "kafka"
+  override def shortName(): String = "eventstreams"
 
   /**
    * Returns the name and schema of the source. In addition, it also verifies whether the options
@@ -70,9 +70,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
+    val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
     validateStreamOptions(caseInsensitiveParameters)
     require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
-    val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
+    val includeHeaders = translatedKafkaParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
     (shortName(), KafkaRecordToRowConverter.kafkaSchema(includeHeaders))
   }
 
@@ -88,8 +89,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // partial data since Kafka will assign partitions to multiple consumers having the same group
    // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveParameters, metadataPath)
-
-    val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
+    val translatedKafkaParameters =
+      translateEventStreamProperties(caseInsensitiveParameters)
+    val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)
 
     val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
       caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -128,7 +130,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters: Map[String, String]): BaseRelation = {
     val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
     validateBatchOptions(caseInsensitiveParameters)
-    val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
+    val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
+    val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)
 
     val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
       caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -142,8 +145,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       LatestOffsetRangeLimit)
     assert(endingRelationOffsets != EarliestOffsetRangeLimit)
 
-    checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
-
     val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
 
     new KafkaRelation(
@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     validateGeneralOptions(params)
   }
 
+  private def translateEventStreamProperties(params: CaseInsensitiveMap[String])
+    : CaseInsensitiveMap[String] = {
+    if (params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
+      params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
+      params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
+      validateEventStreamOptions(params)
+      return CaseInsensitiveMap(KafkaOptionsUtils.buildKafkaOptionsFromSparkConfig(params))
+    }
+    params
+  }
+
+  private def validateEventStreamOptions(params: CaseInsensitiveMap[String]) = {
+    // Stream specific options
+    if (!params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
+      !params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
+      !params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
+      throw new IllegalArgumentException(
+        "All three EventStream properties " +
+        "(eventstream.itemid, eventstream.name and eventstream.consumerGroup) " +
+        "are necessary to run Spark with EventStream configs")
+    }
+  }
+
   private def validateBatchOptions(params: CaseInsensitiveMap[String]) = {
     // Batch specific options
     KafkaSourceProvider.getKafkaOffsetRangeLimit(
@@ -454,6 +478,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def toBatch(): Batch = {
       val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
       validateBatchOptions(caseInsensitiveOptions)
+
       val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
 
       val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
@@ -466,8 +491,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
         LatestOffsetRangeLimit)
 
-      checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
-
       new KafkaBatch(
         strategy(caseInsensitiveOptions),
         caseInsensitiveOptions,
@@ -562,6 +585,9 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val SUBSCRIBE_PATTERN = "subscribepattern"
   private val SUBSCRIBE = "subscribe"
   private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN)
+  private val EVENTSTREAM_ARTIFACT_ID_OPTION_KEY = "eventstream.itemid"
+  private val EVENTSTREAM_NAME_OPTION_KEY = "eventstream.name"
+  private val EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY = "eventstream.consumerGroup"
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
@@ -613,60 +639,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val serClassName = classOf[ByteArraySerializer].getName
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 
-  def checkStartOffsetNotGreaterThanEndOffset(
-      startOffset: Long,
-      endOffset: Long,
-      topicPartition: TopicPartition,
-      exception: (Long, Long, TopicPartition) => Exception): Unit = {
-    // earliest or latest offsets are negative and should not be compared
-    if (startOffset > endOffset && startOffset >= 0 && endOffset >= 0) {
-      throw exception(startOffset, endOffset, topicPartition)
-    }
-  }
-
-  def checkOffsetLimitValidity(
-      startOffset: KafkaOffsetRangeLimit,
-      endOffset: KafkaOffsetRangeLimit): Unit = {
-    startOffset match {
-      case start: SpecificOffsetRangeLimit if endOffset.isInstanceOf[SpecificOffsetRangeLimit] =>
-        val end = endOffset.asInstanceOf[SpecificOffsetRangeLimit]
-        if (start.partitionOffsets.keySet != end.partitionOffsets.keySet) {
-          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
-            start.partitionOffsets.keySet, end.partitionOffsets.keySet
-          )
-        }
-        start.partitionOffsets.foreach {
-          case (tp, startOffset) =>
-            checkStartOffsetNotGreaterThanEndOffset(
-              startOffset,
-              end.partitionOffsets(tp),
-              tp,
-              KafkaExceptions.unresolvedStartOffsetGreaterThanEndOffset
-            )
-        }
-
-      case start: SpecificTimestampRangeLimit
-          if endOffset.isInstanceOf[SpecificTimestampRangeLimit] =>
-        val end = endOffset.asInstanceOf[SpecificTimestampRangeLimit]
-        if (start.topicTimestamps.keySet != end.topicTimestamps.keySet) {
-          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
-            start.topicTimestamps.keySet, end.topicTimestamps.keySet
-          )
-        }
-        start.topicTimestamps.foreach {
-          case (tp, startOffset) =>
-            checkStartOffsetNotGreaterThanEndOffset(
-              startOffset,
-              end.topicTimestamps(tp),
-              tp,
-              KafkaExceptions.unresolvedStartTimestampGreaterThanEndTimestamp
-            )
-        }
-
-      case _ => // do nothing
-    }
-  }
-
   def getKafkaOffsetRangeLimit(
       params: CaseInsensitiveMap[String],
       globalOffsetTimestampOptionKey: String,