Skip to content

Commit d249636

Browse files
committed
[SPARK-9216] [STREAMING] Define KinesisBackedBlockRDDs
For more information see master JIRA: https://issues.apache.org/jira/browse/SPARK-9215 Design Doc: https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit Author: Tathagata Das <[email protected]> Closes #7578 from tdas/kinesis-rdd and squashes the following commits: 543d208 [Tathagata Das] Fixed scala style 5082a30 [Tathagata Das] Fixed scala style 3f40c2d [Tathagata Das] Addressed comments c4f25d2 [Tathagata Das] Addressed comment d3d64d1 [Tathagata Das] Minor update f6e35c8 [Tathagata Das] Added retry logic to make it more robust 8874b70 [Tathagata Das] Updated Kinesis RDD 575bdbc [Tathagata Das] Fix scala style issues 4a36096 [Tathagata Das] Add license 5da3995 [Tathagata Das] Changed KinesisSuiteHelper to KinesisFunSuite 528e206 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-rdd 3ae0814 [Tathagata Das] Added KinesisBackedBlockRDD
1 parent 52de3ac commit d249636

File tree

5 files changed

+545
-5
lines changed

5 files changed

+545
-5
lines changed
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.kinesis
19+
20+
import scala.collection.JavaConversions._
21+
import scala.util.control.NonFatal
22+
23+
import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
24+
import com.amazonaws.services.kinesis.AmazonKinesisClient
25+
import com.amazonaws.services.kinesis.model._
26+
27+
import org.apache.spark._
28+
import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
29+
import org.apache.spark.storage.BlockId
30+
import org.apache.spark.util.NextIterator
31+
32+
33+
/**
 * A contiguous range of Kinesis sequence numbers within a single shard of a stream.
 * Both the starting and the ending sequence numbers are inclusive.
 */
private[kinesis]
case class SequenceNumberRange(
    streamName: String,
    shardId: String,
    fromSeqNumber: String,
    toSeqNumber: String)
38+
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) {

  /** Whether this holds no ranges at all. */
  def isEmpty(): Boolean = ranges.length == 0

  /** Whether this holds at least one range. */
  def nonEmpty(): Boolean = ranges.length > 0

  /** Render every contained range, e.g. `SequenceNumberRanges(r1, r2)`. */
  override def toString(): String =
    ranges.mkString("SequenceNumberRanges(", ", ", ")")
}
45+
46+
private[kinesis]
object SequenceNumberRanges {

  /** Convenience constructor wrapping a single range into a one-element collection. */
  def apply(range: SequenceNumberRange): SequenceNumberRanges =
    new SequenceNumberRanges(Array(range))
}
52+
53+
54+
/**
 * Partition storing the information of the ranges of Kinesis sequence numbers to read.
 *
 * @param idx              index of this partition within the RDD
 * @param blockId          id of the Spark block that may already hold this partition's data
 * @param isBlockIdValid   whether the block is expected to still exist in the BlockManager;
 *                         if false, the data is re-read from Kinesis instead
 * @param seqNumberRanges  sequence number ranges to re-read from Kinesis when needed
 */
private[kinesis]
class KinesisBackedBlockRDDPartition(
    idx: Int,
    blockId: BlockId,
    val isBlockIdValid: Boolean,
    val seqNumberRanges: SequenceNumberRanges
  ) extends BlockRDDPartition(blockId, idx)
62+
63+
/**
 * A BlockRDD where the block data is backed by Kinesis, which can be accessed using the
 * sequence numbers of the corresponding blocks.
 *
 * For each partition, the data is served from the BlockManager when the block is still
 * valid; otherwise it is re-read directly from Kinesis using the stored sequence number
 * ranges.
 *
 * @param sc                     SparkContext
 * @param regionId               AWS region of the Kinesis stream
 * @param endpointUrl            Kinesis endpoint URL
 * @param blockIds               ids of the blocks holding each partition's data
 * @param arrayOfseqNumberRanges sequence number ranges for each block, used to re-read
 *                               the data from Kinesis when a block is unavailable
 * @param isBlockIdValid         per-block validity flags; empty means all blocks are
 *                               assumed valid
 * @param retryTimeoutMs         total time budget for retrying Kinesis API calls
 * @param awsCredentialsOption   explicit AWS credentials; falls back to the default
 *                               provider chain when None
 */
