From 9dc8e0fa6e679eb511b70a1796d9ae127661a95e Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Fri, 28 Oct 2016 17:07:42 +0800 Subject: [PATCH 1/4] Add Java code snippet for Kafka guide --- docs/streaming-kafka-0-10-integration.md | 125 +++++++++++++++++++++-- 1 file changed, 119 insertions(+), 6 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index de95ea90137eb..4f64d452ac410 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -8,9 +8,9 @@ The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [ ### Linking For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). - groupId = org.apache.spark - artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION_SHORT}} + groupId = org.apache.spark + artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}} + version = {{site.SPARK_VERSION_SHORT}} ### Creating a Direct Stream Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010 @@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
+ import java.util.*; + import org.apache.spark.SparkConf; + import org.apache.spark.TaskContext; + import org.apache.spark.api.java.*; + import org.apache.spark.api.java.function.*; + import org.apache.spark.streaming.api.java.*; + import org.apache.spark.streaming.kafka010.*; + import org.apache.kafka.clients.consumer.ConsumerRecord; + import org.apache.kafka.common.TopicPartition; + import org.apache.kafka.common.serialization.StringDeserializer; + import scala.Tuple2; + + Map kafkaParams = new HashMap(); + kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); + kafkaParams.put("key.deserializer", StringDeserializer.class); + kafkaParams.put("value.deserializer", StringDeserializer.class); + kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); + kafkaParams.put("auto.offset.reset", "latest"); + kafkaParams.put("enable.auto.commit", false); + + Collection topics = Arrays.asList("topicA", "topicB"); + + final JavaInputDStream> stream = + KafkaUtils.createDirectStream( + streamingContext, + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(topics, kafkaParams) + ); + + stream.mapToPair( + new PairFunction, String, String>() { + @Override + public Tuple2 call(ConsumerRecord record) { + return new Tuple2<>(record.key(), record.value()); + } + })
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create
+ // Import dependencies and create kafka params as in Create Direct Stream above + + OffsetRange[] offsetRanges = new OffsetRange[]{ + // topic, partition, inclusive starting offset, exclusive ending offset + OffsetRange.create("test", 0, 0, 100), + OffsetRange.create("test", 1, 0, 100) + }; + + JavaRDD> rdd = KafkaUtils.createRDD( + sparkContext, + kafkaParams, + offsetRanges, + LocationStrategies.PreferConsistent() + );
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no }
+ stream.foreachRDD(new VoidFunction>>() { + @Override + public void call(JavaRDD> rdd) { + final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + rdd.foreachPartition(new VoidFunction>>() { + @Override + public void call(Iterator> consumerRecords) throws Exception { + OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); + } + }); + } + });
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By
stream.foreachRDD { rdd => - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed - stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) + stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
+ stream.foreachRDD(new VoidFunction>>() { + @Override + public void call(JavaRDD> rdd) { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + // some time later, after outputs have completed + ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); + } + });
@@ -141,9 +214,11 @@ For data stores that support transactions, saving offsets in the same transactio // begin from the the offsets committed to the database val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => - new TopicPartition(resultSet.string("topic")), resultSet.int("partition")) -> resultSet.long("offset") + new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") }.toMap + import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign + val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, @@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio }
+ // The details depend on your data store, but the general idea looks like this + + // begin from the the offsets committed to the database + Map fromOffsets = new HashMap<>(); + for (resultSet: selectOffsetsFromYourDatabase) + fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset")); + } + + JavaInputDStream> stream = KafkaUtils.createDirectStream( + streamingContext, + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) + ); + + stream.foreachRDD(new VoidFunction>>() { + @Override + public void call(JavaRDD> rdd) { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + Object results = yourCalculation(rdd); + + yourTransactionBlock { + // update results + + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + + // assert that offsets were updated correctly + } + } + });
@@ -185,6 +290,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html )
+ Map kafkaParams = new HashMap(); + // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS + kafkaParams.put("security.protocol", "SSL"); + kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks"); + kafkaParams.put("ssl.truststore.password", "test1234"); + kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks"); + kafkaParams.put("ssl.keystore.password", "test1234"); + kafkaParams.put("ssl.key.password", "test1234");
From 400ae12f90a5ee5bf01d3175ea4156702d5eb5e7 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 29 Oct 2016 21:28:19 +0800 Subject: [PATCH 2/4] Address comments --- docs/streaming-kafka-0-10-integration.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 4f64d452ac410..713023c0a073b 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -56,7 +56,7 @@ Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javad import org.apache.kafka.common.serialization.StringDeserializer; import scala.Tuple2; - Map kafkaParams = new HashMap(); + Map kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); @@ -123,13 +123,13 @@ If you have a use case that is better suited to batch processing, you can create
// Import dependencies and create kafka params as in Create Direct Stream above - OffsetRange[] offsetRanges = new OffsetRange[]{ + OffsetRange[] offsetRanges = { // topic, partition, inclusive starting offset, exclusive ending offset OffsetRange.create("test", 0, 0, 100), OffsetRange.create("test", 1, 0, 100) }; - JavaRDD> rdd = KafkaUtils.createRDD( + JavaRDD> rdd = KafkaUtils.createRDD( sparkContext, kafkaParams, offsetRanges, @@ -159,7 +159,7 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); rdd.foreachPartition(new VoidFunction>>() { @Override - public void call(Iterator> consumerRecords) throws Exception { + public void call(Iterator> consumerRecords) { OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; System.out.println( o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); @@ -244,11 +244,11 @@ For data stores that support transactions, saving offsets in the same transactio // begin from the the offsets committed to the database Map fromOffsets = new HashMap<>(); - for (resultSet: selectOffsetsFromYourDatabase) + for (resultSet : selectOffsetsFromYourDatabase) fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset")); } - JavaInputDStream> stream = KafkaUtils.createDirectStream( + JavaInputDStream> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) From bd622e1ce2fdd2138c4e03d4680298d75deecc83 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 29 Oct 2016 23:14:37 +0800 Subject: [PATCH 3/4] Address Cody's comments --- docs/streaming-kafka-0-10-integration.md | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 713023c0a073b..1b72446b63e42 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -216,8 +216,6 @@ For data stores that support transactions, saving offsets in the same transactio val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") }.toMap - - import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, @@ -230,13 +228,13 @@ For data stores that support transactions, saving offsets in the same transactio val results = yourCalculation(rdd) - yourTransactionBlock { - // update results + // begin your transaction - // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // update results + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // assert that offsets were updated correctly - // assert that offsets were updated correctly - } + // end your transaction }
@@ -261,13 +259,13 @@ For data stores that support transactions, saving offsets in the same transactio Object results = yourCalculation(rdd); - yourTransactionBlock { - // update results + // begin your transaction - // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // update results + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // assert that offsets were updated correctly - // assert that offsets were updated correctly - } + // end your transaction } });
From 8be3c8b1f586a1b392f6b0b6d18768f7cd84089f Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 29 Oct 2016 23:39:05 +0800 Subject: [PATCH 4/4] Fix space --- docs/streaming-kafka-0-10-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 1b72446b63e42..c1ef396907db7 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -216,7 +216,7 @@ For data stores that support transactions, saving offsets in the same transactio val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") }.toMap - + val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent,