 package org.apache.spark.examples.streaming
 
 import java.nio.ByteBuffer
-
 import scala.util.Random
-
 import org.apache.spark.Logging
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Milliseconds
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.kinesis.KinesisUtils
-
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
 
 /**
  * Kinesis Spark Streaming WordCount example.
@@ -72,9 +71,7 @@ import com.amazonaws.services.kinesis.model.PutRecordRequest
  */
 object KinesisWordCountASL extends Logging {
   def main(args: Array[String]) {
-    /**
-     * Check that all required args were passed in.
-     */
+    /* Check that all required args were passed in. */
     if (args.length < 2) {
       System.err.println(
         """
@@ -87,57 +84,57 @@ object KinesisWordCountASL extends Logging {
     }
 
     StreamingExamples.setStreamingLogLevels()
-
-    /** Populate the appropriate variables from the given args */
+
+    /* Populate the appropriate variables from the given args */
     val Array(streamName, endpointUrl) = args
 
-    /** Determine the number of shards from the stream */
+    /* Determine the number of shards from the stream */
     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
     kinesisClient.setEndpoint(endpointUrl)
     val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
       .size()
 
-    /** In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
     val numStreams = numShards
 
-    /**
+    /*
      * numSparkThreads should be 1 more thread than the number of receivers.
      * This leaves one thread available for actually processing the data.
      */
     val numSparkThreads = numStreams + 1
 
-    /** Setup the and SparkConfig and StreamingContext */
-    /** Spark Streaming batch interval */
+    /* Setup the SparkConfig and StreamingContext */
+    /* Spark Streaming batch interval */
     val batchInterval = Milliseconds(2000)
     val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
       .setMaster(s"local[$numSparkThreads]")
     val ssc = new StreamingContext(sparkConfig, batchInterval)
-    /** Setup the checkpoint directory used by Spark Streaming */
+    /* Setup the checkpoint directory used by Spark Streaming */
     ssc.checkpoint("/tmp/checkpoint");
 
-    /** Kinesis checkpoint interval. Same as batchInterval for this example. */
+    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
     val kinesisCheckpointInterval = batchInterval
 
-    /** Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
     val kinesisStreams = (0 until numStreams).map { i =>
       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
     }
 
-    /** Union all the streams */
+    /* Union all the streams */
     val unionStreams = ssc.union(kinesisStreams)
 
-    /** Convert each line of Array[Byte] to String, split into words, and count them */
+    /* Convert each line of Array[Byte] to String, split into words, and count them */
     val words = unionStreams.flatMap(byteArray => new String(byteArray)
       .split(" "))
 
-    /** Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+    /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
     val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 
-    /** Print the first 10 wordCounts by key */
+    /* Print the first 10 wordCounts by key */
     wordCounts.print()
 
-    /** Start the streaming context and await termination */
+    /* Start the streaming context and await termination */
     ssc.start()
     ssc.awaitTermination()
   }
@@ -169,13 +166,13 @@ object KinesisWordCountProducerASL {
 
     StreamingExamples.setStreamingLogLevels()
 
-    /** Populate the appropriate variables from the given args */
+    /* Populate the appropriate variables from the given args */
     val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
 
-    /** Generate the records and return the totals */
+    /* Generate the records and return the totals */
     val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
 
-    /** Print the array of (index, total) tuples */
+    /* Print the array of (index, total) tuples */
     println("Totals")
     totals.foreach(total => println(total.toString()))
   }
@@ -187,51 +184,70 @@ object KinesisWordCountProducerASL {
 
     val MaxRandomInts = 10
 
-    /** Create the Kinesis client */
+    /* Create the Kinesis client */
     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
     kinesisClient.setEndpoint(endpoint)
 
     println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of " +
         s"$recordsPerSecond records per second and $wordsPerRecord words per record");
 
     val totals = new Array[Int](MaxRandomInts)
-    /** Put String records onto the stream per the given recordPerSec and wordsPerRecord */
+    /* Put String records onto the stream per the given recordsPerSecond and wordsPerRecord */
     for (i <- 1 to 5) {
 
-      /** Generate recordsPerSec records to put onto the stream */
+      /* Generate recordsPerSec records to put onto the stream */
       val records = (1 to recordsPerSecond.toInt).map { recordNum =>
-        /**
+        /*
          * Randomly generate each wordsPerRec words between 0 (inclusive)
          * and MAX_RANDOM_INTS (exclusive)
          */
        val data = (1 to wordsPerRecord.toInt).map(x => {
-          /** Generate the random int */
+          /* Generate the random int */
          val randomInt = Random.nextInt(MaxRandomInts)
 
-          /** Keep track of the totals */
+          /* Keep track of the totals */
          totals(randomInt) += 1
 
          randomInt.toString()
        }).mkString(" ")
 
-        /** Create a partitionKey based on recordNum */
+        /* Create a partitionKey based on recordNum */
        val partitionKey = s"partitionKey-$recordNum"
 
-        /** Create a PutRecordRequest with an Array[Byte] version of the data */
+        /* Create a PutRecordRequest with an Array[Byte] version of the data */
        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
            .withPartitionKey(partitionKey)
            .withData(ByteBuffer.wrap(data.getBytes()));
 
-        /** Put the record onto the stream and capture the PutRecordResult */
+        /* Put the record onto the stream and capture the PutRecordResult */
        val putRecordResult = kinesisClient.putRecord(putRecordRequest);
      }
 
-      /** Sleep for a second */
+      /* Sleep for a second */
      Thread.sleep(1000)
      println("Sent " + recordsPerSecond + " records")
    }
 
-    /** Convert the totals to (index, total) tuple */
+    /* Convert the totals to (index, total) tuple */
    (0 to (MaxRandomInts - 1)).zip(totals)
  }
+
+/**
+ * Utility functions for Spark Streaming examples.
+ * This has been lifted from the examples/ project to remove the circular dependency.
+ */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}
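
To exercise the word count logic without a Kinesis stream or AWS credentials, here is a minimal local sketch. It assumes the same Spark 1.x streaming API used above; the object name LocalWordCountSketch and the hard-coded test records are hypothetical and not part of this commit. A queueStream of pre-built RDD[Array[Byte]] stands in for the unioned Kinesis receivers, so the same flatMap/map/reduceByKey chain can run as-is:

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

/* Hypothetical local harness -- not part of the commit above. */
object LocalWordCountSketch {
  def main(args: Array[String]) {
    val sparkConfig = new SparkConf().setAppName("LocalWordCountSketch").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConfig, Milliseconds(2000))

    /* Stand-in for the unioned Kinesis DStreams: a queue of pre-built RDDs
     * of Array[Byte] records, mimicking what the receivers would deliver. */
    val rddQueue = new Queue[RDD[Array[Byte]]]()
    rddQueue += ssc.sparkContext.makeRDD(Seq("1 2 3".getBytes(), "2 3 4".getBytes()))
    val byteStream = ssc.queueStream(rddQueue)

    /* Same transformation chain as KinesisWordCountASL */
    val words = byteStream.flatMap(byteArray => new String(byteArray).split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Because queueStream feeds the DStream from the driver with no receiver threads, local[2] is sufficient here and the numSparkThreads = numStreams + 1 accounting above does not apply.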