private[kinesis]
class KinesisBackedBlockRDD(
    sc: SparkContext,
    regionId: String,
    endpointUrl: String,
    @transient blockIds: Array[BlockId],
    @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges],
    @transient isBlockIdValid: Array[Boolean] = Array.empty,
    retryTimeoutMs: Int = 10000,
    awsCredentialsOption: Option[SerializableAWSCredentials] = None
  ) extends BlockRDD[Array[Byte]](sc, blockIds) {

  require(blockIds.length == arrayOfseqNumberRanges.length,
    "Number of blockIds is not equal to the number of sequence number ranges")

  // The data can always be recomputed from Kinesis using the sequence number ranges,
  // so this RDD stays valid even if the underlying blocks have been dropped.
  override def isValid(): Boolean = true

  override def getPartitions: Array[Partition] = {
    Array.tabulate(blockIds.length) { i =>
      // No validity information given means every block is assumed to be present.
      val isValid = if (isBlockIdValid.isEmpty) true else isBlockIdValid(i)
      new KinesisBackedBlockRDDPartition(i, blockIds(i), isValid, arrayOfseqNumberRanges(i))
    }
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
    val blockManager = SparkEnv.get.blockManager
    val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition]
    val blockId = partition.blockId

    // Fast path: serve the partition's data from the BlockManager if the block is there.
    def getBlockFromBlockManager(): Option[Iterator[Array[Byte]]] = {
      logDebug(s"Read partition data of $this from block manager, block $blockId")
      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[Array[Byte]]])
    }

    // Slow path: re-read the records from Kinesis using the stored sequence number ranges.
    def getBlockFromKinesis(): Iterator[Array[Byte]] = {
      val credentials = awsCredentialsOption.getOrElse {
        new DefaultAWSCredentialsProviderChain().getCredentials()
      }
      partition.seqNumberRanges.ranges.iterator.flatMap { range =>
        new KinesisSequenceRangeIterator(
          credentials, endpointUrl, regionId, range, retryTimeoutMs)
      }
    }
    if (partition.isBlockIdValid) {
      getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
    } else {
      getBlockFromKinesis()
    }
  }
}
117+
118+
119+
/**
 * An iterator that returns the Kinesis data based on the given range of sequence numbers.
 * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber,
 * until the endSequenceNumber is reached. Each Kinesis API call is retried with
 * exponential backoff, bounded by both a retry count and `retryTimeoutMs`.
 */
