
Commit b477090

Kinesis Testcases
- add new Kinesis API testcases
- add a flag to disable the old Kinesis API testcases
1 parent 9712bd3 commit b477090

2 files changed, 38 insertions(+), 22 deletions(-)


external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ object KinesisInputDStream {
       getRequiredParam(checkpointAppName, "checkpointAppName"),
       checkpointInterval.getOrElse(ssc.graph.batchDuration),
       storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
-      handler,
+      ssc.sc.clean(handler),
       kinesisCredsProvider.getOrElse(DefaultCredentials),
       dynamoDBCredsProvider,
       cloudWatchCredsProvider)
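
The only functional change in this file is that the builder now passes the user-supplied message handler through ssc.sc.clean(handler), i.e. Spark's closure cleaner, before wiring it into the DStream, so a handler that accidentally closes over a non-serializable enclosing object is cleaned up front (or surfaces a serialization error at build time) rather than failing later on the executors. Below is a minimal caller-side sketch of the handler path this touches; the object name, app name, stream name, endpoint, and region are placeholders, not values from this commit.

// Sketch only (not part of this commit): a message handler used with the new
// builder API. The handler refers solely to its Record argument, so it is
// serializable on its own; ssc.sc.clean(handler) additionally strips any
// accidental references to an enclosing object.
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object HandlerSketch {
  // Maps each Kinesis record to its payload size in bytes.
  def recordSize(r: Record): Int = r.getData.remaining()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kinesis-handler-sketch")
      .setIfMissing("spark.master", "local[2]")        // placeholder master for a local run
    val ssc = new StreamingContext(conf, Seconds(1))
    val sizes = KinesisInputDStream.builder.streamingContext(ssc)
      .checkpointAppName("handler-sketch-app")                  // placeholder
      .streamName("my-kinesis-stream")                          // placeholder
      .endpointUrl("https://kinesis.us-west-2.amazonaws.com")   // placeholder
      .regionName("us-west-2")                                  // placeholder
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_ONLY)
      .buildWithMessageHandler(recordSize)
    sizes.print()
    ssc.start()
    ssc.awaitTermination()
  }
}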

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala

Lines changed: 37 additions & 21 deletions
@@ -22,7 +22,6 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -173,11 +172,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
    * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
    */
   testIfEnabled("basic operation") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     val collected = new mutable.HashSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -198,12 +201,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 
   testIfEnabled("custom message handling") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY, addFive(_),
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .buildWithMessageHandler(addFive(_))
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
@@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val localTestUtils = new KPLBasedKinesisTestUtils(1)
     localTestUtils.createStream()
     try {
-      val awsCredentials = KinesisTestUtils.getAWSCredentials()
-      val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName,
-        localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST,
-        Seconds(10), StorageLevel.MEMORY_ONLY,
-        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+      val stream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(localAppName)
+        .streamName(localTestUtils.streamName)
+        .endpointUrl(localTestUtils.endpointUrl)
+        .regionName(localTestUtils.regionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
 
       val collected = new mutable.HashSet[Int]
       stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -303,13 +315,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc = new StreamingContext(sc, Milliseconds(1000))
     ssc.checkpoint(checkpointDir)
 
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
-    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
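
The migrated call sites above all configure the builder the same way but finish it differently: build() yields the default Array[Byte] stream, while buildWithMessageHandler(...) yields a stream of whatever the handler returns (an Int stream for addFive in "custom message handling"). A short sketch contrasting the two variants follows; the object name, helper, and all stream, endpoint, region, and app values are placeholders, not taken from the test suite.

// Sketch only: contrasts the two ways the new testcases finish the builder.
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object BuilderVariantsSketch {
  // Copies the record payload out of its ByteBuffer and decodes it as UTF-8.
  def asText(r: Record): String = {
    val buf = r.getData
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def variants(ssc: StreamingContext): Unit = {
    // A fresh builder per variant; every setter below appears in the diff above.
    def base = KinesisInputDStream.builder.streamingContext(ssc)
      .checkpointAppName("builder-sketch-app")                  // placeholder
      .streamName("my-kinesis-stream")                          // placeholder
      .endpointUrl("https://kinesis.us-west-2.amazonaws.com")   // placeholder
      .regionName("us-west-2")                                  // placeholder
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_ONLY)

    // build(): raw record payloads, as in the "basic operation" test.
    val raw: ReceiverInputDStream[Array[Byte]] = base.build()

    // buildWithMessageHandler(): the handler's result type, as in "custom message handling".
    val text: ReceiverInputDStream[String] = base.buildWithMessageHandler(asText)
  }
}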
