@@ -267,7 +267,7 @@ object KinesisInputDStream {
getRequiredParam(checkpointAppName, "checkpointAppName"),
checkpointInterval.getOrElse(ssc.graph.batchDuration),
storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
handler,
ssc.sc.clean(handler),
Member
Is this related?

Author
Yes, this is required.
KinesisUtils used to pass a cleaned handler when creating the stream, so we now do the same in KinesisInputDStream.

val cleanedHandler = ssc.sc.clean(messageHandler)
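For context, a minimal standalone sketch of the failure mode this guards against (Driver/CleanDemo are illustrative names, not from this PR): a handler defined on a non-serializable enclosing class captures that instance, so an uncleaned closure fails to serialize when the receiver is shipped to executors. ssc.sc.clean(...) nulls out enclosing references the closure does not actually use, and otherwise fails fast at stream-definition time on the driver.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a driver-side class that defines the message handler.
// Deliberately not Serializable.
class Driver {
  val offset = 5
  def addOffset(n: Int): Int = n + offset // eta-expansion captures `this`
}

object CleanDemo {
  def main(args: Array[String]): Unit = {
    val handler: Int => Int = new Driver().addOffset
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(handler)
    catch {
      // This is the failure that cleaning surfaces early on the driver
      // instead of later, when the receiver is serialized.
      case e: java.io.NotSerializableException => println(s"uncleaned closure: $e")
    }
  }
}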

kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
cloudWatchCredsProvider)
@@ -22,7 +22,6 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -173,11 +172,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
* and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
*/
testIfEnabled("basic operation") {
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val stream = KinesisInputDStream.builder.streamingContext(ssc)
.checkpointAppName(appName)
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()

val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -198,12 +201,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
}

testIfEnabled("custom message handling") {
val awsCredentials = KinesisTestUtils.getAWSCredentials()
def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY, addFive(_),
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

val stream = KinesisInputDStream.builder.streamingContext(ssc)
.checkpointAppName(appName)
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.buildWithMessageHandler(addFive(_))

stream shouldBe a [ReceiverInputDStream[_]]

@@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val localTestUtils = new KPLBasedKinesisTestUtils(1)
localTestUtils.createStream()
try {
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName,
localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val stream = KinesisInputDStream.builder.streamingContext(ssc)
Member

Just for my understanding, it's no longer necessary to pass in the credentials explicitly?

Author

Yes, that's true. KinesisInputDStream.builder falls back to the default credentials when none are passed, and the default credential chain works with both AWS keys and session tokens.
So now "mvn test" works in both permanent-key and session-token environments.
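For cases that still need explicit keys (for example, a CI account without an instance profile), here is a hedged sketch of supplying them through the builder instead of the old positional createStream arguments. It assumes the SparkAWSCredentials builder API from this change, reuses ssc, appName, and testUtils from the surrounding tests, and the environment variable names are illustrative.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// Build explicit credentials from the environment rather than hard-coding keys.
val creds = SparkAWSCredentials.builder
  .basicCredentials(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .checkpointAppName(appName)
  .streamName(testUtils.streamName)
  .endpointUrl(testUtils.endpointUrl)
  .regionName(testUtils.regionName)
  .initialPositionInStream(InitialPositionInStream.LATEST)
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_ONLY)
  .kinesisCredentials(creds) // overrides the DefaultCredentials fallback
  .build()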

.checkpointAppName(localAppName)
.streamName(localTestUtils.streamName)
.endpointUrl(localTestUtils.endpointUrl)
.regionName(localTestUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()

val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -303,13 +315,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc = new StreamingContext(sc, Milliseconds(1000))
ssc.checkpoint(checkpointDir)

val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]

val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
.checkpointAppName(appName)
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()

// Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {