Commit 5ee588f

Mikhail Gorbov authored and ekrivokonmapr committed
2.2.1 build fixed (apache#231)
* MapR [SPARK-164] Update Kafka version to 1.0.1-mapr in Spark Kafka Producer module
1 parent 1e1dee3 commit 5ee588f

File tree
7 files changed: +40 −268 lines changed


examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala

Lines changed: 4 additions & 4 deletions
@@ -79,11 +79,11 @@ object KafkaProducerExample extends App {
   val producerConf = new ProducerConf(
     bootstrapServers = kafkaBrokers.split(",").toList)
 
-  val items = (0 until numMessages.toInt).map(i => Item(i, i))
-  val defaultRDD: RDD[Item] = ssc.sparkContext.parallelize(items)
-  val dStream: DStream[Item] = new ConstantInputDStream[Item](ssc, defaultRDD)
+  val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
+  val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
+  val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
 
-  dStream.sendToKafka[ItemJsonSerializer](topics, producerConf)
+  dStream.foreachRDD(_.sendToKafka(topics, producerConf))
   dStream.count().print()
 
   ssc.start()
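With the serializer type parameter gone from the DStream helper, the example now writes each micro-batch through the RDD-level sendToKafka. A minimal sketch of the new call pattern, assuming the implicit conversions from the org.apache.spark.streaming.kafka.producer package are in scope and that ssc, numMessages, topics, producerConf and the Item case class are set up as in the example:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}

// Messages are plain strings, matching ProducerConf's default StringSerializer.
val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

// Each batch RDD is sent with the rewritten RDD-level producer helper.
dStream.foreachRDD(_.sendToKafka(topics, producerConf))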

external/kafka-producer/pom.xml

Lines changed: 5 additions & 1 deletion
@@ -51,7 +51,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
@@ -120,6 +119,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>

external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/KafkaRDDWriter.scala

Lines changed: 0 additions & 103 deletions
This file was deleted.

external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/ProducerConf.scala

Lines changed: 6 additions & 10 deletions
@@ -33,17 +33,14 @@ case class ProducerConf(
     linger: Long = 0,
     others: Map[String, String] = Map.empty[String, String]) {
 
-  private var keySerializer: Option[String] = null
-
-  private var valueSerializer: Option[String] = null
+  private var keySerializer: String = ProducerConf.ByteArraySerializer
+  private var valueSerializer: String = ProducerConf.StringSerializer
 
   def asJMap(): java.util.Map[String, Object] = {
     val kafkaParams = Map[String, AnyRef](
       ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers.mkString(","),
-      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> keySerializer.getOrElse(
-        ProducerConf.ByteArraySerializer),
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> valueSerializer.getOrElse(
-        ProducerConf.StringSerializer),
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> keySerializer,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> valueSerializer,
       ProducerConfig.ACKS_CONFIG -> acks,
       ProducerConfig.BUFFER_MEMORY_CONFIG -> bufferMemory.toString,
       ProducerConfig.COMPRESSION_TYPE_CONFIG -> compressionType.getOrElse("none"),
@@ -56,12 +53,12 @@ case class ProducerConf(
   }
 
   def withKeySerializer(serializer: String): ProducerConf = {
-    this.keySerializer = Some(serializer)
+    this.keySerializer = serializer
     this
   }
 
   def withValueSerializer(serializer: String): ProducerConf = {
-    this.valueSerializer = Some(serializer)
+    this.valueSerializer = serializer
     this
  }
 }
@@ -71,7 +68,6 @@ object ProducerConf {
   private val Prefix = "spark.streaming.kafka"
 
   val ByteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
-
   val StringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
 
   def apply(sparkConf: SparkConf): ProducerConf = {
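The serializer fields are now plain strings with concrete defaults instead of Option fields initialized to null. A minimal usage sketch, assuming the constructor and builder methods shown in the diff above; the broker address is a placeholder:

import org.apache.spark.streaming.kafka.producer.ProducerConf

// Defaults: ByteArraySerializer for keys, StringSerializer for values.
val conf = new ProducerConf(bootstrapServers = List("broker-1:9092"))
  // Override only the key serializer; the value serializer keeps its default.
  .withKeySerializer(ProducerConf.StringSerializer)

// asJMap() now passes the serializer class names straight through to the producer config.
val kafkaParams: java.util.Map[String, Object] = conf.asJMap()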

external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala

Lines changed: 18 additions & 17 deletions
@@ -17,29 +17,30 @@
 
 package org.apache.spark.streaming.kafka.producer
 
-import scala.reflect.ClassTag
+import scala.language.implicitConversions
 
-import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 import org.apache.spark.rdd.RDD
 
-class RDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
-  def sendToKafka[S <: Serializer[T] : ClassTag](
-      topic: String,
-      conf: ProducerConf): Unit = {
-    rdd.sparkContext.runJob(
-      rdd,
-      new KafkaRDDWriter[Array[Byte], T, ByteArraySerializer, S](topic, conf).sendV _)
+class RDDFunctions[T](rdd: RDD[T]) {
+  def sendToKafka(topic: String, conf: ProducerConf): Unit = {
+    rdd.foreachPartition(iter => {
+      val producer = new KafkaProducer[String, T](conf.asJMap())
+      iter.foreach { item =>
+        producer.send(new ProducerRecord[String, T](topic, item))
+      }
+    })
   }
 }
 
-class PairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) extends Serializable {
-  def sendToKafka[
-    KS <: Serializer[K] : ClassTag,
-    VS <: Serializer[V] : ClassTag
-  ](topic: String, conf: ProducerConf): Unit = {
-    rdd.sparkContext.runJob(
-      rdd,
-      new KafkaRDDWriter[K, V, KS, VS](topic, conf).sendKV _)
+class PairRDDFunctions[K, V](rdd: RDD[(K, V)]) {
+  def sendToKafka(topic: String, conf: ProducerConf): Unit = {
+    rdd.foreachPartition(iter => {
+      val producer = new KafkaProducer[K, V](conf.asJMap())
+      iter.foreach { item =>
+        producer.send(new ProducerRecord[K, V](topic, item._1, item._2))
+      }
+    })
   }
 }
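The rewritten helpers drop KafkaRDDWriter and the serializer type parameters: each partition opens its own KafkaProducer built from conf.asJMap() and sends one ProducerRecord per element. A minimal sketch of how the helpers are called, assuming the implicit conversions that wrap an RDD in RDDFunctions / PairRDDFunctions are in scope, and that sc and producerConf already exist; the topic name is a placeholder:

import org.apache.spark.rdd.RDD

// Value-only records: each element becomes ProducerRecord(topic, value).
val values: RDD[String] = sc.parallelize(Seq("a", "b", "c"))
values.sendToKafka("events", producerConf)

// Keyed records: each pair becomes ProducerRecord(topic, key, value).
val pairs: RDD[(String, String)] = sc.parallelize(Seq("k1" -> "v1", "k2" -> "v2"))
pairs.sendToKafka("events", producerConf)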

external/kafka-producer/src/test/scala/org/apache/spark/streaming/kafka/producer/RDDProducerSuite.scala

Lines changed: 7 additions & 6 deletions
@@ -17,16 +17,17 @@
 
 package org.apache.spark.streaming.kafka.producer
 
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringSerializer
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
 
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
 class RDDProducerSuite extends BaseKafkaProducerTest {
 
   private val numMessages = 10
@@ -51,7 +52,7 @@ class RDDProducerSuite extends BaseKafkaProducerTest {
 
     val rdd: RDD[String] = sparkContext.parallelize(List.fill(numMessages)(recordValue))
 
-    rdd.sendToKafka[StringSerializer](topic, testConf)
+    rdd.sendToKafka(topic, testConf)
 
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
 
@@ -75,7 +76,7 @@ class RDDProducerSuite extends BaseKafkaProducerTest {
 
     val rdd = sparkContext.parallelize(List.fill(numMessages)(recordKey, recordValue))
 
-    rdd.sendToKafka[StringSerializer, StringSerializer](topic, testConf)
+    rdd.sendToKafka(topic, testConf)
 
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)