private[kinesis]
class KinesisSequenceRangeIterator(
    credentials: AWSCredentials,
    endpointUrl: String,
    regionId: String,
    range: SequenceNumberRange,
    retryTimeoutMs: Int
  ) extends NextIterator[Array[Byte]] with Logging {

  private val client = new AmazonKinesisClient(credentials)
  private val streamName = range.streamName
  private val shardId = range.shardId

  // Set once the record with range.toSeqNumber has been returned; the following
  // getNext() call then marks the iterator finished.
  private var toSeqNumberReceived = false
  // Sequence number of the most recently returned record (null before the first record).
  private var lastSeqNumber: String = null
  // Current batch of records fetched from Kinesis; refilled lazily when exhausted.
  private var internalIterator: Iterator[Record] = null

  client.setEndpoint(endpointUrl, "kinesis", regionId)

  override protected def getNext(): Array[Byte] = {
    var nextBytes: Array[Byte] = null
    if (toSeqNumberReceived) {
      finished = true
    } else {

      if (internalIterator == null) {

        // If the internal iterator has not been initialized,
        // then fetch records from starting sequence number
        internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
      } else if (!internalIterator.hasNext) {

        // If the internal iterator does not have any more records,
        // then fetch more records after the last consumed sequence number
        internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
      }

      if (!internalIterator.hasNext) {

        // If the internal iterator still does not have any data, then throw exception
        // and terminate this iterator
        finished = true
        throw new SparkException(
          s"Could not read until the end sequence number of the range: $range")
      } else {

        // Get the record, copy the data into a byte array and remember its sequence number
        val nextRecord: Record = internalIterator.next()
        val byteBuffer = nextRecord.getData()
        nextBytes = new Array[Byte](byteBuffer.remaining())
        byteBuffer.get(nextBytes)
        lastSeqNumber = nextRecord.getSequenceNumber()

        // If this record's sequence number matches the stopping sequence number, then make
        // sure the iterator is marked finished next time getNext() is called
        if (nextRecord.getSequenceNumber == range.toSeqNumber) {
          toSeqNumberReceived = true
        }
      }

    }
    nextBytes
  }

  // Release the underlying Kinesis client when iteration completes or is aborted.
  override protected def close(): Unit = {
    client.shutdown()
  }

  /**
   * Get records starting from or after the given sequence number.
   * Note: the next-shard-iterator returned by Kinesis is discarded here; a fresh
   * shard iterator is obtained on each call via [[getKinesisIterator]].
   */
  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
    val shardIterator = getKinesisIterator(iteratorType, seqNum)
    val result = getRecordsAndNextKinesisIterator(shardIterator)
    result._1
  }

  /**
   * Get the records starting from using a Kinesis shard iterator (which is a progress handle
   * to get records from Kinesis), and get the next shard iterator for next consumption.
   */
  private def getRecordsAndNextKinesisIterator(
      shardIterator: String): (Iterator[Record], String) = {
    val getRecordsRequest = new GetRecordsRequest
    getRecordsRequest.setRequestCredentials(credentials)
    getRecordsRequest.setShardIterator(shardIterator)
    // Wrapped in retryOrTimeout to survive transient throughput-exceeded errors.
    val getRecordsResult = retryOrTimeout[GetRecordsResult](
      s"getting records using shard iterator") {
        client.getRecords(getRecordsRequest)
      }
    (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator)
  }

  /**
   * Get the Kinesis shard iterator for getting records starting from or after the given
   * sequence number.
   */
  private def getKinesisIterator(
      iteratorType: ShardIteratorType,
      sequenceNumber: String): String = {
    val getShardIteratorRequest = new GetShardIteratorRequest
    getShardIteratorRequest.setRequestCredentials(credentials)
    getShardIteratorRequest.setStreamName(streamName)
    getShardIteratorRequest.setShardId(shardId)
    getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
    getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
    val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
      s"getting shard iterator from sequence number $sequenceNumber") {
        client.getShardIterator(getShardIteratorRequest)
      }
    getShardIteratorResult.getShardIterator
  }

  /**
   * Helper method to retry Kinesis API request with exponential backoff and timeouts.
   *
   * Retries `body` up to MAX_RETRIES times or until `retryTimeoutMs` elapses, doubling
   * the wait between attempts starting from MIN_RETRY_WAIT_TIME_MS. Only
   * ProvisionedThroughputExceededException is retried; any other non-fatal error is
   * rethrown immediately as a SparkException.
   *
   * @param message description of the operation, used in log and error messages
   * @param body    the Kinesis API call to attempt
   */
  private def retryOrTimeout[T](message: String)(body: => T): T = {
    import KinesisSequenceRangeIterator._

    var startTimeMs = System.currentTimeMillis()
    var retryCount = 0
    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
    var result: Option[T] = None
    var lastError: Throwable = null

    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
    def isMaxRetryDone = retryCount >= MAX_RETRIES

    while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
      if (retryCount > 0) { // wait only if this is a retry
        Thread.sleep(waitTimeMs)
        waitTimeMs *= 2 // if you have waited, then double wait time for next round
      }
      try {
        result = Some(body)
      } catch {
        case NonFatal(t) =>
          lastError = t
          t match {
            // Transient rate-limit error: log and let the loop retry with backoff.
            case ptee: ProvisionedThroughputExceededException =>
              logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee)
            // Any other error is considered non-retryable and fails immediately.
            case e: Throwable =>
              throw new SparkException(s"Error while $message", e)
          }
      }
      retryCount += 1
    }
    result.getOrElse {
      if (isTimedOut) {
        throw new SparkException(
          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
      } else {
        throw new SparkException(
          s"Gave up after $retryCount retries while $message, last exception: ", lastError)
      }
    }
  }
}
280+
281+
/** Retry-policy constants used by [[KinesisSequenceRangeIterator]]. */
private[streaming]
object KinesisSequenceRangeIterator {
  /** Maximum number of attempts for a single Kinesis API call. */
  val MAX_RETRIES: Int = 3
  /** Initial wait between retries, in milliseconds; doubled after each failed attempt. */
  val MIN_RETRY_WAIT_TIME_MS: Int = 100
}

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private class KinesisTestUtils(
177177

178178
private[kinesis] object KinesisTestUtils {
179179

180-
val envVarName = "RUN_KINESIS_TESTS"
180+
val envVarName = "ENABLE_KINESIS_TESTS"
181181

182182
val shouldRunTests = sys.env.get(envVarName) == Some("1")
183183

0 commit comments

Comments
 (0)