Skip to content

Commit 691a6be

Browse files
committed
fixed tests and formatting, fixed a bug with JavaKinesisWordCount during
union of streams
1 parent 0e1c67b commit 691a6be

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public static void main(String[] args) {
136136

137137
/** Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
138138
List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
139-
for (int i = 0; i < streamsList.size(); i++) {
139+
for (int i = 0; i < numStreams; i++) {
140140
streamsList.add(
141141
KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
142142
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())

examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ object KinesisWordCountASL extends Logging {
9494
/** Determine the number of shards from the stream */
9595
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
9696
kinesisClient.setEndpoint(endpointUrl)
97-
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
97+
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
98+
.size()
9899

99100
/** In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
100101
val numStreams = numShards
@@ -108,7 +109,8 @@ object KinesisWordCountASL extends Logging {
108109
/** Setup the SparkConfig and StreamingContext */
109110
/** Spark Streaming batch interval */
110111
val batchInterval = Milliseconds(2000)
111-
val sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster(s"local[$numSparkThreads]")
112+
val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
113+
.setMaster(s"local[$numSparkThreads]")
112114
val ssc = new StreamingContext(sparkConfig, batchInterval)
113115
/** Setup the checkpoint directory used by Spark Streaming */
114116
ssc.checkpoint("/tmp/checkpoint");

extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,7 @@ import com.amazonaws.services.kinesis.model.Record
4646
*/
4747
class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
4848
with EasyMockSugar {
49-
50-
test("kinesis input stream") {
51-
val ssc = new StreamingContext(master, framework, batchDuration)
52-
// Tests the API, does not actually test data receiving
53-
val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
54-
"https://kinesis.us-west-2.amazonaws.com", Seconds(2),
55-
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
56-
ssc.stop()
57-
}
58-
49+
5950
val app = "TestKinesisReceiver"
6051
val stream = "mySparkStream"
6152
val endpoint = "endpoint-url"
@@ -74,14 +65,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
7465
var checkpointStateMock: KinesisCheckpointState = _
7566
var currentClockMock: Clock = _
7667

77-
before {
68+
override def beforeFunction() = {
7869
receiverMock = mock[KinesisReceiver]
7970
checkpointerMock = mock[IRecordProcessorCheckpointer]
8071
checkpointClockMock = mock[ManualClock]
8172
checkpointStateMock = mock[KinesisCheckpointState]
8273
currentClockMock = mock[Clock]
8374
}
8475

76+
test("kinesis utils api") {
77+
val ssc = new StreamingContext(master, framework, batchDuration)
78+
// Tests the API, does not actually test data receiving
79+
val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
80+
"https://kinesis.us-west-2.amazonaws.com", Seconds(2),
81+
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
82+
ssc.stop()
83+
}
84+
8585
test("process records including store and checkpoint") {
8686
val expectedCheckpointIntervalMillis = 10
8787
expecting {

0 commit comments

Comments
 (0)