From 3ae0814c3f6a09ec5bebda51950290e17220c6aa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 19 Jul 2015 18:50:34 -0700 Subject: [PATCH 01/11] Added KinesisBackedBlockRDD --- .../kinesis/KinesisBackedBlockRDD.scala | 208 ++++++++++++++++ .../kinesis/KinesisBackedBlockRDDSuite.scala | 225 ++++++++++++++++++ 2 files changed, 433 insertions(+) create mode 100644 extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala create mode 100644 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala new file mode 100644 index 000000000000..8084144a1eb5 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -0,0 +1,208 @@ +package org.apache.spark.streaming.kinesis + +import scala.collection.JavaConversions._ + +import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model._ + +import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition} +import org.apache.spark.storage.BlockId +import org.apache.spark.util.NextIterator +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} + +private[kinesis] +case class SequenceNumberRange( + streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) + +private[kinesis] +case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) { + def isEmpty(): Boolean = ranges.isEmpty + def nonEmpty(): Boolean = ranges.nonEmpty + override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")") +} + +private[kinesis] +object SequenceNumberRanges { + + def apply(range: SequenceNumberRange): SequenceNumberRanges = { + new SequenceNumberRanges(Array(range)) + } + + def apply(ranges: Seq[SequenceNumberRange]): SequenceNumberRanges = { + new SequenceNumberRanges(ranges.toArray) + } + + def empty: SequenceNumberRanges = { + new SequenceNumberRanges(Array.empty) + } +} + +private[kinesis] +class KinesisBackedBlockRDDPartition( + idx: Int, + blockId: BlockId, + val isBlockIdValid: Boolean, + val seqNumberRanges: SequenceNumberRanges + ) extends BlockRDDPartition(blockId, idx) + +private[kinesis] +class KinesisBackedBlockRDD( + sc: SparkContext, + regionId: String, + endpointUrl: String, + @transient blockIds: Array[BlockId], + @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges], + @transient isBlockIdValid: Array[Boolean] = Array.empty +) extends BlockRDD[Array[Byte]](sc, blockIds) { + + require(blockIds.length == arrayOfseqNumberRanges.length, + "Number of blockIds is not equal to the number of sequence number ranges") + + override def isValid(): Boolean = true + + override def getPartitions: Array[Partition] = { + Array.tabulate(blockIds.length) { i => + val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i) + new KinesisBackedBlockRDDPartition(i, blockIds(i), isValid, arrayOfseqNumberRanges(i)) + } + } + + override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { + val blockManager = SparkEnv.get.blockManager + val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition] + val blockId = partition.blockId + + def getBlockFromBlockManager(): Option[Iterator[Array[Byte]]] = { 
+ logDebug(s"Read partition data of $this from block manager, block $blockId") + blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[Array[Byte]]]) + } + + def getBlockFromKinesis(): Iterator[Array[Byte]] = { + val credenentials = new DefaultAWSCredentialsProviderChain().getCredentials() + partition.seqNumberRanges.ranges.iterator.flatMap { range => + new KinesisSequenceRangeIterator( + credenentials, endpointUrl, regionId, + range.streamName, range.shardId, range.fromSeqNumber, range.toSeqNumber) + } + } + if (partition.isBlockIdValid) { + getBlockFromBlockManager().getOrElse { getBlockFromKinesis() } + } else { + getBlockFromKinesis() + } + } +} + + +private[kinesis] +class KinesisSequenceRangeIterator( + credentials: AWSCredentials, + endpointUrl: String, + regionId: String, + streamName: String, + shardId: String, + fromSeqNumber: String, + toSeqNumber: String + ) extends NextIterator[Array[Byte]] { + + private val backoffTimeMillis = 1000 + private val client = new AmazonKinesisClient(credentials) + + private var toSeqNumberReceived = false + private var lastSeqNumber: String = null + private var internalIterator: Iterator[Record] = null + + client.setEndpoint(endpointUrl, "kinesis", regionId) + + override protected def getNext(): Array[Byte] = { + var nextBytes: Array[Byte] = null + if (toSeqNumberReceived) { + finished = true + } else { + + if (internalIterator == null) { + + // If the internal iterator has not been initialized, + // then fetch records from starting sequence number + getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, fromSeqNumber) + } else if (!internalIterator.hasNext) { + + // If the internal iterator does not have any more records, + // then fetch more records after the last consumed sequence number + getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) + } + + if (!internalIterator.hasNext) { + + // If the internal iterator still does not have any data, then throw exception + // and terminate this iterator + finished = true + throw new Exception("Could not read until the specified end sequence number: " + + s"shardId = $shardId, fromSequenceNumber = $fromSeqNumber, " + + s"toSequenceNumber = $toSeqNumber") + } else { + + // Get the record, and remember its sequence number + val nextRecord = internalIterator.next() + nextBytes = nextRecord.getData().array() + lastSeqNumber = nextRecord.getSequenceNumber() + + // If the this record's sequence number matches the stopping sequence number, then make sure + // the iterator is marked finished next time getNext() is called + if (nextRecord.getSequenceNumber == toSeqNumber) { + toSeqNumberReceived = true + } + } + + } + nextBytes + } + + override protected def close(): Unit = { } + + private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Unit = { + val shardIterator = getKinesisIterator(streamName, shardId, iteratorType, seqNum) + var records: Seq[Record] = null + do { + try { + val getResult = getRecordsAndNextKinesisIterator(streamName, shardId, shardIterator) + records = getResult._1 + } catch { + case ptee: ProvisionedThroughputExceededException => + Thread.sleep(backoffTimeMillis) + } + } while (records == null || records.length == 0) // TODO: put a limit on the number of retries + if (records != null && records.nonEmpty) { + internalIterator = records.iterator + } + } + + private def getRecordsAndNextKinesisIterator( + streamName: String, + shardId: String, + shardIterator: String + ): (Seq[Record], String) = { + val getRecordsRequest = new GetRecordsRequest + 
getRecordsRequest.setRequestCredentials(credentials) + getRecordsRequest.setShardIterator(shardIterator) + val getRecordsResult = client.getRecords(getRecordsRequest) + (getRecordsResult.getRecords, getRecordsResult.getNextShardIterator) + } + + private def getKinesisIterator( + streamName: String, + shardId: String, + iteratorType: ShardIteratorType, + sequenceNumber: String + ): String = { + val getShardIteratorRequest = new GetShardIteratorRequest + getShardIteratorRequest.setRequestCredentials(credentials) + getShardIteratorRequest.setStreamName(streamName) + getShardIteratorRequest.setShardId(shardId) + getShardIteratorRequest.setShardIteratorType(iteratorType.toString) + getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber) + val getShardIteratorResult = client.getShardIterator(getShardIteratorRequest) + getShardIteratorResult.getShardIterator + } +} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala new file mode 100644 index 000000000000..9ab6c9256b3c --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -0,0 +1,225 @@ +package org.apache.spark.streaming.kinesis + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} +import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} + +class KinesisBackedBlockRDDSuite extends SparkFunSuite + with KinesisSuiteHelper with BeforeAndAfterAll { + + private val regionId = "us-east-1" + private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com" + + private val testUtils = new KinesisTestUtils(endpointUrl) + + private val testData = 1 to 8 + + private var sc: SparkContext = null + private var blockManager: BlockManager = null + + private var shardIds: Seq[String] = null + private var shardIdToData: Map[String, Seq[Int]] = null + private var shardIdToSeqNumbers: Map[String, Seq[String]] = null + private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null + private var shardIdToRange: Map[String, SequenceNumberRange] = null + private var allRanges: Seq[SequenceNumberRange] = null + + override def beforeAll(): Unit = { + testUtils.createStream() + + shardIdToDataAndSeqNumbers = testUtils.pushData(testData) + require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") + + shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq + shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} + shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} + shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => + val seqNumRange = SequenceNumberRange( + testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) + (shardId, seqNumRange) + } + allRanges = shardIdToRange.values.toSeq + + val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") + sc = new SparkContext(conf) + blockManager = sc.env.blockManager + } + + override def afterAll(): Unit = { + sc.stop() + } + + testOrIgnore("Basic reading from Kinesis") { + // Verify all data using multiple ranges in a single RDD partition + val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + fakeBlockIds(1), + Array(SequenceNumberRanges(allRanges.toArray)) + ).map { bytes => new String(bytes).toInt }.collect() + 
assert(receivedData1.toSet === testData.toSet) + + // Verify all data using one range in each of the multiple RDD partitions + val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + fakeBlockIds(allRanges.size), + allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray + ).map { bytes => new String(bytes).toInt }.collect() + assert(receivedData2.toSet === testData.toSet) + + // Verify ordering within each partition + val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + fakeBlockIds(allRanges.size), + allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray + ).map { bytes => new String(bytes).toInt }.collectPartitions() + assert(receivedData3.length === allRanges.size) + for (i <- 0 until allRanges.size) { + assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId)) + } + } + + testOrIgnore("Read data available in both block manager and Kinesis") { + testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2) + } + + testOrIgnore("Read data available only in block manager, not in Kinesis") { + testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0) + } + + testOrIgnore("Read data available only in Kinesis, not in block manager") { + testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2) + } + + testOrIgnore("Read data available partially in block manager, rest in Kinesis") { + testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1) + } + + testOrIgnore("Test isBlockValid skips block fetching from block manager") { + testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0, + testIsBlockValid = true) + } + + testOrIgnore("Test whether RDD is valid after removing blocks from block anager") { + testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2, + testBlockRemove = true) + } + + /** + * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager + * and the rest to a write ahead log, and then reading reading it all back using the RDD. + * It can also test if the partitions that were read from the log were again stored in + * block manager. + * + * + * + * @param numPartitions Number of partitions in RDD + * @param numPartitionsInBM Number of partitions to write to the BlockManager. + * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager + * @param numPartitionsInKinesis Number of partitions to write to the Kinesis. 
+ * Partitions (numPartitions - 1 - numPartitionsInKinesis) to + * (numPartitions - 1) will be written to Kinesis + * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching + * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with + * reads falling back to the WAL + * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4 + * + * numPartitionsInBM = 3 + * |------------------| + * | | + * 0 1 2 3 4 + * | | + * |-------------------------| + * numPartitionsInKinesis = 4 + */ + private def testRDD( + numPartitions: Int, + numPartitionsInBM: Int, + numPartitionsInKinesis: Int, + testIsBlockValid: Boolean = false, + testBlockRemove: Boolean = false + ): Unit = { + require(shardIds.size > 1, "Need at least 2 shards to test") + require(numPartitionsInBM <= shardIds.size , + "Number of partitions in BlockManager cannot be more than the Kinesis test shards available") + require(numPartitionsInKinesis <= shardIds.size , + "Number of partitions in Kinesis cannot be more than the Kinesis test shards available") + require(numPartitionsInBM <= numPartitions, + "Number of partitions in BlockManager cannot be more than that in RDD") + require(numPartitionsInKinesis <= numPartitions, + "Number of partitions in Kinesis cannot be more than that in RDD") + + // Put necessary blocks in the block manager + val blockIds = fakeBlockIds(numPartitions) + blockIds.foreach(blockManager.removeBlock(_)) + (0 until numPartitionsInBM).foreach { i => + val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() } + blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY) + } + + // Create the necessary ranges to use in the RDD + val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( + SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) + val realRanges = Array.tabulate(numPartitionsInKinesis) { i => + val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis))) + SequenceNumberRanges(Array(range)) + } + val ranges = (fakeRanges ++ realRanges) + + + // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not + require( + blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty), + "Expected blocks not in BlockManager" + ) + + require( + blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty), + "Unexpected blocks in BlockManager" + ) + + // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not + require( + ranges.takeRight(numPartitionsInKinesis).forall { + _.ranges.forall { _.streamName == testUtils.streamName } + }, "Incorrect configuration of RDD, expected ranges not set: " + ) + + require( + ranges.dropRight(numPartitionsInKinesis).forall { + _.ranges.forall { _.streamName != testUtils.streamName } + }, "Incorrect configuration of RDD, unexpected ranges set" + ) + + val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges) + val collectedData = rdd.map { bytes => + new String(bytes).toInt + }.collect() + assert(collectedData.toSet === testData.toSet) + + // Verify that the block fetching is skipped when isBlockValid is set to false. + // This is done by using a RDD whose data is only in memory but is set to skip block fetching + // Using that RDD will throw exception, as it skips block fetching even if the blocks are in + // in BlockManager. 
+ if (testIsBlockValid) { + require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") + require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis") + val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray, + ranges, isBlockIdValid = Array.fill(blockIds.length)(false)) + intercept[SparkException] { + rdd2.collect() + } + } + + // Verify that the RDD is not invalid after the blocks are removed and can still read data + // from write ahead log + if (testBlockRemove) { + require(numPartitions === numPartitionsInKinesis, "All partitions must be in WAL for this test") + require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") + rdd.removeBlocks() + assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet) + } + } + + /** Generate fake block ids */ + private def fakeBlockIds(num: Int): Array[BlockId] = { + Array.tabulate(num) { i => new StreamBlockId(0, i) } + } +} From 5da39958d1f1d168f10a0a84bd1789deb9ff338e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Jul 2015 15:31:40 -0700 Subject: [PATCH 02/11] Changed KinesisSuiteHelper to KinesisFunSuite --- .../streaming/kinesis/KinesisTestUtils.scala | 2 +- .../kinesis/KinesisBackedBlockRDDSuite.scala | 67 ++++++++++--------- .../streaming/kinesis/KinesisFunSuite.scala | 13 +++- .../kinesis/KinesisStreamSuite.scala | 4 +- 4 files changed, 48 insertions(+), 38 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index f6bf552e6bb8..0ff1b7ed0fd9 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -177,7 +177,7 @@ private class KinesisTestUtils( private[kinesis] object KinesisTestUtils { - val envVarName = "RUN_KINESIS_TESTS" + val envVarName = "ENABLE_KINESIS_TESTS" val shouldRunTests = sys.env.get(envVarName) == Some("1") diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 9ab6c9256b3c..593a01ec0916 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -1,23 +1,17 @@ package org.apache.spark.streaming.kinesis -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} -class KinesisBackedBlockRDDSuite extends SparkFunSuite - with KinesisSuiteHelper with BeforeAndAfterAll { +class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll { private val regionId = "us-east-1" private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com" - - private val testUtils = new KinesisTestUtils(endpointUrl) - private val testData = 1 to 8 - private var sc: SparkContext = null - private var blockManager: BlockManager = null - + private var testUtils: KinesisTestUtils = null private var shardIds: Seq[String] = null private var shardIdToData: Map[String, 
Seq[Int]] = null private var shardIdToSeqNumbers: Map[String, Seq[String]] = null @@ -25,32 +19,39 @@ class KinesisBackedBlockRDDSuite extends SparkFunSuite private var shardIdToRange: Map[String, SequenceNumberRange] = null private var allRanges: Seq[SequenceNumberRange] = null + private var sc: SparkContext = null + private var blockManager: BlockManager = null + + override def beforeAll(): Unit = { - testUtils.createStream() - - shardIdToDataAndSeqNumbers = testUtils.pushData(testData) - require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") - - shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} - shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => - val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) - (shardId, seqNumRange) - } - allRanges = shardIdToRange.values.toSeq + runIfTestsEnabled("Prepare KinesisTestUtils") { + testUtils = new KinesisTestUtils(endpointUrl) + testUtils.createStream() + + shardIdToDataAndSeqNumbers = testUtils.pushData(testData) + require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") + + shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq + shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} + shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} + shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => + val seqNumRange = SequenceNumberRange( + testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) + (shardId, seqNumRange) + } + allRanges = shardIdToRange.values.toSeq - val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") - sc = new SparkContext(conf) - blockManager = sc.env.blockManager + val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") + sc = new SparkContext(conf) + blockManager = sc.env.blockManager + } } override def afterAll(): Unit = { sc.stop() } - testOrIgnore("Basic reading from Kinesis") { + testIfEnabled("Basic reading from Kinesis") { // Verify all data using multiple ranges in a single RDD partition val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, fakeBlockIds(1), @@ -76,28 +77,28 @@ class KinesisBackedBlockRDDSuite extends SparkFunSuite } } - testOrIgnore("Read data available in both block manager and Kinesis") { + testIfEnabled("Read data available in both block manager and Kinesis") { testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2) } - testOrIgnore("Read data available only in block manager, not in Kinesis") { + testIfEnabled("Read data available only in block manager, not in Kinesis") { testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0) } - testOrIgnore("Read data available only in Kinesis, not in block manager") { + testIfEnabled("Read data available only in Kinesis, not in block manager") { testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2) } - testOrIgnore("Read data available partially in block manager, rest in Kinesis") { + testIfEnabled("Read data available partially in block manager, rest in Kinesis") { testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1) } - testOrIgnore("Test isBlockValid skips block fetching from block manager") { + testIfEnabled("Test isBlockValid skips 
block fetching from block manager") { testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0, testIsBlockValid = true) } - testOrIgnore("Test whether RDD is valid after removing blocks from block anager") { + testIfEnabled("Test whether RDD is valid after removing blocks from block anager") { testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2, testBlockRemove = true) } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index 6d011f295e7f..8373138785a8 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -23,15 +23,24 @@ import org.apache.spark.SparkFunSuite * Helper class that runs Kinesis real data transfer tests or * ignores them based on env variable is set or not. */ -trait KinesisSuiteHelper { self: SparkFunSuite => +trait KinesisFunSuite extends SparkFunSuite { import KinesisTestUtils._ /** Run the test if environment variable is set or ignore the test */ - def testOrIgnore(testName: String)(testBody: => Unit) { + def testIfEnabled(testName: String)(testBody: => Unit) { if (shouldRunTests) { test(testName)(testBody) } else { ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody) } } + + /** Run the give body of code only if Kinesis tests are enabled */ + def runIfTestsEnabled(message: String)(body: => Unit): Unit = { + if (shouldRunTests) { + body + } else { + ignore(s"$message [enable by setting env var $envVarName=1]")() + } + } } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 50f71413abf3..f9c952b9468b 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper +class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { // This is the name that KCL uses to save metadata to DynamoDB @@ -83,7 +83,7 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper * you must have AWS credentials available through the default AWS provider chain, * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . 
*/ - testOrIgnore("basic operation") { + testIfEnabled("basic operation") { val kinesisTestUtils = new KinesisTestUtils() try { kinesisTestUtils.createStream() From 4a3609696da46325b1605d8d526a00df3b333328 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Jul 2015 16:19:52 -0700 Subject: [PATCH 03/11] Add license --- .../kinesis/KinesisBackedBlockRDD.scala | 17 +++++++++++++++++ .../kinesis/KinesisBackedBlockRDDSuite.scala | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 8084144a1eb5..4dcb035a065d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kinesis import scala.collection.JavaConversions._ diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 593a01ec0916..b96d418403e4 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.streaming.kinesis import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} From 575bdbcc5ccf766ecaf324623e5d1204f7634224 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Jul 2015 19:46:06 -0700 Subject: [PATCH 04/11] Fix scala style issues --- .../spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index b96d418403e4..b9875c88604e 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -42,7 +42,7 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll override def beforeAll(): Unit = { runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KinesisTestUtils(endpointUrl) + testUtils = new KinesisTestUtils(endpointUrl) testUtils.createStream() shardIdToDataAndSeqNumbers = testUtils.pushData(testData) @@ -229,7 +229,8 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll // Verify that the RDD is not invalid after the blocks are removed and can still read data // from write ahead log if (testBlockRemove) { - require(numPartitions === numPartitionsInKinesis, "All partitions must be in WAL for this test") + require(numPartitions === numPartitionsInKinesis, + "All partitions must be in WAL for this test") require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") rdd.removeBlocks() assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet) From 8874b709acd18719d7aca8d0e09fba52329f7a4d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 21 Jul 2015 20:23:52 -0700 Subject: [PATCH 05/11] Updated Kinesis RDD --- .../kinesis/KinesisBackedBlockRDD.scala | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 4dcb035a065d..d3debea0cc08 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -23,15 +23,18 @@ import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.model._ +import org.apache.spark._ import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition} import org.apache.spark.storage.BlockId import org.apache.spark.util.NextIterator -import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} + +/** Class representing a range of Kinesis sequence numbers */ private[kinesis] case class SequenceNumberRange( streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) +/** Class representing an array of Kinesis sequence number ranges */ private[kinesis] case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) { def isEmpty(): Boolean = ranges.isEmpty @@ -41,20 +44,13 @@ case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) { 
private[kinesis] object SequenceNumberRanges { - def apply(range: SequenceNumberRange): SequenceNumberRanges = { new SequenceNumberRanges(Array(range)) } - - def apply(ranges: Seq[SequenceNumberRange]): SequenceNumberRanges = { - new SequenceNumberRanges(ranges.toArray) - } - - def empty: SequenceNumberRanges = { - new SequenceNumberRanges(Array.empty) - } } + +/** Partition storing the information of the ranges of Kinesis sequence numbers to read */ private[kinesis] class KinesisBackedBlockRDDPartition( idx: Int, @@ -63,6 +59,10 @@ class KinesisBackedBlockRDDPartition( val seqNumberRanges: SequenceNumberRanges ) extends BlockRDDPartition(blockId, idx) +/** + * A BlockRDD where the block data is backed by Kinesis, which can accessed using the + * sequence numbers of the corresponding blocks. + */ private[kinesis] class KinesisBackedBlockRDD( sc: SparkContext, @@ -70,7 +70,8 @@ class KinesisBackedBlockRDD( endpointUrl: String, @transient blockIds: Array[BlockId], @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges], - @transient isBlockIdValid: Array[Boolean] = Array.empty + @transient isBlockIdValid: Array[Boolean] = Array.empty, + awsCredentialsOption: Option[SerializableAWSCredentials] = None ) extends BlockRDD[Array[Byte]](sc, blockIds) { require(blockIds.length == arrayOfseqNumberRanges.length, @@ -96,11 +97,11 @@ class KinesisBackedBlockRDD( } def getBlockFromKinesis(): Iterator[Array[Byte]] = { - val credenentials = new DefaultAWSCredentialsProviderChain().getCredentials() + val credenentials = awsCredentialsOption.getOrElse { + new DefaultAWSCredentialsProviderChain().getCredentials() + } partition.seqNumberRanges.ranges.iterator.flatMap { range => - new KinesisSequenceRangeIterator( - credenentials, endpointUrl, regionId, - range.streamName, range.shardId, range.fromSeqNumber, range.toSeqNumber) + new KinesisSequenceRangeIterator(credenentials, endpointUrl, regionId, range) } } if (partition.isBlockIdValid) { @@ -112,15 +113,13 @@ class KinesisBackedBlockRDD( } +/** An iterator that return the Kinesis data based on the given range of Sequence numbers */ private[kinesis] class KinesisSequenceRangeIterator( credentials: AWSCredentials, endpointUrl: String, regionId: String, - streamName: String, - shardId: String, - fromSeqNumber: String, - toSeqNumber: String + range: SequenceNumberRange ) extends NextIterator[Array[Byte]] { private val backoffTimeMillis = 1000 @@ -142,7 +141,7 @@ class KinesisSequenceRangeIterator( // If the internal iterator has not been initialized, // then fetch records from starting sequence number - getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, fromSeqNumber) + getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) } else if (!internalIterator.hasNext) { // If the internal iterator does not have any more records, @@ -155,9 +154,7 @@ class KinesisSequenceRangeIterator( // If the internal iterator still does not have any data, then throw exception // and terminate this iterator finished = true - throw new Exception("Could not read until the specified end sequence number: " + - s"shardId = $shardId, fromSequenceNumber = $fromSeqNumber, " + - s"toSequenceNumber = $toSeqNumber") + throw new SparkException(s"Could not read until the specified end sequence number: $range") } else { // Get the record, and remember its sequence number @@ -167,7 +164,7 @@ class KinesisSequenceRangeIterator( // If the this record's sequence number matches the stopping sequence number, then make sure // the iterator is marked finished next time getNext() is 
called - if (nextRecord.getSequenceNumber == toSeqNumber) { + if (nextRecord.getSequenceNumber == range.toSeqNumber) { toSeqNumberReceived = true } } @@ -179,11 +176,12 @@ class KinesisSequenceRangeIterator( override protected def close(): Unit = { } private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Unit = { - val shardIterator = getKinesisIterator(streamName, shardId, iteratorType, seqNum) + val shardIterator = getKinesisIterator(range.streamName, range.shardId, iteratorType, seqNum) var records: Seq[Record] = null do { try { - val getResult = getRecordsAndNextKinesisIterator(streamName, shardId, shardIterator) + val getResult = getRecordsAndNextKinesisIterator( + range.streamName, range.shardId, shardIterator) records = getResult._1 } catch { case ptee: ProvisionedThroughputExceededException => From f6e35c843484fe8aa2fe6ada5e40bdad735147d9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 17:47:47 -0700 Subject: [PATCH 06/11] Added retry logic to make it more robust --- .../kinesis/KinesisBackedBlockRDD.scala | 135 ++++++++++++------ 1 file changed, 94 insertions(+), 41 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index d3debea0cc08..fb92bc70ee33 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kinesis import scala.collection.JavaConversions._ +import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.AmazonKinesisClient @@ -29,7 +30,7 @@ import org.apache.spark.storage.BlockId import org.apache.spark.util.NextIterator -/** Class representing a range of Kinesis sequence numbers */ +/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */ private[kinesis] case class SequenceNumberRange( streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) @@ -71,8 +72,9 @@ class KinesisBackedBlockRDD( @transient blockIds: Array[BlockId], @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient isBlockIdValid: Array[Boolean] = Array.empty, + retryTimeoutMs: Int = 10000, awsCredentialsOption: Option[SerializableAWSCredentials] = None -) extends BlockRDD[Array[Byte]](sc, blockIds) { + ) extends BlockRDD[Array[Byte]](sc, blockIds) { require(blockIds.length == arrayOfseqNumberRanges.length, "Number of blockIds is not equal to the number of sequence number ranges") @@ -101,7 +103,8 @@ class KinesisBackedBlockRDD( new DefaultAWSCredentialsProviderChain().getCredentials() } partition.seqNumberRanges.ranges.iterator.flatMap { range => - new KinesisSequenceRangeIterator(credenentials, endpointUrl, regionId, range) + new KinesisSequenceRangeIterator( + credenentials, endpointUrl, regionId, range, retryTimeoutMs) } } if (partition.isBlockIdValid) { @@ -113,17 +116,23 @@ class KinesisBackedBlockRDD( } -/** An iterator that return the Kinesis data based on the given range of Sequence numbers */ +/** + * An iterator that return the Kinesis data based on the given range of sequence numbers. 
+ * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber, + * until the endSequenceNumber is reached. + */ private[kinesis] class KinesisSequenceRangeIterator( credentials: AWSCredentials, endpointUrl: String, regionId: String, - range: SequenceNumberRange - ) extends NextIterator[Array[Byte]] { + range: SequenceNumberRange, + retryTimeoutMs: Int + ) extends NextIterator[Array[Byte]] with Logging { - private val backoffTimeMillis = 1000 private val client = new AmazonKinesisClient(credentials) + private val streamName = range.streamName + private val shardId = range.shardId private var toSeqNumberReceived = false private var lastSeqNumber: String = null @@ -141,12 +150,12 @@ class KinesisSequenceRangeIterator( // If the internal iterator has not been initialized, // then fetch records from starting sequence number - getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) + internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) } else if (!internalIterator.hasNext) { // If the internal iterator does not have any more records, // then fetch more records after the last consumed sequence number - getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) + internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) } if (!internalIterator.hasNext) { @@ -154,12 +163,15 @@ class KinesisSequenceRangeIterator( // If the internal iterator still does not have any data, then throw exception // and terminate this iterator finished = true - throw new SparkException(s"Could not read until the specified end sequence number: $range") + throw new SparkException( + s"Could not read until the end sequence number of the range: $range") } else { - // Get the record, and remember its sequence number - val nextRecord = internalIterator.next() - nextBytes = nextRecord.getData().array() + // Get the record, copy the data into a byte array and remember its sequence number + val nextRecord: Record = internalIterator.next() + val byteBuffer = nextRecord.getData() + nextBytes = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(nextBytes ) lastSeqNumber = nextRecord.getSequenceNumber() // If the this record's sequence number matches the stopping sequence number, then make sure @@ -173,51 +185,92 @@ class KinesisSequenceRangeIterator( nextBytes } - override protected def close(): Unit = { } + override protected def close(): Unit = { + client.shutdown() + } - private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Unit = { - val shardIterator = getKinesisIterator(range.streamName, range.shardId, iteratorType, seqNum) - var records: Seq[Record] = null - do { - try { - val getResult = getRecordsAndNextKinesisIterator( - range.streamName, range.shardId, shardIterator) - records = getResult._1 - } catch { - case ptee: ProvisionedThroughputExceededException => - Thread.sleep(backoffTimeMillis) - } - } while (records == null || records.length == 0) // TODO: put a limit on the number of retries - if (records != null && records.nonEmpty) { - internalIterator = records.iterator - } + /** + * Get records starting from or after the given sequence number. 
+ */ + private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { + val shardIterator = getKinesisIterator(iteratorType, seqNum) + val result = getRecordsAndNextKinesisIterator(shardIterator) + result._1 } + /** + * Get the records starting from using a Kinesis shard iterator (which is a progress handle + * to get records from Kinesis), and get the next shard iterator for next consumption. + */ private def getRecordsAndNextKinesisIterator( - streamName: String, - shardId: String, - shardIterator: String - ): (Seq[Record], String) = { + shardIterator: String): (Iterator[Record], String) = { val getRecordsRequest = new GetRecordsRequest getRecordsRequest.setRequestCredentials(credentials) getRecordsRequest.setShardIterator(shardIterator) - val getRecordsResult = client.getRecords(getRecordsRequest) - (getRecordsResult.getRecords, getRecordsResult.getNextShardIterator) + val getRecordsResult = retryOrTimeout[GetRecordsResult]( + s"getting records using shard iterator") { + client.getRecords(getRecordsRequest) + } + (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator) } + /** + * Get the Kinesis shard iterator for getting records starting from or after the given + * sequence number. + */ private def getKinesisIterator( - streamName: String, - shardId: String, iteratorType: ShardIteratorType, - sequenceNumber: String - ): String = { + sequenceNumber: String): String = { val getShardIteratorRequest = new GetShardIteratorRequest getShardIteratorRequest.setRequestCredentials(credentials) getShardIteratorRequest.setStreamName(streamName) getShardIteratorRequest.setShardId(shardId) getShardIteratorRequest.setShardIteratorType(iteratorType.toString) getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber) - val getShardIteratorResult = client.getShardIterator(getShardIteratorRequest) + val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult]( + s"getting shard iterator from sequence number $sequenceNumber") { + client.getShardIterator(getShardIteratorRequest) + } getShardIteratorResult.getShardIterator } + + private def retryOrTimeout[T](message: String)(body: => T): T = { + import KinesisSequenceRangeIterator._ + + var startTimeMs = System.currentTimeMillis() + var retryCount = 0 + var waitTimeMs = 0 + var result: Option[T] = None + var lastError: Throwable = null + + def timeSpentMs = System.currentTimeMillis() - startTimeMs + + while (result == null && retryCount < MAX_RETRIES && timeSpentMs <= retryTimeoutMs) { + Thread.sleep(waitTimeMs) + try { + result = Some(body) + } catch { + case NonFatal(t) => + lastError = t + t match { + case ptee: ProvisionedThroughputExceededException => + logWarning(s"Exception while $message", ptee) + case e: Throwable => + throw new SparkException(s"Error $message", e) + } + } finally { + retryCount += 1 + if (waitTimeMs == 0) waitTimeMs = MIN_RETRY_WAIT_TIME_MS else waitTimeMs *= 2 + } + } + result.getOrElse { + throw new SparkException(s"Timed out while $message, last exception: ", lastError) + } + } } + +private[streaming] +object KinesisSequenceRangeIterator { + val MAX_RETRIES = 3 + val MIN_RETRY_WAIT_TIME_MS = 100 +} \ No newline at end of file From d3d64d129694a82d453a545b61fd9c07c5cbcff3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 17:54:40 -0700 Subject: [PATCH 07/11] Minor update --- .../kinesis/KinesisBackedBlockRDD.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index fb92bc70ee33..a56ffe01ad8b 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -234,6 +234,7 @@ class KinesisSequenceRangeIterator( getShardIteratorResult.getShardIterator } + /** Helper method to retry Kinesis API request with exponential backoff and timeouts */ private def retryOrTimeout[T](message: String)(body: => T): T = { import KinesisSequenceRangeIterator._ @@ -243,9 +244,11 @@ class KinesisSequenceRangeIterator( var result: Option[T] = None var lastError: Throwable = null - def timeSpentMs = System.currentTimeMillis() - startTimeMs + def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs + def isMaxRetryDone = retryCount >= MAX_RETRIES - while (result == null && retryCount < MAX_RETRIES && timeSpentMs <= retryTimeoutMs) { + + while (result == null && !isTimedOut && !isMaxRetryDone) { Thread.sleep(waitTimeMs) try { result = Some(body) @@ -254,17 +257,22 @@ class KinesisSequenceRangeIterator( lastError = t t match { case ptee: ProvisionedThroughputExceededException => - logWarning(s"Exception while $message", ptee) + logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee) case e: Throwable => - throw new SparkException(s"Error $message", e) + throw new SparkException(s"Error while $message", e) } - } finally { - retryCount += 1 - if (waitTimeMs == 0) waitTimeMs = MIN_RETRY_WAIT_TIME_MS else waitTimeMs *= 2 } + retryCount += 1 + if (waitTimeMs == 0) waitTimeMs = MIN_RETRY_WAIT_TIME_MS else waitTimeMs *= 2 } result.getOrElse { - throw new SparkException(s"Timed out while $message, last exception: ", lastError) + if (isTimedOut) { + throw new SparkException( + s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError) + } else { + throw new SparkException( + s"Gave up after $retryCount retries while $message, last exception: ", lastError) + } } } } From c4f25d28e4a8e837a90b92f1ada7d7eeff090550 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 18:00:24 -0700 Subject: [PATCH 08/11] Addressed comment --- .../spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index b9875c88604e..b2e2a4246dbd 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -65,7 +65,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll } override def afterAll(): Unit = { - sc.stop() + if (sc != null) { + sc.stop() + } } testIfEnabled("Basic reading from Kinesis") { From 3f40c2da20711017cef969b608fd457ec47e22b1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 18:23:54 -0700 Subject: [PATCH 09/11] Addressed comments --- .../streaming/kinesis/KinesisBackedBlockRDD.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index a56ffe01ad8b..c8c094c54161 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -240,16 +240,18 @@ class KinesisSequenceRangeIterator( var startTimeMs = System.currentTimeMillis() var retryCount = 0 - var waitTimeMs = 0 + var waitTimeMs = MIN_RETRY_WAIT_TIME_MS var result: Option[T] = None var lastError: Throwable = null def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs def isMaxRetryDone = retryCount >= MAX_RETRIES - - while (result == null && !isTimedOut && !isMaxRetryDone) { - Thread.sleep(waitTimeMs) + while (result.isEmpty && !isTimedOut && !isMaxRetryDone) { + if (retryCount > 0) { // wait only if this is a retry + Thread.sleep(waitTimeMs) + waitTimeMs *= 2 // if you have waited, then double wait time for next round + } try { result = Some(body) } catch { @@ -263,7 +265,6 @@ class KinesisSequenceRangeIterator( } } retryCount += 1 - if (waitTimeMs == 0) waitTimeMs = MIN_RETRY_WAIT_TIME_MS else waitTimeMs *= 2 } result.getOrElse { if (isTimedOut) { From 5082a30eb60b9435e9264b65e980c967cce92c3c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 18:24:40 -0700 Subject: [PATCH 10/11] Fixed scala style --- .../apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index c8c094c54161..07265148689b 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -282,4 +282,4 @@ private[streaming] object KinesisSequenceRangeIterator { val MAX_RETRIES = 3 val MIN_RETRY_WAIT_TIME_MS = 100 -} \ No newline at end of file +} From 543d208dfaa819aabdf62d0d3c958a110518d966 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 23 Jul 2015 18:50:05 -0700 Subject: [PATCH 11/11] Fixed scala style --- .../apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 07265148689b..8f144a4d974a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -171,7 +171,7 @@ class KinesisSequenceRangeIterator( val nextRecord: Record = internalIterator.next() val byteBuffer = nextRecord.getData() nextBytes = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(nextBytes ) + byteBuffer.get(nextBytes) lastSeqNumber = nextRecord.getSequenceNumber() // If the this record's sequence number matches the stopping sequence number, then make sure
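For reference, a minimal sketch of how the KinesisBackedBlockRDD produced by this series can be used, mirroring the usage in KinesisBackedBlockRDDSuite. The stream name, shard ids, sequence numbers, and object name below are placeholders rather than values from the patches; the code has to sit in the org.apache.spark.streaming.kinesis package because the classes are private[kinesis], and AWS credentials are resolved through the DefaultAWSCredentialsProviderChain since no explicit credentials are passed.

package org.apache.spark.streaming.kinesis

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, StreamBlockId}

// Minimal sketch, assuming a stream named "myStream" with two shards and
// known (inclusive) sequence number ranges for each shard.
object KinesisBackedBlockRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDExample"))

    // One partition per shard: each partition carries the inclusive range of
    // sequence numbers to re-read from Kinesis if its block is not available.
    val ranges = Array(
      SequenceNumberRanges(
        SequenceNumberRange("myStream", "shardId-000000000000", "seq-100", "seq-200")),
      SequenceNumberRanges(
        SequenceNumberRange("myStream", "shardId-000000000001", "seq-300", "seq-400")))

    // These block ids do not exist in the BlockManager, so compute() falls back
    // to fetching the records directly from Kinesis for every partition.
    val blockIds: Array[BlockId] = Array.tabulate(ranges.length) { i => new StreamBlockId(0, i) }

    val rdd = new KinesisBackedBlockRDD(
      sc, "us-east-1", "https://kinesis.us-east-1.amazonaws.com", blockIds, ranges)

    // Records come back as raw byte arrays, exactly as they were put into the stream.
    rdd.map(bytes => new String(bytes)).collect().foreach(println)

    sc.stop()
  }
}

Each partition re-reads its sequence number range from Kinesis only when the corresponding block is missing from the BlockManager (or when isBlockIdValid marks it invalid), and throttled Kinesis API calls are retried with exponential backoff up to retryTimeoutMs, as implemented in the patches above.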