external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import java.util.Properties
 import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
 class KafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
   @transient ssc_ : StreamingContext,
   kafkaParams: Map[String, String],
   topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
 class KafkaReceiver[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
   kafkaParams: Map[String, String],
   topics: Map[String, Int],
   storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
       tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
     }
 
-    val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[K]]
-    val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
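
Note: the change above swaps the Manifest context bounds for ClassTag but keeps the same reflective pattern: recover the decoder's runtime Class from the tag and invoke its one-argument constructor. A minimal, self-contained sketch of that pattern; Decoder and Config here are hypothetical stand-ins for Kafka's Decoder[_] and VerifiableProperties, not the real classes:

import scala.reflect.{classTag, ClassTag}

object DecoderSketch {
  // Hypothetical stand-ins for Kafka's Decoder[_] and VerifiableProperties.
  trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }
  class Config(val props: java.util.Properties)

  class StringDecoder(config: Config) extends Decoder[String] {
    def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
  }

  // Recover the runtime Class from the ClassTag context bound and invoke
  // the one-argument constructor reflectively, as KafkaReceiver does above.
  def newDecoder[D <: Decoder[_]: ClassTag](config: Config): D =
    classTag[D].runtimeClass
      .getConstructor(classOf[Config])
      .newInstance(config)
      .asInstanceOf[D]

  def main(args: Array[String]): Unit = {
    // The decoder class is chosen at the call site via the type parameter.
    val decoder = newDecoder[StringDecoder](new Config(new java.util.Properties))
    println(decoder.fromBytes("hello".getBytes("UTF-8")))  // prints "hello"
  }
}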
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

@@ -65,7 +65,7 @@ object KafkaUtils {
    * in its own thread.
    * @param storageLevel Storage level to use for storing the received objects
    */
-  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
@@ -89,8 +89,6 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt]
     ): JavaPairReceiverInputDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
   }
 
@@ -111,8 +109,6 @@ object KafkaUtils {
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
     ): JavaPairReceiverInputDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
       storageLevel)
   }
@@ -140,13 +136,11 @@ object KafkaUtils {
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
     ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val valueCmt: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
 
-    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
-    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
 
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
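
Note: the Java-facing overload now builds precise ClassTags from the Class objects the caller already supplies, instead of casting a ClassTag[AnyRef] down to the target type. A minimal sketch of that bridging pattern; fromJava is a hypothetical helper for illustration, not part of KafkaUtils:

import scala.reflect.ClassTag

object ClassTagBridgeSketch {
  // Wrap the Class object a Java caller supplies into a precise ClassTag,
  // as the overload above now does with ClassTag(keyTypeClass).
  def fromJava[K](keyClass: Class[K]): ClassTag[K] = ClassTag(keyClass)

  def main(args: Array[String]): Unit = {
    // e.g. a Java caller passing String.class yields a ClassTag[String].
    val tag: ClassTag[String] = fromJava(classOf[String])
    println(tag.runtimeClass)  // prints "class java.lang.String"
  }
}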
project/MimaExcludes.scala (6 additions, 1 deletion)

@@ -71,7 +71,12 @@ object MimaExcludes {
           "org.apache.spark.storage.TachyonStore.putValues")
       ) ++
       Seq(
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.streaming.flume.FlumeReceiver.this"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.streaming.kafka.KafkaReceiver.this")
       ) ++
       Seq( // Ignore some private methods in ALS.
         ProblemFilters.exclude[MissingMethodProblem](
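
Note: the two IncompatibleMethTypeProblem excludes are needed because a context bound desugars into an implicit parameter, so swapping Manifest for ClassTag changes the compiled signatures of createStream and the KafkaReceiver constructor. A simplified sketch of the desugaring, using hypothetical method shapes rather than the real signatures:

import scala.reflect.ClassTag

object DesugarSketch {
  // A context bound is sugar for an implicit parameter list.
  def before[U: Manifest](x: Int): Int = x
  // desugars to: def before[U](x: Int)(implicit ev: Manifest[U]): Int

  def after[U: ClassTag](x: Int): Int = x
  // desugars to: def after[U](x: Int)(implicit ev: ClassTag[U]): Int

  // MiMa compares the two compiled signatures, sees the implicit parameter's
  // type change, and reports IncompatibleMethTypeProblem -- hence the excludes.
}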