Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model._

Expand Down Expand Up @@ -71,14 +69,12 @@ class KinesisBackedBlockRDDPartition(
private[kinesis]
class KinesisBackedBlockRDD[T: ClassTag](
sc: SparkContext,
val regionName: String,
val endpointUrl: String,
val config: KinesisConfig,
@transient private val _blockIds: Array[BlockId],
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient private val isBlockIdValid: Array[Boolean] = Array.empty,
val retryTimeoutMs: Int = 10000,
val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
val awsCredentialsOption: Option[SerializableAWSCredentials] = None
val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _
) extends BlockRDD[T](sc, _blockIds) {

require(_blockIds.length == arrayOfseqNumberRanges.length,
Expand All @@ -104,12 +100,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
}

def getBlockFromKinesis(): Iterator[T] = {
val credentials = awsCredentialsOption.getOrElse {
new DefaultAWSCredentialsProviderChain().getCredentials()
}
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
range, retryTimeoutMs).map(messageHandler)
new KinesisSequenceRangeIterator(config, range, retryTimeoutMs).map(messageHandler)
}
}
if (partition.isBlockIdValid) {
Expand All @@ -128,22 +120,18 @@ class KinesisBackedBlockRDD[T: ClassTag](
*/
private[kinesis]
class KinesisSequenceRangeIterator(
credentials: AWSCredentials,
endpointUrl: String,
regionId: String,
config: KinesisConfig,
range: SequenceNumberRange,
retryTimeoutMs: Int) extends NextIterator[Record] with Logging {

private val client = new AmazonKinesisClient(credentials)
private val client = config.buildKinesisClient()
private val streamName = range.streamName
private val shardId = range.shardId

private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
private var internalIterator: Iterator[Record] = null

client.setEndpoint(endpointUrl, "kinesis", regionId)

override protected def getNext(): Record = {
var nextRecord: Record = null
if (toSeqNumberReceived) {
Expand Down Expand Up @@ -205,7 +193,7 @@ class KinesisSequenceRangeIterator(
private def getRecordsAndNextKinesisIterator(
shardIterator: String): (Iterator[Record], String) = {
val getRecordsRequest = new GetRecordsRequest
getRecordsRequest.setRequestCredentials(credentials)
getRecordsRequest.setRequestCredentials(config.awsCredentials)
getRecordsRequest.setShardIterator(shardIterator)
val getRecordsResult = retryOrTimeout[GetRecordsResult](
s"getting records using shard iterator") {
Expand All @@ -225,7 +213,7 @@ class KinesisSequenceRangeIterator(
iteratorType: ShardIteratorType,
sequenceNumber: String): String = {
val getShardIteratorRequest = new GetShardIteratorRequest
getShardIteratorRequest.setRequestCredentials(credentials)
getShardIteratorRequest.setRequestCredentials(config.awsCredentials)
getShardIteratorRequest.setStreamName(streamName)
getShardIteratorRequest.setShardId(shardId)
getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis

import scala.reflect.ClassTag

import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
import com.amazonaws.regions.{Region, RegionUtils}
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}


/**
 * Configuration container for settings to be passed down into the kinesis-client-library (KCL).
 * This class is also used to build the AWS client instances (Kinesis, DynamoDB, CloudWatch)
 * so that things like the endpoint can be overridden.
 *
 * @param kinesisAppName The name of the Kinesis application (used in creating dynamo tables)
 * @param streamName The name of the actual Kinesis stream
 * @param endpointUrl The AWS API endpoint that will be used for the Kinesis client
 * @param regionName The AWS region that will be connected to
 *                   (will set the default endpoint for dynamo and cloudwatch)
 * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
 *                                worker's initial starting position in the stream.
 *                                The values are either the beginning of the stream
 *                                per Kinesis' limit of 24 hours
 *                                (InitialPositionInStream.TRIM_HORIZON) or
 *                                the tip of the stream (InitialPositionInStream.LATEST).
 * @param awsCredentialsOption None or Some instance of SerializableAWSCredentials that
 *                             will be used for credentials for Kinesis and the default
 *                             for other clients. If None, then the
 *                             DefaultAWSCredentialsProviderChain will be used
 * @param dynamoEndpointUrl None or Some AWS API endpoint that will be used for
 *                          the DynamoDB client. If None, then the regionName
 *                          will be used to build the default endpoint
 * @param dynamoCredentials None or Some SerializableAWSCredentials that will be used
 *                          as the credentials for the DynamoDB client. If None, then
 *                          awsCredentialsOption is used, and if that is None as well
 *                          the DefaultAWSCredentialsProviderChain is used
 */
case class KinesisConfig(
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON,
    awsCredentialsOption: Option[SerializableAWSCredentials] = None,
    dynamoEndpointUrl: Option[String] = None,
    dynamoCredentials: Option[SerializableAWSCredentials] = None
  ) extends Serializable {

  /**
   * Builds a KinesisClientLibConfiguration object, which contains all the configuration options.
   * See the
   * <a href="http://bit.ly/1oyynyW">KinesisClientLibConfiguration docs</a>
   * for more info.
   *
   * @param workerId A unique string to identify a worker
   * @return a KCL configuration carrying the application name, stream name, credentials
   *         provider, Kinesis endpoint, initial stream position and a 500 ms task backoff
   */
  def buildKCLConfig(workerId: String): KinesisClientLibConfiguration = {
    new KinesisClientLibConfiguration(
      kinesisAppName,
      streamName,
      resolveAWSCredentialsProvider(),
      workerId)
      .withKinesisEndpoint(endpointUrl)
      .withInitialPositionInStream(initialPositionInStream)
      .withTaskBackoffTimeMillis(500)
  }

  /**
   * Returns an AmazonDynamoDBClient instance configured with the proper region/endpoint.
   *
   * Credentials come from dynamoCredentials when set, otherwise from awsCredentialsOption,
   * otherwise from the DefaultAWSCredentialsProviderChain. The endpoint is dynamoEndpointUrl
   * when set, otherwise the default endpoint of regionName.
   */
  def buildDynamoClient(): AmazonDynamoDBClient = {
    // Dedicated DynamoDB credentials win; otherwise reuse the Kinesis credentials.
    val credentialsProvider =
      resolveAWSCredentialsProvider(dynamoCredentials.orElse(awsCredentialsOption))
    val client = new AmazonDynamoDBClient(credentialsProvider)

    dynamoEndpointUrl match {
      case Some(url) => client.withEndpoint(url)
      case None => client.withRegion(region)
    }
  }

  /**
   * Returns an AmazonKinesisClient instance configured with the proper region/endpoint.
   */
  def buildKinesisClient(): AmazonKinesisClient = {
    val client = new AmazonKinesisClient(resolveAWSCredentialsProvider())
    // Pass the service name and region explicitly instead of letting the SDK derive the
    // signing region from the endpoint host name, so that a non-standard endpointUrl is
    // still paired with the configured regionName.
    client.setEndpoint(endpointUrl, "kinesis", regionName)
    client
  }

  /**
   * Returns an AmazonCloudWatchClient instance configured for the region given by regionName.
   */
  def buildCloudwatchClient(): AmazonCloudWatchClient = {
    val client = new AmazonCloudWatchClient(resolveAWSCredentialsProvider())
    client.withRegion(region)
  }

  /**
   * Returns the provided credentials or resolves a
   * pair of credentials using DefaultAWSCredentialsProviderChain.
   */
  def awsCredentials: AWSCredentials = {
    resolveAWSCredentialsProvider().getCredentials()
  }

  /**
   * If an AWS credential is provided, return an AWSCredentialsProvider returning that credential.
   * Otherwise, return the DefaultAWSCredentialsProviderChain.
   *
   * @param awsCredOpt the credentials to wrap; defaults to awsCredentialsOption
   */
  private def resolveAWSCredentialsProvider(
      awsCredOpt: Option[SerializableAWSCredentials] = awsCredentialsOption
    ): AWSCredentialsProvider = {
    awsCredOpt match {
      // Named `staticCredentials` so it does not shadow the `awsCredentials` member above.
      case Some(staticCredentials) =>
        new AWSCredentialsProvider {
          override def getCredentials: AWSCredentials = staticCredentials
          override def refresh(): Unit = { }
        }
      case None =>
        new DefaultAWSCredentialsProviderChain()
    }
  }

  /**
   * Resolves the string region name into the region object.
   *
   * Throws IllegalArgumentException when the lookup yields no region, rather than handing
   * a null Region to the client builders.
   */
  private def region: Region = {
    Option(RegionUtils.getRegion(regionName)).getOrElse {
      throw new IllegalArgumentException(s"Unknown AWS region name: $regionName")
    }
  }

}

/**
 * A small class that extends AWSCredentials and, being a case class, is serializable, which
 * is needed in order to have it serialized into a spark context.
 *
 * The string form never contains the secret key, so instances can be logged safely.
 *
 * @param accessKeyId An AWS accessKeyId
 * @param secretKey An AWS secretKey
 */
case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
  extends AWSCredentials {
  override def getAWSAccessKeyId: String = accessKeyId
  override def getAWSSecretKey: String = secretKey

  // The compiler-generated case-class toString would print the secret key verbatim;
  // redact it so the secret cannot leak through log or error messages.
  override def toString: String = s"SerializableAWSCredentials($accessKeyId,<redacted>)"
}

private object KinesisConfig {
  /**
   * Creates a [[KinesisConfig]] from the core settings, leaving the DynamoDB-specific
   * endpoint and credentials at their defaults.
   *
   * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
   *                       (KCL) to update DynamoDB
   * @param streamName Kinesis stream name
   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
   * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
   *                   DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
   *                                worker's initial starting position in the stream.
   *                                The values are either the beginning of the stream
   *                                per Kinesis' limit of 24 hours
   *                                (InitialPositionInStream.TRIM_HORIZON) or
   *                                the tip of the stream (InitialPositionInStream.LATEST).
   * @param awsCredentialsOption Optional explicit credentials to store in the config;
   *                             None by default
   * @return the assembled configuration
   */
  def buildConfig(
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON,
      awsCredentialsOption: Option[SerializableAWSCredentials] = None): KinesisConfig = {
    new KinesisConfig(
      kinesisAppName = kinesisAppName,
      streamName = streamName,
      endpointUrl = endpointUrl,
      regionName = regionName,
      initialPositionInStream = initialPositionInStream,
      awsCredentialsOption = awsCredentialsOption)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis

import scala.reflect.ClassTag

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.rdd.RDD
Expand All @@ -31,15 +30,10 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo

private[kinesis] class KinesisInputDStream[T: ClassTag](
_ssc: StreamingContext,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointAppName: String,
config: KinesisConfig,
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
awsCredentialsOption: Option[SerializableAWSCredentials]
messageHandler: Record => T
) extends ReceiverInputDStream[T](_ssc) {

private[streaming]
Expand All @@ -57,11 +51,10 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
new KinesisBackedBlockRDD(
context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
context.sc, config, blockIds, seqNumRanges,
isBlockIdValid = isBlockIdValid,
retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
messageHandler = messageHandler,
awsCredentialsOption = awsCredentialsOption)
messageHandler = messageHandler)
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
" it may not be possible to recover from failures")
Expand All @@ -70,7 +63,6 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
}

override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
new KinesisReceiver(config, checkpointInterval, storageLevel, messageHandler)
}
}
Loading