From 92dced8b531c4f1d8ccf09a757f948ef185510f9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 16 Feb 2016 13:50:01 -0800 Subject: [PATCH 01/16] Clean up blocks for Kinesis source; add unit tests --- .../org/apache/spark/storage/BlockId.scala | 8 + .../apache/spark/storage/BlockIdSuite.scala | 12 ++ .../apache/spark/util/JsonProtocolSuite.scala | 1 + .../streaming/kinesis/KinesisReceiver.scala | 10 +- .../streaming/kinesis/KinesisSource.scala | 137 +++++++++++------- .../streaming/kinesis/KinesisTestUtils.scala | 5 - .../kinesis/KinesisSourceSuite.scala | 103 ++++++++----- 7 files changed, 179 insertions(+), 97 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 524f6970992a5..90f88ad62a026 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -85,6 +85,11 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { override def name: String = "input-" + streamId + "-" + uniqueId } +@DeveloperApi +case class StreamSourceBlockId(source: String, uniqueId: Long) extends BlockId { + override def name: String = s"source-$source-$uniqueId" +} + /** Id associated with temporary local data managed as blocks. Not serializable. */ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id @@ -109,6 +114,7 @@ object BlockId { val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r + val STREAM_SOURCE = "source-([_A-Za-z0-9]+)-([0-9]+)".r val TEST = "test_(.*)".r /** Converts a BlockId "name" String back into a BlockId. 
*/ @@ -127,6 +133,8 @@ object BlockId { TaskResultBlockId(taskId.toLong) case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong) + case STREAM_SOURCE(source, uniqueId) => + StreamSourceBlockId(source, uniqueId.toLong) case TEST(value) => TestBlockId(value) case _ => diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index 89ed031b6fcd1..aee43e2cfaa7f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -101,6 +101,18 @@ class BlockIdSuite extends SparkFunSuite { assertSame(id, BlockId(id.toString)) } + test("stream-source") { + val id = StreamSourceBlockId("test", 100) + assertSame(id, StreamSourceBlockId("test", 100)) + assertDifferent(id, StreamSourceBlockId("test", 101)) + assert(id.name === "source-test-100") + assert(id.asRDDId === None) + assert(id.source === "test") + assert(id.uniqueId === 100) + assert(!id.isBroadcast) + assertSame(id, BlockId(id.toString)) + } + test("test") { val id = TestBlockId("abc") assertSame(id, TestBlockId("abc")) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index de6f408fa82be..53465cef24371 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -163,6 +163,7 @@ class JsonProtocolSuite extends SparkFunSuite { testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here")) testBlockId(TaskResultBlockId(1L)) testBlockId(StreamBlockId(1, 2L)) + testBlockId(StreamSourceBlockId("test", 2L)) } /* ============================== * diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index ca13a21087cc8..e6b61af7c94f0 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -41,7 +41,7 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) override def getAWSSecretKey: String = secretKey } -object SerializableAWSCredentials { +private[kinesis] object SerializableAWSCredentials { def apply(credentials: AWSCredentials): SerializableAWSCredentials = { new SerializableAWSCredentials(credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) } @@ -339,7 +339,7 @@ private[kinesis] class KinesisReceiver[T]( * The data addition, block generation, and calls to onAddData and onGenerateBlock * are all synchronized through the same lock. */ - def onAddData(data: Any, metadata: Any): Unit = { + override def onAddData(data: Any, metadata: Any): Unit = { rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) } @@ -348,18 +348,18 @@ private[kinesis] class KinesisReceiver[T]( * The data addition, block generation, and calls to onAddData and onGenerateBlock * are all synchronized through the same lock. */ - def onGenerateBlock(blockId: StreamBlockId): Unit = { + override def onGenerateBlock(blockId: StreamBlockId): Unit = { finalizeRangesForCurrentBlock(blockId) } /** Callback method called when a block is ready to be pushed / stored. 
*/ - def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { storeBlockWithRanges(blockId, arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]]) } /** Callback called in case of any error in internal of the BlockGenerator */ - def onError(message: String, throwable: Throwable): Unit = { + override def onError(message: String, throwable: Throwable): Unit = { reportError(message, throwable) } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 07876f5f45aa1..4189e58d485ba 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.streaming.kinesis +import java.util.concurrent.atomic.AtomicLong + import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.Random import scala.util.control.NonFatal +import com.amazonaws.AbortedException import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream @@ -28,11 +31,12 @@ import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ import org.apache.spark._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source, StreamingRelation} +import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} -import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.storage.{BlockId, StorageLevel, StreamSourceBlockId} private[kinesis] case class Shard(streamName: String, shardId: String) @@ -40,7 +44,6 @@ private[kinesis] case class KinesisSourceOffset(seqNums: Map[Shard, String]) extends Offset { override def compareTo(otherOffset: Offset): Int = otherOffset match { - case that: KinesisSourceOffset => val allShards = this.seqNums.keySet ++ that.seqNums.keySet val comparisons = allShards.map { shard => @@ -65,20 +68,7 @@ private[kinesis] case class KinesisSourceOffset(seqNums: Map[Shard, String]) } } - -private[kinesis] object KinesisSourceOffset { - def fromOffset(offset: Offset): KinesisSourceOffset = { - offset match { - case o: KinesisSourceOffset => o - case _ => - throw new IllegalArgumentException( - s"Invalid conversion from offset of ${offset.getClass} to $getClass") - } - } -} - - -private[kinesis] case class KinesisSource( +private[kinesis] class KinesisSource( sqlContext: SQLContext, regionName: String, endpointUrl: String, @@ -86,21 +76,36 @@ private[kinesis] case class KinesisSource( initialPosInStream: InitialPositionInStream = InitialPositionInStream.LATEST, awsCredentialsOption: Option[SerializableAWSCredentials] = None) extends Source { + // How long we should wait before calling `fetchShards()`. 
Because DescribeStream has a limit of + // 10 transactions per second per account, we should not request too frequently. + private val FETCH_SHARDS_INTERVAL_MS = 200L + + // The last time `fetchShards()` is called. + private var lastFetchShardsTimeMS = 0L + implicit private val encoder = ExpressionEncoder[Array[Byte]] - private val logicalPlan = StreamingRelation(this) - @transient val credentials = SerializableAWSCredentials( + private val credentials = SerializableAWSCredentials( awsCredentialsOption.getOrElse(new DefaultAWSCredentialsProviderChain().getCredentials()) ) - @transient private val client = new AmazonKinesisClient(credentials) + private val client = new AmazonKinesisClient(credentials) client.setEndpoint(endpointUrl, "kinesis", regionName) - override def schema: StructType = encoder.schema + private var cachedBlocks = new mutable.HashSet[BlockId] + + override val schema: StructType = encoder.schema override def getNextBatch(start: Option[Offset]): Option[Batch] = { - val startOffset = start.map(KinesisSourceOffset.fromOffset) + val startOffset = start.map(_.asInstanceOf[KinesisSourceOffset]) + val now = System.currentTimeMillis() + if (now - lastFetchShardsTimeMS < FETCH_SHARDS_INTERVAL_MS) { + // Because DescribeStream has a limit of 10 transactions per second per account, we should not + // request too frequently. + return None + } + lastFetchShardsTimeMS = now val shards = fetchShards() // Get the starting seq number of each shard if available @@ -108,7 +113,11 @@ private[kinesis] case class KinesisSource( /** Prefetch Kinesis data from the starting seq nums */ val prefetchedData = new KinesisDataFetcher( - sqlContext, credentials, endpointUrl, regionName, fromSeqNums, initialPosInStream).fetch() + credentials, + endpointUrl, + regionName, + fromSeqNums, + initialPosInStream).fetch(sqlContext.sparkContext) if (prefetchedData.nonEmpty) { val prefetechedRanges = prefetchedData.map(_._2) @@ -125,45 +134,70 @@ private[kinesis] case class KinesisSource( val rdd = new KinesisBackedBlockRDD[Array[Byte]](sqlContext.sparkContext, regionName, endpointUrl, prefetchedBlockIds, prefetechedRanges.map(SequenceNumberRanges.apply)) + + dropOldBlocks() + cachedBlocks ++= prefetchedBlockIds + Some(new Batch(new KinesisSourceOffset(endOffset), sqlContext.createDataset(rdd).toDF)) } else { None } } - def toDS(): Dataset[Array[Byte]] = { - toDF.as[Array[Byte]] - } - - def toDF(): DataFrame = { - new DataFrame(sqlContext, logicalPlan) + private def dropOldBlocks(): Unit = { + val droppedBlocks = ArrayBuffer[BlockId]() + try { + for (blockId <- cachedBlocks) { + SparkEnv.get.blockManager.removeBlock(blockId) + droppedBlocks += blockId + } + } finally { + cachedBlocks --= droppedBlocks + } } private def fetchShards(): Seq[Shard] = { - streamNames.toSeq.flatMap { streamName => - val desc = client.describeStream(streamName) - desc.getStreamDescription.getShards.asScala.map { s => - Shard(streamName, s.getShardId) + try { + streamNames.toSeq.flatMap { streamName => + val desc = client.describeStream(streamName) + desc.getStreamDescription.getShards.asScala.map { s => + Shard(streamName, s.getShardId) + } } + } catch { + case e: AbortedException => + // AbortedException will be thrown if the current thread is interrupted + // So let's convert it back to InterruptedException + val e1 = new InterruptedException("thread is interrupted") + e1.addSuppressed(e) + throw e1 } } } +private[kinesis] object KinesisSource { + + private val nextId = new AtomicLong(0) + + def nextBlockId: StreamSourceBlockId = 
StreamSourceBlockId("kinesis", nextId.getAndIncrement) +} +/** + * However, this class runs in the driver so could be a bottleneck. + */ private[kinesis] class KinesisDataFetcher( - sqlContext: SQLContext, credentials: SerializableAWSCredentials, endpointUrl: String, regionName: String, fromSeqNums: Seq[(Shard, Option[String])], initialPositionInStream: InitialPositionInStream, - readTimeoutMs: Int = 2000 + readTimeoutMs: Long = 2000L ) extends Serializable with Logging { - @transient private lazy val client = new AmazonKinesisClient(credentials) + @transient private lazy val client = new AmazonKinesisClient(credentials) - def fetch(): Array[(BlockId, SequenceNumberRange)] = { - sqlContext.sparkContext.makeRDD(fromSeqNums, fromSeqNums.size).map { + def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { + sc.makeRDD(fromSeqNums, fromSeqNums.size).map { case (shard, fromSeqNum) => fetchPartition(shard, fromSeqNum) }.collect().flatten } @@ -200,10 +234,7 @@ private[kinesis] class KinesisDataFetcher( } records.foreach { record => - val byteBuffer = record.getData() - val byteArray = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(byteArray) - buffer += byteArray + buffer += JavaUtils.bufferToArray(record.getData()) if (firstSeqNumber == null) { firstSeqNumber = record.getSequenceNumber } @@ -214,7 +245,7 @@ private[kinesis] class KinesisDataFetcher( } if (buffer.nonEmpty) { - val blockId = StreamBlockId(0, Random.nextLong) + val blockId = KinesisSource.nextBlockId SparkEnv.get.blockManager.putIterator(blockId, buffer.iterator, StorageLevel.MEMORY_ONLY) val range = SequenceNumberRange( shard.streamName, shard.shardId, firstSeqNumber, lastSeqNumber) @@ -230,16 +261,14 @@ private[kinesis] class KinesisDataFetcher( } } - /** * Get the records starting from using a Kinesis shard iterator (which is a progress handle * to get records from Kinesis), and get the next shard iterator for next consumption. */ private def getRecordsAndNextKinesisIterator( shardIterator: String): (Seq[Record], String) = { - val getRecordsRequest = new GetRecordsRequest + val getRecordsRequest = new GetRecordsRequest().withShardIterator(shardIterator) getRecordsRequest.setRequestCredentials(credentials) - getRecordsRequest.setShardIterator(shardIterator) val getRecordsResult = client.getRecords(getRecordsRequest) // De-aggregate records, if KPL was used in producing the records. The KCL automatically // handles de-aggregation during regular operation. 
This code path is used during recovery @@ -257,12 +286,12 @@ private[kinesis] class KinesisDataFetcher( shard: Shard, iteratorType: ShardIteratorType, sequenceNumber: String): String = { - val getShardIteratorRequest = new GetShardIteratorRequest + val getShardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(shard.streamName) + .withShardId(shard.shardId) + .withShardIteratorType(iteratorType.toString) + .withStartingSequenceNumber(sequenceNumber) getShardIteratorRequest.setRequestCredentials(credentials) - getShardIteratorRequest.setStreamName(shard.streamName) - getShardIteratorRequest.setShardId(shard.shardId) - getShardIteratorRequest.setShardIteratorType(iteratorType.toString) - getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber) val getShardIteratorResult = client.getShardIterator(getShardIteratorRequest) logTrace(s"Shard $shard: Got iterator ${getShardIteratorResult.getShardIterator}") getShardIteratorResult.getShardIterator @@ -271,7 +300,7 @@ private[kinesis] class KinesisDataFetcher( /** Helper method to retry Kinesis API request with exponential backoff and timeouts */ private def retryOrTimeout[T](message: String, retryTimeoutMs: Long)(body: => T): T = { import KinesisSequenceRangeIterator._ - var startTimeMs = System.currentTimeMillis() + val startTimeMs = System.currentTimeMillis() var retryCount = 0 var waitTimeMs = MIN_RETRY_WAIT_TIME_MS var result: Option[T] = None diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index e3814be676e85..300a2416229cc 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -118,7 +118,6 @@ private[kinesis] class KinesisTestUtils extends Logging { def deleteStream(): Unit = { try { if (streamCreated) { - logInfo(s"Deleting stream $streamName") kinesisClient.deleteStream(streamName) } } catch { @@ -256,10 +255,6 @@ private[kinesis] class SimpleDataGenerator( val seqNumber = putRecordResult.getSequenceNumber() val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, new ArrayBuffer[(Int, String)]()) - // scalastyle:off println - println(s"$data with key $str in shard ${putRecordResult.getShardId} " + - s"and seq ${putRecordResult.getSequenceNumber}") - // scalastyle:on println sentSeqNumbers += ((num, seqNumber)) seqNumForOrdering = seqNumber } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index f17131dc93c6c..ab3dedd442ea6 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -17,63 +17,100 @@ package org.apache.spark.streaming.kinesis - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import org.scalatest.time.SpanSugar._ +import org.apache.spark.SparkEnv import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.{Offset, Source} import org.apache.spark.sql.test.SharedSQLContext - +import org.apache.spark.storage.StreamSourceBlockId class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFunSuite { import 
testImplicits._ - private var testUtils: KPLBasedKinesisTestUtils = _ - private var streamName: String = _ - - override val streamingTimout = 60.seconds + case class AddKinesisData( + testUtils: KPLBasedKinesisTestUtils, + kinesisSource: KinesisSource, + data: Seq[Int]) extends AddData { - case class AddKinesisData(kinesisSource: KinesisSource, data: Int*) extends AddData { override def addData(): Offset = { val shardIdToSeqNums = testUtils.pushData(data, false).map { case (shardId, info) => - (Shard(streamName, shardId), info.last._2) + (Shard(testUtils.streamName, shardId), info.last._2) } assert(shardIdToSeqNums.size === testUtils.streamShardCount, - s"Data must be send to all ${testUtils.streamShardCount} shards of stream $streamName") + s"Data must be send to all ${testUtils.streamShardCount} " + + s"shards of stream ${testUtils.streamName}") KinesisSourceOffset(shardIdToSeqNums) } override def source: Source = kinesisSource } - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KPLBasedKinesisTestUtils - testUtils.createStream() - streamName = testUtils.streamName - } + testIfEnabled("basic receiving") { + var streamBlocksInLastBatch: Seq[StreamSourceBlockId] = Seq.empty - override def afterAll(): Unit = { - if (testUtils != null) { + def assertStreamBlocks: Boolean = { + // Assume the test runs in local mode and there is only one BlockManager. + val streamBlocks = + SparkEnv.get.blockManager.getMatchingBlockIds(_.isInstanceOf[StreamSourceBlockId]) + val cleaned = streamBlocks.intersect(streamBlocksInLastBatch).isEmpty + streamBlocksInLastBatch = streamBlocks.map(_.asInstanceOf[StreamSourceBlockId]) + cleaned + } + + val testUtils = new KPLBasedKinesisTestUtils + testUtils.createStream() + try { + val kinesisSource = new KinesisSource( + sqlContext, + testUtils.regionName, + testUtils.endpointUrl, + Set(testUtils.streamName), + InitialPositionInStream.TRIM_HORIZON) + val mapped = + kinesisSource.toDS[Array[Byte]]().map((bytes: Array[Byte]) => new String(bytes).toInt + 1) + val testData = 1 to 10 + testStream(mapped)( + AddKinesisData(testUtils, kinesisSource, testData), + CheckAnswer((1 to 10).map(_ + 1): _*), + Assert(assertStreamBlocks, "Old stream blocks should be cleaned"), + AddKinesisData(testUtils, kinesisSource, 11 to 20), + CheckAnswer((1 to 20).map(_ + 1): _*), + Assert(assertStreamBlocks, "Old stream blocks should be cleaned"), + AddKinesisData(testUtils, kinesisSource, 21 to 30), + CheckAnswer((1 to 30).map(_ + 1): _*), + Assert(assertStreamBlocks, "Old stream blocks should be cleaned") + ) + } finally { testUtils.deleteStream() } - super.afterAll() } - test("basic receiving") { - val kinesisSource = KinesisSource( - sqlContext, - testUtils.regionName, - testUtils.endpointUrl, - Set(streamName), - InitialPositionInStream.TRIM_HORIZON) - val mapped = kinesisSource.toDS().map[Int]((bytes: Array[Byte]) => new String(bytes).toInt + 1) - val testData = 1 to 10 // This ensures that data is sent to multiple shards for 2 shard streams - testStream(mapped)( - AddKinesisData(kinesisSource, testData: _*), - CheckAnswer(testData.map { _ + 1 }: _*) - ) + testIfEnabled("failover") { + val testUtils = new KPLBasedKinesisTestUtils + testUtils.createStream() + try { + val kinesisSource = new KinesisSource( + sqlContext, + testUtils.regionName, + testUtils.endpointUrl, + Set(testUtils.streamName), + InitialPositionInStream.TRIM_HORIZON) + val mapped = + kinesisSource.toDS[Array[Byte]]().map((bytes: Array[Byte]) => new String(bytes).toInt + 1) + testStream(mapped)( + 
AddKinesisData(testUtils, kinesisSource, 1 to 10), + CheckAnswer((1 to 10).map(_ + 1): _*), + StopStream, + AddKinesisData(testUtils, kinesisSource, 11 to 20), + StartStream, + CheckAnswer((1 to 20).map(_ + 1): _*), + AddKinesisData(testUtils, kinesisSource, 21 to 30), + CheckAnswer((1 to 30).map(_ + 1): _*) + ) + } finally { + testUtils.deleteStream() + } } -} \ No newline at end of file +} From ed8bdc38e925ffe04cb22ded64396cdcd61804c7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 16 Feb 2016 19:42:57 -0800 Subject: [PATCH 02/16] Add Kinesis DefaultSource --- .../streaming/kinesis/DefaultSource.scala | 67 +++++++++++++++++++ .../spark/streaming/kinesis/package.scala | 31 +++++++++ .../kinesis/KinesisSourceSuite.scala | 33 ++++++++- 3 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala create mode 100644 extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/package.scala diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala new file mode 100644 index 0000000000000..3692bab2ccbd9 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap +import org.apache.spark.sql.execution.streaming.Source +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.StructType + +class DefaultSource extends StreamSourceProvider with DataSourceRegister { + + override def shortName(): String = "kinesis" + + override def createSource( + sqlContext: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + val caseInsensitiveOptions = new CaseInsensitiveMap(parameters) + val regionName = caseInsensitiveOptions.getOrElse("regionName", { + throw new IllegalArgumentException("regionName is not specified") + }) + val endpointUrl = caseInsensitiveOptions.getOrElse("endpointUrl", { + throw new IllegalArgumentException("endpointUrl is not specified") + }) + val streamNames = caseInsensitiveOptions.getOrElse("streamNames", { + throw new IllegalArgumentException("streamNames is not specified") + }).split(',').toSet + val initialPosInStream = + caseInsensitiveOptions.getOrElse("initialPosInStream", "LATEST") match { + case "LATEST" => InitialPositionInStream.LATEST + case "TRIM_HORIZON" => InitialPositionInStream.TRIM_HORIZON + case pos => throw new IllegalArgumentException(s"unknown initialPosInStream: $pos") + } + val awsCredentialsOption = + for (accessKeyId <- caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID"); + secretKey <- caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY")) + yield new SerializableAWSCredentials(accessKeyId, secretKey) + + new KinesisSource( + sqlContext, + regionName, + endpointUrl, + streamNames, + initialPosInStream, + awsCredentialsOption + ) + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/package.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/package.scala new file mode 100644 index 0000000000000..efb79bad1ee9a --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/package.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.sql.DataFrameReader + +package object kinesis { + + /** + * Add the `kinesis` method to DataFrameReader that allows people to read from Kinesis using + * the DataFileReader. 
+ */ + implicit class KinesisDataFrameReader(reader: DataFrameReader) { + def kinesis(): DataFrameReader = reader.format("org.apache.spark.streaming.kinesis") + } +} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index ab3dedd442ea6..46e7fb5963029 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.streaming.kinesis import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import org.apache.spark.SparkEnv -import org.apache.spark.sql.StreamTest -import org.apache.spark.sql.execution.streaming.{Offset, Source} +import org.apache.spark.sql.{AnalysisException, StreamTest} +import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.storage.StreamSourceBlockId @@ -113,4 +113,33 @@ class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFu testUtils.deleteStream() } } + + testIfEnabled("DataFrameReader") { + val testUtils = new KPLBasedKinesisTestUtils + testUtils.createStream() + try { + val df = sqlContext.read + .option("regionName", testUtils.regionName) + .option("endpointUrl", testUtils.endpointUrl) + .option("streamNames", testUtils.streamName) + .option("initialPosInStream", "TRIM_HORIZON") + .kinesis().stream() + + val sources = df.queryExecution.analyzed + .collect { + case StreamingRelation(s: KinesisSource, _) => s + } + assert(sources.size === 1) + } finally { + testUtils.deleteStream() + } + } + + testIfEnabled("call kinesis when not using stream") { + val e = intercept[AnalysisException] { + sqlContext.read.kinesis().load() + } + assert(e.getMessage === "org.apache.spark.streaming.kinesis.DefaultSource is " + + "neither a RelationProvider nor a FSBasedRelationProvider.;") + } } From a0c03e6f7a87bbd1540adb4ab3914cb96e89ef03 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 15:02:28 -0800 Subject: [PATCH 03/16] Revert changes to core --- .../scala/org/apache/spark/storage/BlockId.scala | 8 -------- .../org/apache/spark/storage/BlockIdSuite.scala | 12 ------------ .../org/apache/spark/util/JsonProtocolSuite.scala | 1 - 3 files changed, 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 90f88ad62a026..524f6970992a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -85,11 +85,6 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { override def name: String = "input-" + streamId + "-" + uniqueId } -@DeveloperApi -case class StreamSourceBlockId(source: String, uniqueId: Long) extends BlockId { - override def name: String = s"source-$source-$uniqueId" -} - /** Id associated with temporary local data managed as blocks. Not serializable. 
*/ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id @@ -114,7 +109,6 @@ object BlockId { val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r - val STREAM_SOURCE = "source-([_A-Za-z0-9]+)-([0-9]+)".r val TEST = "test_(.*)".r /** Converts a BlockId "name" String back into a BlockId. */ @@ -133,8 +127,6 @@ object BlockId { TaskResultBlockId(taskId.toLong) case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong) - case STREAM_SOURCE(source, uniqueId) => - StreamSourceBlockId(source, uniqueId.toLong) case TEST(value) => TestBlockId(value) case _ => diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index aee43e2cfaa7f..89ed031b6fcd1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -101,18 +101,6 @@ class BlockIdSuite extends SparkFunSuite { assertSame(id, BlockId(id.toString)) } - test("stream-source") { - val id = StreamSourceBlockId("test", 100) - assertSame(id, StreamSourceBlockId("test", 100)) - assertDifferent(id, StreamSourceBlockId("test", 101)) - assert(id.name === "source-test-100") - assert(id.asRDDId === None) - assert(id.source === "test") - assert(id.uniqueId === 100) - assert(!id.isBroadcast) - assertSame(id, BlockId(id.toString)) - } - test("test") { val id = TestBlockId("abc") assertSame(id, TestBlockId("abc")) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 53465cef24371..de6f408fa82be 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -163,7 +163,6 @@ class JsonProtocolSuite extends SparkFunSuite { testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here")) testBlockId(TaskResultBlockId(1L)) testBlockId(StreamBlockId(1, 2L)) - testBlockId(StreamSourceBlockId("test", 2L)) } /* ============================== * From 6d39214e06def64a89d30e0b509ac80c584b589c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 15:13:01 -0800 Subject: [PATCH 04/16] Hack StreamBlockId --- .../apache/spark/streaming/kinesis/KinesisSource.scala | 4 ++-- .../spark/streaming/kinesis/KinesisSourceSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 4189e58d485ba..471e3dc0ea05e 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source} import org.apache.spark.sql.types.StructType -import org.apache.spark.storage.{BlockId, StorageLevel, StreamSourceBlockId} +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} private[kinesis] case class Shard(streamName: String, shardId: String) @@ -179,7 +179,7 @@ private[kinesis] object KinesisSource { private val nextId = new 
AtomicLong(0) - def nextBlockId: StreamSourceBlockId = StreamSourceBlockId("kinesis", nextId.getAndIncrement) + def nextBlockId: StreamBlockId = StreamBlockId(-1, nextId.getAndIncrement) } /** diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index 46e7fb5963029..71056ea51e892 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkEnv import org.apache.spark.sql.{AnalysisException, StreamTest} import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.storage.StreamSourceBlockId +import org.apache.spark.storage.StreamBlockId class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFunSuite { @@ -48,14 +48,14 @@ class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFu } testIfEnabled("basic receiving") { - var streamBlocksInLastBatch: Seq[StreamSourceBlockId] = Seq.empty + var streamBlocksInLastBatch: Seq[StreamBlockId] = Seq.empty def assertStreamBlocks: Boolean = { // Assume the test runs in local mode and there is only one BlockManager. val streamBlocks = - SparkEnv.get.blockManager.getMatchingBlockIds(_.isInstanceOf[StreamSourceBlockId]) + SparkEnv.get.blockManager.getMatchingBlockIds(_.isInstanceOf[StreamBlockId]) val cleaned = streamBlocks.intersect(streamBlocksInLastBatch).isEmpty - streamBlocksInLastBatch = streamBlocks.map(_.asInstanceOf[StreamSourceBlockId]) + streamBlocksInLastBatch = streamBlocks.map(_.asInstanceOf[StreamBlockId]) cleaned } From 8cdb26929c34b586a6d5efeb0fb31bdf74cda040 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 15:21:22 -0800 Subject: [PATCH 05/16] Address TD's comments --- .../streaming/kinesis/DefaultSource.scala | 20 +++++++++++++++---- .../streaming/kinesis/KinesisReceiver.scala | 8 ++++---- .../streaming/kinesis/KinesisSource.scala | 2 +- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala index 3692bab2ccbd9..3f8c74d4a05c1 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala @@ -36,20 +36,32 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { parameters: Map[String, String]): Source = { val caseInsensitiveOptions = new CaseInsensitiveMap(parameters) val regionName = caseInsensitiveOptions.getOrElse("regionName", { - throw new IllegalArgumentException("regionName is not specified") + throw new IllegalArgumentException("Option 'regionName' is not specified") }) val endpointUrl = caseInsensitiveOptions.getOrElse("endpointUrl", { - throw new IllegalArgumentException("endpointUrl is not specified") + throw new IllegalArgumentException("Option 'endpointUrl' is not specified") }) val streamNames = caseInsensitiveOptions.getOrElse("streamNames", { - throw new IllegalArgumentException("streamNames is not specified") + throw new IllegalArgumentException("Option 'streamNames' is not specified") 
}).split(',').toSet val initialPosInStream = caseInsensitiveOptions.getOrElse("initialPosInStream", "LATEST") match { case "LATEST" => InitialPositionInStream.LATEST case "TRIM_HORIZON" => InitialPositionInStream.TRIM_HORIZON - case pos => throw new IllegalArgumentException(s"unknown initialPosInStream: $pos") + case pos => + throw new IllegalArgumentException(s"Unknown value of option initialPosInStream: $pos") } + + if ((caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID").nonEmpty + && caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY").isEmpty)) { + throw new IllegalArgumentException( + s"AWS_ACCESS_KEY_ID is set but AWS_SECRET_ACCESS_KEY is not found") + } + if ((caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID").isEmpty + && caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY").nonEmpty)) { + throw new IllegalArgumentException( + s"AWS_SECRET_ACCESS_KEY is set but AWS_ACCESS_KEY_ID is not found") + } val awsCredentialsOption = for (accessKeyId <- caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID"); secretKey <- caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY")) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index e6b61af7c94f0..d295c4d7a63eb 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -339,7 +339,7 @@ private[kinesis] class KinesisReceiver[T]( * The data addition, block generation, and calls to onAddData and onGenerateBlock * are all synchronized through the same lock. */ - override def onAddData(data: Any, metadata: Any): Unit = { + def onAddData(data: Any, metadata: Any): Unit = { rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) } @@ -348,18 +348,18 @@ private[kinesis] class KinesisReceiver[T]( * The data addition, block generation, and calls to onAddData and onGenerateBlock * are all synchronized through the same lock. */ - override def onGenerateBlock(blockId: StreamBlockId): Unit = { + def onGenerateBlock(blockId: StreamBlockId): Unit = { finalizeRangesForCurrentBlock(blockId) } /** Callback method called when a block is ready to be pushed / stored. 
*/ - override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { storeBlockWithRanges(blockId, arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]]) } /** Callback called in case of any error in internal of the BlockGenerator */ - override def onError(message: String, throwable: Throwable): Unit = { + def onError(message: String, throwable: Throwable): Unit = { reportError(message, throwable) } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 471e3dc0ea05e..32af10d3112bc 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -60,7 +60,7 @@ private[kinesis] case class KinesisSourceOffset(seqNums: Map[Shard, String]) case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) case _ => // there are both 1s and -1s throw new IllegalArgumentException( - s"Invalid comparison between non-linear histories: $this <=> $that") + s"Invalid comparison between KinesisSource offsets: \n\t this: $this\n\t that: $that") } case _ => From e26eee9e188b175ee3d01aed9638210bffaba0d3 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 15:24:52 -0800 Subject: [PATCH 06/16] Move KinesisDataFetcher to a new file --- .../kinesis/KinesisDataFetcher.scala | 189 ++++++++++++++++++ .../streaming/kinesis/KinesisSource.scala | 165 +-------------- 2 files changed, 190 insertions(+), 164 deletions(-) create mode 100644 extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala new file mode 100644 index 0000000000000..6318533843d6d --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord +import com.amazonaws.services.kinesis.model._ + +import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.storage.{BlockId, StorageLevel} + +/** + * However, this class runs in the driver so could be a bottleneck. + */ +private[kinesis] class KinesisDataFetcher( + credentials: SerializableAWSCredentials, + endpointUrl: String, + regionName: String, + fromSeqNums: Seq[(Shard, Option[String])], + initialPositionInStream: InitialPositionInStream, + readTimeoutMs: Long = 2000L +) extends Serializable with Logging { + + @transient private lazy val client = new AmazonKinesisClient(credentials) + + def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { + sc.makeRDD(fromSeqNums, fromSeqNums.size).map { + case (shard, fromSeqNum) => fetchPartition(shard, fromSeqNum) + }.collect().flatten + } + + private def fetchPartition( + shard: Shard, + fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { + client.setEndpoint(endpointUrl, "kinesis", regionName) + + val endTime = System.currentTimeMillis + readTimeoutMs + def timeLeft = math.max(endTime - System.currentTimeMillis, 0) + + val buffer = new ArrayBuffer[Array[Byte]] + var firstSeqNumber: String = null + var lastSeqNumber: String = fromSeqNum.orNull + var lastIterator: String = null + try { + logDebug(s"Trying to fetch data from $shard, from seq num $lastSeqNumber") + + while (timeLeft > 0) { + val (records, nextIterator) = retryOrTimeout("getting shard iterator", timeLeft) { + if (lastIterator == null) { + lastIterator = if (lastSeqNumber != null) { + getKinesisIterator(shard, ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) + } else { + if (initialPositionInStream == InitialPositionInStream.LATEST) { + getKinesisIterator(shard, ShardIteratorType.LATEST, lastSeqNumber) + } else { + getKinesisIterator(shard, ShardIteratorType.TRIM_HORIZON, lastSeqNumber) + } + } + } + getRecordsAndNextKinesisIterator(lastIterator) + } + + records.foreach { record => + buffer += JavaUtils.bufferToArray(record.getData()) + if (firstSeqNumber == null) { + firstSeqNumber = record.getSequenceNumber + } + lastSeqNumber = record.getSequenceNumber + } + + lastIterator = nextIterator + } + + if (buffer.nonEmpty) { + val blockId = KinesisSource.nextBlockId + SparkEnv.get.blockManager.putIterator(blockId, buffer.iterator, StorageLevel.MEMORY_ONLY) + val range = SequenceNumberRange( + shard.streamName, shard.shardId, firstSeqNumber, lastSeqNumber) + logDebug(s"Received block $blockId having range $range from shard $shard") + Some(blockId -> range) + } else { + None + } + } catch { + case NonFatal(e) => + logWarning(s"Error fetching data from shard $shard", e) + None + } + } + + /** + * Get the records starting from using a Kinesis shard iterator (which is a progress handle + * to get records from Kinesis), and get the next shard iterator for next consumption. 
+ */ + private def getRecordsAndNextKinesisIterator( + shardIterator: String): (Seq[Record], String) = { + val getRecordsRequest = new GetRecordsRequest().withShardIterator(shardIterator) + getRecordsRequest.setRequestCredentials(credentials) + val getRecordsResult = client.getRecords(getRecordsRequest) + // De-aggregate records, if KPL was used in producing the records. The KCL automatically + // handles de-aggregation during regular operation. This code path is used during recovery + val records = UserRecord.deaggregate(getRecordsResult.getRecords) + logTrace( + s"Got ${records.size()} records and next iterator ${getRecordsResult.getNextShardIterator}") + (records.asScala, getRecordsResult.getNextShardIterator) + } + + /** + * Get the Kinesis shard iterator for getting records starting from or after the given + * sequence number. + */ + private def getKinesisIterator( + shard: Shard, + iteratorType: ShardIteratorType, + sequenceNumber: String): String = { + val getShardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(shard.streamName) + .withShardId(shard.shardId) + .withShardIteratorType(iteratorType.toString) + .withStartingSequenceNumber(sequenceNumber) + getShardIteratorRequest.setRequestCredentials(credentials) + val getShardIteratorResult = client.getShardIterator(getShardIteratorRequest) + logTrace(s"Shard $shard: Got iterator ${getShardIteratorResult.getShardIterator}") + getShardIteratorResult.getShardIterator + } + + /** Helper method to retry Kinesis API request with exponential backoff and timeouts */ + private def retryOrTimeout[T](message: String, retryTimeoutMs: Long)(body: => T): T = { + import KinesisSequenceRangeIterator._ + val startTimeMs = System.currentTimeMillis() + var retryCount = 0 + var waitTimeMs = MIN_RETRY_WAIT_TIME_MS + var result: Option[T] = None + var lastError: Throwable = null + + def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs + def isMaxRetryDone = retryCount >= MAX_RETRIES + + while (result.isEmpty && !isTimedOut && !isMaxRetryDone) { + if (retryCount > 0) { + // wait only if this is a retry + Thread.sleep(waitTimeMs) + waitTimeMs *= 2 // if you have waited, then double wait time for next round + } + try { + result = Some(body) + } catch { + case NonFatal(t) => + lastError = t + t match { + case ptee: ProvisionedThroughputExceededException => + logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee) + case e: Throwable => + throw new SparkException(s"Error while $message", e) + } + } + retryCount += 1 + } + result.getOrElse { + if (isTimedOut) { + throw new SparkException( + s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError) + } else { + throw new SparkException( + s"Gave up after $retryCount retries while $message, last exception: ", lastError) + } + } + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 32af10d3112bc..64e93bfbcf444 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -21,22 +21,18 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.control.NonFatal import com.amazonaws.AbortedException import 
com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord -import com.amazonaws.services.kinesis.model._ import org.apache.spark._ -import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source} import org.apache.spark.sql.types.StructType -import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.storage.{BlockId, StreamBlockId} private[kinesis] case class Shard(streamName: String, shardId: String) @@ -181,162 +177,3 @@ private[kinesis] object KinesisSource { def nextBlockId: StreamBlockId = StreamBlockId(-1, nextId.getAndIncrement) } - -/** - * However, this class runs in the driver so could be a bottleneck. - */ -private[kinesis] class KinesisDataFetcher( - credentials: SerializableAWSCredentials, - endpointUrl: String, - regionName: String, - fromSeqNums: Seq[(Shard, Option[String])], - initialPositionInStream: InitialPositionInStream, - readTimeoutMs: Long = 2000L - ) extends Serializable with Logging { - - @transient private lazy val client = new AmazonKinesisClient(credentials) - - def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { - sc.makeRDD(fromSeqNums, fromSeqNums.size).map { - case (shard, fromSeqNum) => fetchPartition(shard, fromSeqNum) - }.collect().flatten - } - - private def fetchPartition( - shard: Shard, - fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { - client.setEndpoint(endpointUrl, "kinesis", regionName) - - val endTime = System.currentTimeMillis + readTimeoutMs - def timeLeft = math.max(endTime - System.currentTimeMillis, 0) - - val buffer = new ArrayBuffer[Array[Byte]] - var firstSeqNumber: String = null - var lastSeqNumber: String = fromSeqNum.orNull - var lastIterator: String = null - try { - logDebug(s"Trying to fetch data from $shard, from seq num $lastSeqNumber") - - while (timeLeft > 0) { - val (records, nextIterator) = retryOrTimeout("getting shard iterator", timeLeft) { - if (lastIterator == null) { - lastIterator = if (lastSeqNumber != null) { - getKinesisIterator(shard, ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) - } else { - if (initialPositionInStream == InitialPositionInStream.LATEST) { - getKinesisIterator(shard, ShardIteratorType.LATEST, lastSeqNumber) - } else { - getKinesisIterator(shard, ShardIteratorType.TRIM_HORIZON, lastSeqNumber) - } - } - } - getRecordsAndNextKinesisIterator(lastIterator) - } - - records.foreach { record => - buffer += JavaUtils.bufferToArray(record.getData()) - if (firstSeqNumber == null) { - firstSeqNumber = record.getSequenceNumber - } - lastSeqNumber = record.getSequenceNumber - } - - lastIterator = nextIterator - } - - if (buffer.nonEmpty) { - val blockId = KinesisSource.nextBlockId - SparkEnv.get.blockManager.putIterator(blockId, buffer.iterator, StorageLevel.MEMORY_ONLY) - val range = SequenceNumberRange( - shard.streamName, shard.shardId, firstSeqNumber, lastSeqNumber) - logDebug(s"Received block $blockId having range $range from shard $shard") - Some(blockId -> range) - } else { - None - } - } catch { - case NonFatal(e) => - logWarning(s"Error fetching data from shard $shard", e) - None - } - } - - /** - * Get the records starting from using a Kinesis shard 
iterator (which is a progress handle - * to get records from Kinesis), and get the next shard iterator for next consumption. - */ - private def getRecordsAndNextKinesisIterator( - shardIterator: String): (Seq[Record], String) = { - val getRecordsRequest = new GetRecordsRequest().withShardIterator(shardIterator) - getRecordsRequest.setRequestCredentials(credentials) - val getRecordsResult = client.getRecords(getRecordsRequest) - // De-aggregate records, if KPL was used in producing the records. The KCL automatically - // handles de-aggregation during regular operation. This code path is used during recovery - val records = UserRecord.deaggregate(getRecordsResult.getRecords) - logTrace( - s"Got ${records.size()} records and next iterator ${getRecordsResult.getNextShardIterator}") - (records.asScala, getRecordsResult.getNextShardIterator) - } - - /** - * Get the Kinesis shard iterator for getting records starting from or after the given - * sequence number. - */ - private def getKinesisIterator( - shard: Shard, - iteratorType: ShardIteratorType, - sequenceNumber: String): String = { - val getShardIteratorRequest = new GetShardIteratorRequest() - .withStreamName(shard.streamName) - .withShardId(shard.shardId) - .withShardIteratorType(iteratorType.toString) - .withStartingSequenceNumber(sequenceNumber) - getShardIteratorRequest.setRequestCredentials(credentials) - val getShardIteratorResult = client.getShardIterator(getShardIteratorRequest) - logTrace(s"Shard $shard: Got iterator ${getShardIteratorResult.getShardIterator}") - getShardIteratorResult.getShardIterator - } - - /** Helper method to retry Kinesis API request with exponential backoff and timeouts */ - private def retryOrTimeout[T](message: String, retryTimeoutMs: Long)(body: => T): T = { - import KinesisSequenceRangeIterator._ - val startTimeMs = System.currentTimeMillis() - var retryCount = 0 - var waitTimeMs = MIN_RETRY_WAIT_TIME_MS - var result: Option[T] = None - var lastError: Throwable = null - - def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs - def isMaxRetryDone = retryCount >= MAX_RETRIES - - while (result.isEmpty && !isTimedOut && !isMaxRetryDone) { - if (retryCount > 0) { - // wait only if this is a retry - Thread.sleep(waitTimeMs) - waitTimeMs *= 2 // if you have waited, then double wait time for next round - } - try { - result = Some(body) - } catch { - case NonFatal(t) => - lastError = t - t match { - case ptee: ProvisionedThroughputExceededException => - logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee) - case e: Throwable => - throw new SparkException(s"Error while $message", e) - } - } - retryCount += 1 - } - result.getOrElse { - if (isTimedOut) { - throw new SparkException( - s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError) - } else { - throw new SparkException( - s"Gave up after $retryCount retries while $message, last exception: ", lastError) - } - } - } -} From 6a01b28215106a6f63976e9331ea14ea5d41435e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 15:42:36 -0800 Subject: [PATCH 07/16] Add stress test and fix a bug in compareTo --- .../streaming/kinesis/KinesisSource.scala | 2 +- .../streaming/kinesis/KinesisTestUtils.scala | 7 +++ .../kinesis/KinesisSourceSuite.scala | 45 +++++++++++++++---- .../org/apache/spark/sql/StreamTest.scala | 10 ++--- 4 files changed, 50 insertions(+), 14 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 64e93bfbcf444..9108a3043b729 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -46,7 +46,7 @@ private[kinesis] case class KinesisSourceOffset(seqNums: Map[Shard, String]) (this.seqNums.get(shard).map(BigInt(_)), that.seqNums.get(shard).map(BigInt(_))) match { case (Some(thisNum), Some(thatNum)) => thisNum.compare(thatNum) case (None, _) => -1 // new shard started by resharding - case (_, None) => -1 // old shard got eliminated by resharding + case (_, None) => 1 // old shard got eliminated by resharding } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 300a2416229cc..95e3e2f3e8f6d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -54,6 +54,8 @@ private[kinesis] class KinesisTestUtils extends Logging { @volatile private var _streamName: String = _ + private val shadeIdToLatestSeqNum = mutable.HashMap[String, String]() + protected lazy val kinesisClient = { val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials()) client.setEndpoint(endpointUrl) @@ -105,9 +107,14 @@ private[kinesis] class KinesisTestUtils extends Logging { val producer = getProducer(aggregate) val shardIdToSeqNumbers = producer.sendData(streamName, testData) logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") + shardIdToSeqNumbers.foreach { case (shardId, seq) => + shadeIdToLatestSeqNum(shardId) = seq.last._2 + } shardIdToSeqNumbers.toMap } + def getLatestSeqNumsOfShards(): Map[String, String] = shadeIdToLatestSeqNum.toMap + /** * Expose a Python friendly API. 
*/ diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index 71056ea51e892..fa02e0d3a47a2 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kinesis import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkEnv import org.apache.spark.sql.{AnalysisException, StreamTest} @@ -25,9 +26,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelati import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.storage.StreamBlockId -class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFunSuite { - - import testImplicits._ +class KinesisSourceTest extends StreamTest with SharedSQLContext { case class AddKinesisData( testUtils: KPLBasedKinesisTestUtils, @@ -35,17 +34,20 @@ class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFu data: Seq[Int]) extends AddData { override def addData(): Offset = { - val shardIdToSeqNums = testUtils.pushData(data, false).map { case (shardId, info) => - (Shard(testUtils.streamName, shardId), info.last._2) + testUtils.pushData(data, false) + val shardIdToSeqNums = testUtils.getLatestSeqNumsOfShards().map { case (shardId, seqNum) => + (Shard(testUtils.streamName, shardId), seqNum) } - assert(shardIdToSeqNums.size === testUtils.streamShardCount, - s"Data must be send to all ${testUtils.streamShardCount} " + - s"shards of stream ${testUtils.streamName}") KinesisSourceOffset(shardIdToSeqNums) } override def source: Source = kinesisSource } +} + +class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { + + import testImplicits._ testIfEnabled("basic receiving") { var streamBlocksInLastBatch: Seq[StreamBlockId] = Seq.empty @@ -143,3 +145,30 @@ class KinesisSourceSuite extends StreamTest with SharedSQLContext with KinesisFu "neither a RelationProvider nor a FSBasedRelationProvider.;") } } + +class KinesisSourceStressTestSuite extends KinesisSourceTest with KinesisFunSuite { + + import testImplicits._ + + override val streamingTimeout = 60.seconds + + test("kinesis source stress test") { + val testUtils = new KPLBasedKinesisTestUtils + testUtils.createStream() + try { + val kinesisSource = new KinesisSource( + sqlContext, + testUtils.regionName, + testUtils.endpointUrl, + Set(testUtils.streamName), + InitialPositionInStream.TRIM_HORIZON) + + val ds = kinesisSource.toDS[String]().map(_.toInt + 1) + runStressTest(ds, data => { + AddKinesisData(testUtils, kinesisSource, data) + }) + } finally { + testUtils.deleteStream() + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index bb5135826e2f3..8a83eeff3f2ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -71,7 +71,7 @@ trait StreamTest extends QueryTest with Timeouts { } /** How long to wait for an active stream to catch up when checking a result. 
*/ - val streamingTimeout = 10.seconds + def streamingTimeout: Span = 10.seconds /** A trait for actions that can be performed while testing a streaming DataFrame. */ trait StreamAction @@ -380,10 +380,10 @@ trait StreamTest extends QueryTest with Timeouts { pos += 1 } } catch { - case _: InterruptedException if streamDeathCause != null => - failTest("Stream Thread Died") - case _: org.scalatest.exceptions.TestFailedDueToTimeoutException => - failTest("Timed out waiting for stream") + case e: InterruptedException if streamDeathCause != null => + failTest("Stream Thread Died", e) + case e: org.scalatest.exceptions.TestFailedDueToTimeoutException => + failTest("Timed out waiting for stream", e) } finally { if (currentStream != null && currentStream.microBatchThread.isAlive) { currentStream.stop() From 435201e39495e08037d48759ec3268bad63f4c93 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 22 Feb 2016 22:09:57 -0800 Subject: [PATCH 08/16] Better option error messages and unit tests; address other comments --- .../streaming/kinesis/DefaultSource.scala | 74 +++++--- .../kinesis/KinesisDataFetcher.scala | 29 ++-- .../streaming/kinesis/KinesisSource.scala | 32 ++-- .../streaming/kinesis/KinesisTestUtils.scala | 6 +- .../kinesis/KinesisSourceSuite.scala | 158 ++++++++++++------ .../org/apache/spark/sql/StreamTest.scala | 2 +- 6 files changed, 187 insertions(+), 114 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala index 3f8c74d4a05c1..58cdc79ee7b95 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala @@ -17,7 +17,11 @@ package org.apache.spark.streaming.kinesis +import com.amazonaws.ClientConfiguration +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.util.{AwsHostNameUtils, HttpUtils} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap @@ -35,15 +39,32 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { providerName: String, parameters: Map[String, String]): Source = { val caseInsensitiveOptions = new CaseInsensitiveMap(parameters) - val regionName = caseInsensitiveOptions.getOrElse("regionName", { - throw new IllegalArgumentException("Option 'regionName' is not specified") - }) - val endpointUrl = caseInsensitiveOptions.getOrElse("endpointUrl", { - throw new IllegalArgumentException("Option 'endpointUrl' is not specified") - }) - val streamNames = caseInsensitiveOptions.getOrElse("streamNames", { - throw new IllegalArgumentException("Option 'streamNames' is not specified") - }).split(',').toSet + + val streams = caseInsensitiveOptions.getOrElse("streams", { + throw new IllegalArgumentException("Option 'streams' is not specified") + }).split(",", -1).toSet + if (streams.isEmpty || streams.exists(_.isEmpty)) { + throw new IllegalArgumentException( + "Option 'streams' is invalid. 
Please use comma separated string (e.g., 'stream1,stream2')") + } + + val regionOption = caseInsensitiveOptions.get("region") + val endpointOption = caseInsensitiveOptions.get("endpoint") + val (region, endpoint) = (regionOption, endpointOption) match { + case (Some(_region), Some(_endpoint)) => + if (RegionUtils.getRegionByEndpoint(_endpoint).getName != _region) { + throw new IllegalArgumentException( + s"'region'(${_region}) doesn't match to 'endpoint'(${_endpoint})") + } + (_region, _endpoint) + case (Some(_region), None) => + (_region, RegionUtils.getRegion(_region).getServiceEndpoint("kinesis")) + case (None, Some(_endpoint)) => + (RegionUtils.getRegionByEndpoint(_endpoint).getName, _endpoint) + case (None, None) => + throw new IllegalArgumentException("Either 'region' or 'endpoint' should be specified") + } + val initialPosInStream = caseInsensitiveOptions.getOrElse("initialPosInStream", "LATEST") match { case "LATEST" => InitialPositionInStream.LATEST @@ -52,28 +73,27 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { throw new IllegalArgumentException(s"Unknown value of option initialPosInStream: $pos") } - if ((caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID").nonEmpty - && caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY").isEmpty)) { - throw new IllegalArgumentException( - s"AWS_ACCESS_KEY_ID is set but AWS_SECRET_ACCESS_KEY is not found") - } - if ((caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID").isEmpty - && caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY").nonEmpty)) { - throw new IllegalArgumentException( - s"AWS_SECRET_ACCESS_KEY is set but AWS_ACCESS_KEY_ID is not found") + val accessKeyOption = caseInsensitiveOptions.get("accessKey") + val secretKeyOption = caseInsensitiveOptions.get("secretKey") + val credentials = (accessKeyOption, secretKeyOption) match { + case (Some(accessKey), Some(secretKey)) => + new SerializableAWSCredentials(accessKey, secretKey) + case (Some(accessKey), None) => + throw new IllegalArgumentException( + s"'accessKey' is set but 'secretKey' is not found") + case (None, Some(secretKey)) => + throw new IllegalArgumentException( + s"'secretKey' is set but 'accessKey' is not found") + case (None, None) => + SerializableAWSCredentials(new DefaultAWSCredentialsProviderChain().getCredentials()) } - val awsCredentialsOption = - for (accessKeyId <- caseInsensitiveOptions.get("AWS_ACCESS_KEY_ID"); - secretKey <- caseInsensitiveOptions.get("AWS_SECRET_ACCESS_KEY")) - yield new SerializableAWSCredentials(accessKeyId, secretKey) new KinesisSource( sqlContext, - regionName, - endpointUrl, - streamNames, + region, + endpoint, + streams, initialPosInStream, - awsCredentialsOption - ) + credentials) } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala index 6318533843d6d..71e07b37ce77f 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala @@ -33,14 +33,16 @@ import org.apache.spark.storage.{BlockId, StorageLevel} * However, this class runs in the driver so could be a bottleneck. 
*/ private[kinesis] class KinesisDataFetcher( - credentials: SerializableAWSCredentials, - endpointUrl: String, - regionName: String, - fromSeqNums: Seq[(Shard, Option[String])], - initialPositionInStream: InitialPositionInStream, - readTimeoutMs: Long = 2000L + credentials: SerializableAWSCredentials, + endpointUrl: String, + fromSeqNums: Seq[(Shard, Option[String])], + initialPositionInStream: InitialPositionInStream, + readTimeoutMs: Long = 2000L ) extends Serializable with Logging { + /** + * Use lazy because the client needs to be created in executors + */ @transient private lazy val client = new AmazonKinesisClient(credentials) def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { @@ -50,9 +52,9 @@ private[kinesis] class KinesisDataFetcher( } private def fetchPartition( - shard: Shard, - fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { - client.setEndpoint(endpointUrl, "kinesis", regionName) + shard: Shard, + fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { + client.setEndpoint(endpointUrl) val endTime = System.currentTimeMillis + readTimeoutMs def timeLeft = math.max(endTime - System.currentTimeMillis, 0) @@ -112,8 +114,7 @@ private[kinesis] class KinesisDataFetcher( * Get the records starting from using a Kinesis shard iterator (which is a progress handle * to get records from Kinesis), and get the next shard iterator for next consumption. */ - private def getRecordsAndNextKinesisIterator( - shardIterator: String): (Seq[Record], String) = { + private def getRecordsAndNextKinesisIterator(shardIterator: String): (Seq[Record], String) = { val getRecordsRequest = new GetRecordsRequest().withShardIterator(shardIterator) getRecordsRequest.setRequestCredentials(credentials) val getRecordsResult = client.getRecords(getRecordsRequest) @@ -130,9 +131,9 @@ private[kinesis] class KinesisDataFetcher( * sequence number. */ private def getKinesisIterator( - shard: Shard, - iteratorType: ShardIteratorType, - sequenceNumber: String): String = { + shard: Shard, + iteratorType: ShardIteratorType, + sequenceNumber: String): String = { val getShardIteratorRequest = new GetShardIteratorRequest() .withStreamName(shard.streamName) .withShardId(shard.shardId) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 9108a3043b729..8adf3f86974c8 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -69,8 +69,8 @@ private[kinesis] class KinesisSource( regionName: String, endpointUrl: String, streamNames: Set[String], - initialPosInStream: InitialPositionInStream = InitialPositionInStream.LATEST, - awsCredentialsOption: Option[SerializableAWSCredentials] = None) extends Source { + initialPosInStream: InitialPositionInStream, + awsCredentials: SerializableAWSCredentials) extends Source { // How long we should wait before calling `fetchShards()`. Because DescribeStream has a limit of // 10 transactions per second per account, we should not request too frequently. 
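The comment above describes the throttling approach the source relies on: because DescribeStream allows only 10 transactions per second per account, a fetch round is simply skipped when it comes too soon after the previous one. As a rough, self-contained sketch of that gate (the class and field names here are illustrative, not the patch's actual members, which keep a timestamp and return None from getNextBatch when called again too soon):

    // Illustrative time gate: allow at most one fetch per `intervalMs` milliseconds.
    class DescribeStreamThrottle(intervalMs: Long) {
      private var lastAllowedTimeMs = 0L

      /** Returns true if enough time has passed since the last permitted call. */
      def tryAcquire(nowMs: Long = System.currentTimeMillis()): Boolean = {
        if (nowMs - lastAllowedTimeMs < intervalMs) {
          false // too soon; skip this round instead of hitting DescribeStream again
        } else {
          lastAllowedTimeMs = nowMs
          true
        }
      }
    }
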
@@ -81,20 +81,14 @@ private[kinesis] class KinesisSource( implicit private val encoder = ExpressionEncoder[Array[Byte]] - private val credentials = SerializableAWSCredentials( - awsCredentialsOption.getOrElse(new DefaultAWSCredentialsProviderChain().getCredentials()) - ) - - private val client = new AmazonKinesisClient(credentials) - client.setEndpoint(endpointUrl, "kinesis", regionName) + private val client = new AmazonKinesisClient(awsCredentials) + client.setEndpoint(endpointUrl) private var cachedBlocks = new mutable.HashSet[BlockId] override val schema: StructType = encoder.schema override def getNextBatch(start: Option[Offset]): Option[Batch] = { - val startOffset = start.map(_.asInstanceOf[KinesisSourceOffset]) - val now = System.currentTimeMillis() if (now - lastFetchShardsTimeMS < FETCH_SHARDS_INTERVAL_MS) { // Because DescribeStream has a limit of 10 transactions per second per account, we should not @@ -102,18 +96,17 @@ private[kinesis] class KinesisSource( return None } lastFetchShardsTimeMS = now + + val startOffset = start.map(_.asInstanceOf[KinesisSourceOffset]) val shards = fetchShards() // Get the starting seq number of each shard if available val fromSeqNums = shards.map { shard => shard -> startOffset.flatMap(_.seqNums.get(shard)) } /** Prefetch Kinesis data from the starting seq nums */ - val prefetchedData = new KinesisDataFetcher( - credentials, - endpointUrl, - regionName, - fromSeqNums, - initialPosInStream).fetch(sqlContext.sparkContext) + val prefetchedData = + new KinesisDataFetcher(awsCredentials, endpointUrl, fromSeqNums, initialPosInStream) + .fetch(sqlContext.sparkContext) if (prefetchedData.nonEmpty) { val prefetechedRanges = prefetchedData.map(_._2) @@ -169,6 +162,13 @@ private[kinesis] class KinesisSource( throw e1 } } + + override def toString: String = s"KinesisSource[" + + s"regionName=$regionName, " + + s"endpointUrl=$endpointUrl, " + + s"streamNames=${streamNames.mkString(",")}, " + + s"initialPosInStream=$initialPosInStream" + + s"]" } private[kinesis] object KinesisSource { diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 95e3e2f3e8f6d..0c5ab3293335c 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -54,7 +54,7 @@ private[kinesis] class KinesisTestUtils extends Logging { @volatile private var _streamName: String = _ - private val shadeIdToLatestSeqNum = mutable.HashMap[String, String]() + private val shardIdToLatestSeqNum = mutable.HashMap[String, String]() protected lazy val kinesisClient = { val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials()) @@ -108,12 +108,12 @@ private[kinesis] class KinesisTestUtils extends Logging { val shardIdToSeqNumbers = producer.sendData(streamName, testData) logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") shardIdToSeqNumbers.foreach { case (shardId, seq) => - shadeIdToLatestSeqNum(shardId) = seq.last._2 + shardIdToLatestSeqNum(shardId) = seq.last._2 } shardIdToSeqNumbers.toMap } - def getLatestSeqNumsOfShards(): Map[String, String] = shadeIdToLatestSeqNum.toMap + def getLatestSeqNumsOfShards(): Map[String, String] = shardIdToLatestSeqNum.toMap /** * Expose a Python friendly API. 
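Before the test changes below, a minimal usage sketch shows how the options validated in DefaultSource above fit together from the reader side, mirroring the calls the suite makes. The stream names, endpoint, and keys are placeholders, and it assumes a SQLContext named `sqlContext` plus the implicit that adds `kinesis()` to DataFrameReader are already in scope (that wiring is not shown in this patch series):

    // Hypothetical reader configuration using the option names introduced above:
    // 'streams' (comma separated, required), 'region' or 'endpoint' (at least one),
    // 'initialPosInStream' (LATEST or TRIM_HORIZON), and optional 'accessKey'/'secretKey'
    // (when both are omitted, the DefaultAWSCredentialsProviderChain is used instead).
    val df = sqlContext.read
      .option("streams", "stream1,stream2")
      .option("endpoint", "https://kinesis.us-west-2.amazonaws.com")
      .option("initialPosInStream", "TRIM_HORIZON")
      .option("accessKey", "your-aws-access-key")
      .option("secretKey", "your-aws-secret-key")
      .kinesis()
      .stream()
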
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index fa02e0d3a47a2..d40f8e897f965 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming.kinesis import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkEnv import org.apache.spark.sql.{AnalysisException, StreamTest} @@ -43,13 +42,23 @@ class KinesisSourceTest extends StreamTest with SharedSQLContext { override def source: Source = kinesisSource } + + def createKinesisSourceForTest(testUtils: KPLBasedKinesisTestUtils): KinesisSource = { + new KinesisSource( + sqlContext, + testUtils.regionName, + testUtils.endpointUrl, + Set(testUtils.streamName), + InitialPositionInStream.TRIM_HORIZON, + SerializableAWSCredentials(KinesisTestUtils.getAWSCredentials())) + } } class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { import testImplicits._ - testIfEnabled("basic receiving") { + testIfEnabled("basic receiving and failover") { var streamBlocksInLastBatch: Seq[StreamBlockId] = Seq.empty def assertStreamBlocks: Boolean = { @@ -64,12 +73,7 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { val testUtils = new KPLBasedKinesisTestUtils testUtils.createStream() try { - val kinesisSource = new KinesisSource( - sqlContext, - testUtils.regionName, - testUtils.endpointUrl, - Set(testUtils.streamName), - InitialPositionInStream.TRIM_HORIZON) + val kinesisSource = createKinesisSourceForTest(testUtils) val mapped = kinesisSource.toDS[Array[Byte]]().map((bytes: Array[Byte]) => new String(bytes).toInt + 1) val testData = 1 to 10 @@ -77,7 +81,9 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { AddKinesisData(testUtils, kinesisSource, testData), CheckAnswer((1 to 10).map(_ + 1): _*), Assert(assertStreamBlocks, "Old stream blocks should be cleaned"), + StopStream, AddKinesisData(testUtils, kinesisSource, 11 to 20), + StartStream, CheckAnswer((1 to 20).map(_ + 1): _*), Assert(assertStreamBlocks, "Old stream blocks should be cleaned"), AddKinesisData(testUtils, kinesisSource, 21 to 30), @@ -89,60 +95,114 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { } } - testIfEnabled("failover") { - val testUtils = new KPLBasedKinesisTestUtils - testUtils.createStream() - try { - val kinesisSource = new KinesisSource( - sqlContext, - testUtils.regionName, - testUtils.endpointUrl, - Set(testUtils.streamName), - InitialPositionInStream.TRIM_HORIZON) - val mapped = - kinesisSource.toDS[Array[Byte]]().map((bytes: Array[Byte]) => new String(bytes).toInt + 1) - testStream(mapped)( - AddKinesisData(testUtils, kinesisSource, 1 to 10), - CheckAnswer((1 to 10).map(_ + 1): _*), - StopStream, - AddKinesisData(testUtils, kinesisSource, 11 to 20), - StartStream, - CheckAnswer((1 to 20).map(_ + 1): _*), - AddKinesisData(testUtils, kinesisSource, 21 to 30), - CheckAnswer((1 to 30).map(_ + 1): _*) - ) - } finally { - testUtils.deleteStream() - } - } - testIfEnabled("DataFrameReader") { val testUtils = new KPLBasedKinesisTestUtils testUtils.createStream() try { val df = sqlContext.read - .option("regionName", testUtils.regionName) - 
.option("endpointUrl", testUtils.endpointUrl) - .option("streamNames", testUtils.streamName) - .option("initialPosInStream", "TRIM_HORIZON") + .option("region", testUtils.regionName) + .option("endpoint", testUtils.endpointUrl) + .option("streams", testUtils.streamName) + .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) .kinesis().stream() - val sources = df.queryExecution.analyzed - .collect { + val sources = df.queryExecution.analyzed.collect { case StreamingRelation(s: KinesisSource, _) => s } assert(sources.size === 1) + + // streams + assertExceptionAndMessage[IllegalArgumentException]("Option 'streams' is not specified") { + sqlContext.read.kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')" + ) { + sqlContext.read.option("streams", "").kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')" + ) { + sqlContext.read.option("streams", "a,").kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')" + ) { + sqlContext.read.option("streams", ",a").kinesis().stream() + } + + // region and endpoint + // Setting either endpoint or region is fine + sqlContext.read + .option("streams", testUtils.streamName) + .option("endpoint", testUtils.endpointUrl) + .kinesis().stream() + sqlContext.read + .option("streams", testUtils.streamName) + .option("region", testUtils.regionName) + .kinesis().stream() + + assertExceptionAndMessage[IllegalArgumentException]( + "Either 'region' or 'endpoint' should be specified") { + sqlContext.read.option("streams", testUtils.streamName).kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + s"'region'(invalid-region) doesn't match to 'endpoint'(${testUtils.endpointUrl})") { + sqlContext.read + .option("streams", testUtils.streamName) + .option("region", "invalid-region") + .option("endpoint", testUtils.endpointUrl) + .kinesis().stream() + } + + // initialPosInStream + assertExceptionAndMessage[IllegalArgumentException]( + "Unknown value of option initialPosInStream: invalid") { + sqlContext.read + .option("streams", testUtils.streamName) + .option("endpoint", testUtils.endpointUrl) + .option("initialPosInStream", "invalid") + .kinesis().stream() + } + + // accessKey and secretKey + assertExceptionAndMessage[IllegalArgumentException]( + "'accessKey' is set but 'secretKey' is not found") { + sqlContext.read + .option("streams", testUtils.streamName) + .option("endpoint", testUtils.endpointUrl) + .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) + .option("accessKey", "test") + .kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "'secretKey' is set but 'accessKey' is not found") { + sqlContext.read + .option("streams", testUtils.streamName) + .option("endpoint", testUtils.endpointUrl) + .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) + .option("secretKey", "test") + .kinesis().stream() + } } finally { testUtils.deleteStream() } } testIfEnabled("call kinesis when not using stream") { - val e = intercept[AnalysisException] { + val expectedMessage = "org.apache.spark.streaming.kinesis.DefaultSource is " + + "neither a RelationProvider nor a FSBasedRelationProvider.;" + 
assertExceptionAndMessage[AnalysisException](expectedMessage) { sqlContext.read.kinesis().load() } - assert(e.getMessage === "org.apache.spark.streaming.kinesis.DefaultSource is " + - "neither a RelationProvider nor a FSBasedRelationProvider.;") + } + + private def assertExceptionAndMessage[T <: Exception : Manifest]( + expectedMessage: String)(body: => Unit): Unit = { + val e = intercept[T] { + body + } + assert(e.getMessage === expectedMessage) } } @@ -150,19 +210,11 @@ class KinesisSourceStressTestSuite extends KinesisSourceTest with KinesisFunSuit import testImplicits._ - override val streamingTimeout = 60.seconds - test("kinesis source stress test") { val testUtils = new KPLBasedKinesisTestUtils testUtils.createStream() try { - val kinesisSource = new KinesisSource( - sqlContext, - testUtils.regionName, - testUtils.endpointUrl, - Set(testUtils.streamName), - InitialPositionInStream.TRIM_HORIZON) - + val kinesisSource = createKinesisSourceForTest(testUtils) val ds = kinesisSource.toDS[String]().map(_.toInt + 1) runStressTest(ds, data => { AddKinesisData(testUtils, kinesisSource, data) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 8a83eeff3f2ae..fbf6f6ba49600 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -71,7 +71,7 @@ trait StreamTest extends QueryTest with Timeouts { } /** How long to wait for an active stream to catch up when checking a result. */ - def streamingTimeout: Span = 10.seconds + val streamingTimeout = 10.seconds /** A trait for actions that can be performed while testing a streaming DataFrame. */ trait StreamAction From 3629cd95ce272eca7167fe946a7c8cb4a98a6675 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 23 Feb 2016 15:52:30 -0800 Subject: [PATCH 09/16] Address more --- .../streaming/kinesis/DefaultSource.scala | 38 +++++++++---- .../kinesis/KinesisDataFetcher.scala | 14 +++++ .../streaming/kinesis/KinesisSource.scala | 8 +-- .../kinesis/KinesisSourceSuite.scala | 55 ++++++++++--------- 4 files changed, 70 insertions(+), 45 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala index 58cdc79ee7b95..8be8dd7df5dc4 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/DefaultSource.scala @@ -17,11 +17,10 @@ package org.apache.spark.streaming.kinesis -import com.amazonaws.ClientConfiguration +import com.amazonaws.AmazonClientException import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.util.{AwsHostNameUtils, HttpUtils} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap @@ -40,12 +39,15 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { parameters: Map[String, String]): Source = { val caseInsensitiveOptions = new CaseInsensitiveMap(parameters) - val streams = caseInsensitiveOptions.getOrElse("streams", { - throw new IllegalArgumentException("Option 'streams' is not specified") + val streams = caseInsensitiveOptions.getOrElse("stream", { + throw new 
IllegalArgumentException( + "Option 'stream' must be specified. Examples: " + + """option("stream", "stream1"), option("stream", "stream1,stream2")""") }).split(",", -1).toSet + if (streams.isEmpty || streams.exists(_.isEmpty)) { throw new IllegalArgumentException( - "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')") + "Option 'stream' is invalid, as stream names cannot be empty.") } val regionOption = caseInsensitiveOptions.get("region") @@ -62,15 +64,20 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { case (None, Some(_endpoint)) => (RegionUtils.getRegionByEndpoint(_endpoint).getName, _endpoint) case (None, None) => - throw new IllegalArgumentException("Either 'region' or 'endpoint' should be specified") + throw new IllegalArgumentException( + "Either option 'region' or option 'endpoint' must be specified. Examples: " + + """option("region", "us-west-2"), """ + + """option("endpoint", "https://kinesis.us-west-2.amazonaws.com")""") } val initialPosInStream = - caseInsensitiveOptions.getOrElse("initialPosInStream", "LATEST") match { - case "LATEST" => InitialPositionInStream.LATEST - case "TRIM_HORIZON" => InitialPositionInStream.TRIM_HORIZON + caseInsensitiveOptions.getOrElse("position", InitialPositionInStream.LATEST.name) match { + case pos if pos.toUpperCase == InitialPositionInStream.LATEST.name => + InitialPositionInStream.LATEST + case pos if pos.toUpperCase == InitialPositionInStream.TRIM_HORIZON.name => + InitialPositionInStream.TRIM_HORIZON case pos => - throw new IllegalArgumentException(s"Unknown value of option initialPosInStream: $pos") + throw new IllegalArgumentException(s"Unknown value of option 'position': $pos") } val accessKeyOption = caseInsensitiveOptions.get("accessKey") @@ -85,7 +92,16 @@ class DefaultSource extends StreamSourceProvider with DataSourceRegister { throw new IllegalArgumentException( s"'secretKey' is set but 'accessKey' is not found") case (None, None) => - SerializableAWSCredentials(new DefaultAWSCredentialsProviderChain().getCredentials()) + try { + SerializableAWSCredentials(new DefaultAWSCredentialsProviderChain().getCredentials()) + } catch { + case _: AmazonClientException => + throw new IllegalArgumentException( + "No credential found using default AWS provider chain. Specify credentials using " + + "options 'accessKey' and 'secretKey'. Examples: " + + """option("accessKey", "your-aws-access-key"), """ + + """option("secretKey", "your-aws-secret-key")""") + } } new KinesisSource( diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala index 71e07b37ce77f..5fc9c9765da0a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala @@ -45,12 +45,26 @@ private[kinesis] class KinesisDataFetcher( */ @transient private lazy val client = new AmazonKinesisClient(credentials) + /** + * Launch a Spark job to fetch latest data from the specified `shard`s. This method will try to + * fetch arriving data in `readTimeoutMs` milliseconds so as to get the latest sequence numbers. + * New data will be pushed to the block manager to avoid fetching them again. + * + * This is a workaround since Kinesis doesn't provider an API to fetch the latest sequence number. 
+ */ def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { sc.makeRDD(fromSeqNums, fromSeqNums.size).map { case (shard, fromSeqNum) => fetchPartition(shard, fromSeqNum) }.collect().flatten } + /** + * Fetch latest data from the specified `shard` since `fromSeqNum`. This method will try to fetch + * arriving data in `readTimeoutMs` milliseconds so as to get the latest sequence number. New data + * will be pushed to the block manager to avoid fetching them again. + * + * This is a workaround since Kinesis doesn't provider an API to fetch the latest sequence number. + */ private def fetchPartition( shard: Shard, fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 8adf3f86974c8..4e8199d5d3c09 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -23,7 +23,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.amazonaws.AbortedException -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream @@ -163,12 +162,7 @@ private[kinesis] class KinesisSource( } } - override def toString: String = s"KinesisSource[" + - s"regionName=$regionName, " + - s"endpointUrl=$endpointUrl, " + - s"streamNames=${streamNames.mkString(",")}, " + - s"initialPosInStream=$initialPosInStream" + - s"]" + override def toString: String = s"KinesisSource[streamNames=${streamNames.mkString(",")}]" } private[kinesis] object KinesisSource { diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index d40f8e897f965..08c527fd3b772 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -102,8 +102,8 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { val df = sqlContext.read .option("region", testUtils.regionName) .option("endpoint", testUtils.endpointUrl) - .option("streams", testUtils.streamName) - .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) + .option("stream", testUtils.streamName) + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) .kinesis().stream() val sources = df.queryExecution.analyzed.collect { @@ -111,57 +111,58 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { } assert(sources.size === 1) - // streams - assertExceptionAndMessage[IllegalArgumentException]("Option 'streams' is not specified") { + // stream + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'stream' must be specified. Examples: " + + """option("stream", "stream1"), option("stream", "stream1,stream2")""") { sqlContext.read.kinesis().stream() } assertExceptionAndMessage[IllegalArgumentException]( - "Option 'streams' is invalid. 
Please use comma separated string (e.g., 'stream1,stream2')" - ) { - sqlContext.read.option("streams", "").kinesis().stream() + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", "").kinesis().stream() } assertExceptionAndMessage[IllegalArgumentException]( - "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')" - ) { - sqlContext.read.option("streams", "a,").kinesis().stream() + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", "a,").kinesis().stream() } assertExceptionAndMessage[IllegalArgumentException]( - "Option 'streams' is invalid. Please use comma separated string (e.g., 'stream1,stream2')" - ) { - sqlContext.read.option("streams", ",a").kinesis().stream() + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", ",a").kinesis().stream() } // region and endpoint // Setting either endpoint or region is fine sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("endpoint", testUtils.endpointUrl) .kinesis().stream() sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("region", testUtils.regionName) .kinesis().stream() assertExceptionAndMessage[IllegalArgumentException]( - "Either 'region' or 'endpoint' should be specified") { - sqlContext.read.option("streams", testUtils.streamName).kinesis().stream() + "Either option 'region' or option 'endpoint' must be specified. Examples: " + + """option("region", "us-west-2"), """ + + """option("endpoint", "https://kinesis.us-west-2.amazonaws.com")""") { + sqlContext.read.option("stream", testUtils.streamName).kinesis().stream() } assertExceptionAndMessage[IllegalArgumentException]( s"'region'(invalid-region) doesn't match to 'endpoint'(${testUtils.endpointUrl})") { sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("region", "invalid-region") .option("endpoint", testUtils.endpointUrl) .kinesis().stream() } - // initialPosInStream + // position assertExceptionAndMessage[IllegalArgumentException]( - "Unknown value of option initialPosInStream: invalid") { + "Unknown value of option 'position': invalid") { sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("endpoint", testUtils.endpointUrl) - .option("initialPosInStream", "invalid") + .option("position", "invalid") .kinesis().stream() } @@ -169,18 +170,18 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { assertExceptionAndMessage[IllegalArgumentException]( "'accessKey' is set but 'secretKey' is not found") { sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("endpoint", testUtils.endpointUrl) - .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) .option("accessKey", "test") .kinesis().stream() } assertExceptionAndMessage[IllegalArgumentException]( "'secretKey' is set but 'accessKey' is not found") { sqlContext.read - .option("streams", testUtils.streamName) + .option("stream", testUtils.streamName) .option("endpoint", testUtils.endpointUrl) - .option("initialPosInStream", InitialPositionInStream.TRIM_HORIZON.name()) + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) .option("secretKey", "test") 
.kinesis().stream() } @@ -210,7 +211,7 @@ class KinesisSourceStressTestSuite extends KinesisSourceTest with KinesisFunSuit import testImplicits._ - test("kinesis source stress test") { + testIfEnabled("kinesis source stress test") { val testUtils = new KPLBasedKinesisTestUtils testUtils.createStream() try { From d3e7887d8eadd8615891599a4b705da02d3d3c2c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 23 Feb 2016 22:05:10 -0800 Subject: [PATCH 10/16] Fix the block id and generate block id in the driver --- .../streaming/kinesis/KinesisDataFetcher.scala | 10 +++++----- .../spark/streaming/kinesis/KinesisSource.scala | 7 ++++--- .../streaming/kinesis/KinesisSourceSuite.scala | 16 ++++++++++------ 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala index 5fc9c9765da0a..f138cfe566634 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala @@ -25,7 +25,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ -import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} +import org.apache.spark._ import org.apache.spark.network.util.JavaUtils import org.apache.spark.storage.{BlockId, StorageLevel} @@ -35,7 +35,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel} private[kinesis] class KinesisDataFetcher( credentials: SerializableAWSCredentials, endpointUrl: String, - fromSeqNums: Seq[(Shard, Option[String])], + fromSeqNums: Seq[(Shard, Option[String], BlockId)], initialPositionInStream: InitialPositionInStream, readTimeoutMs: Long = 2000L ) extends Serializable with Logging { @@ -54,7 +54,7 @@ private[kinesis] class KinesisDataFetcher( */ def fetch(sc: SparkContext): Array[(BlockId, SequenceNumberRange)] = { sc.makeRDD(fromSeqNums, fromSeqNums.size).map { - case (shard, fromSeqNum) => fetchPartition(shard, fromSeqNum) + case (shard, fromSeqNum, blockId) => fetchPartition(shard, fromSeqNum, blockId) }.collect().flatten } @@ -67,7 +67,8 @@ private[kinesis] class KinesisDataFetcher( */ private def fetchPartition( shard: Shard, - fromSeqNum: Option[String]): Option[(BlockId, SequenceNumberRange)] = { + fromSeqNum: Option[String], + blockId: BlockId): Option[(BlockId, SequenceNumberRange)] = { client.setEndpoint(endpointUrl) val endTime = System.currentTimeMillis + readTimeoutMs @@ -108,7 +109,6 @@ private[kinesis] class KinesisDataFetcher( } if (buffer.nonEmpty) { - val blockId = KinesisSource.nextBlockId SparkEnv.get.blockManager.putIterator(blockId, buffer.iterator, StorageLevel.MEMORY_ONLY) val range = SequenceNumberRange( shard.streamName, shard.shardId, firstSeqNumber, lastSeqNumber) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index 4e8199d5d3c09..e44e9eb776372 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -104,8 +104,9 @@ private[kinesis] class KinesisSource( /** Prefetch Kinesis 
data from the starting seq nums */ val prefetchedData = - new KinesisDataFetcher(awsCredentials, endpointUrl, fromSeqNums, initialPosInStream) - .fetch(sqlContext.sparkContext) + new KinesisDataFetcher(awsCredentials, endpointUrl, fromSeqNums.map { case (shard, seqNum) => + (shard, seqNum, KinesisSource.nextBlockId) + }, initialPosInStream).fetch(sqlContext.sparkContext) if (prefetchedData.nonEmpty) { val prefetechedRanges = prefetchedData.map(_._2) @@ -169,5 +170,5 @@ private[kinesis] object KinesisSource { private val nextId = new AtomicLong(0) - def nextBlockId: StreamBlockId = StreamBlockId(-1, nextId.getAndIncrement) + def nextBlockId: StreamBlockId = StreamBlockId(Int.MaxValue, nextId.getAndIncrement) } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index 08c527fd3b772..6affe0a94ea1f 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -62,12 +62,16 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { var streamBlocksInLastBatch: Seq[StreamBlockId] = Seq.empty def assertStreamBlocks: Boolean = { - // Assume the test runs in local mode and there is only one BlockManager. - val streamBlocks = - SparkEnv.get.blockManager.getMatchingBlockIds(_.isInstanceOf[StreamBlockId]) - val cleaned = streamBlocks.intersect(streamBlocksInLastBatch).isEmpty - streamBlocksInLastBatch = streamBlocks.map(_.asInstanceOf[StreamBlockId]) - cleaned + if (sqlContext.sparkContext.isLocal) { + // Only test this one in local mode so that we can assume there is only one BlockManager + val streamBlocks = + SparkEnv.get.blockManager.getMatchingBlockIds(_.isInstanceOf[StreamBlockId]) + val cleaned = streamBlocks.intersect(streamBlocksInLastBatch).isEmpty + streamBlocksInLastBatch = streamBlocks.map(_.asInstanceOf[StreamBlockId]) + cleaned + } else { + true + } } val testUtils = new KPLBasedKinesisTestUtils From e058a64773fcf3dbd17b5a6d45e1d462980458c1 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 11:38:31 -0800 Subject: [PATCH 11/16] Refactor --- .../spark/streaming/kinesis/KinesisSource.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala index e44e9eb776372..77c6a70235686 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisSource.scala @@ -102,11 +102,19 @@ private[kinesis] class KinesisSource( // Get the starting seq number of each shard if available val fromSeqNums = shards.map { shard => shard -> startOffset.flatMap(_.seqNums.get(shard)) } - /** Prefetch Kinesis data from the starting seq nums */ + // Assign a unique block id for each shard + val fromSeqNumsWithBlockId = fromSeqNums.map { case (shard, seqNum) => + val uniqueBlockId = KinesisSource.nextBlockId + (shard, seqNum, uniqueBlockId) + } + + // Prefetch Kinesis data from the starting seq nums val prefetchedData = - new KinesisDataFetcher(awsCredentials, endpointUrl, fromSeqNums.map { case (shard, seqNum) => - (shard, seqNum, KinesisSource.nextBlockId) - }, 
initialPosInStream).fetch(sqlContext.sparkContext) + new KinesisDataFetcher( + awsCredentials, + endpointUrl, + fromSeqNumsWithBlockId, + initialPosInStream).fetch(sqlContext.sparkContext) if (prefetchedData.nonEmpty) { val prefetechedRanges = prefetchedData.map(_._2) From aef40275300e6a47112849582be47da78954b1ae Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 11:43:13 -0800 Subject: [PATCH 12/16] Add abstract --- .../org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index 6affe0a94ea1f..9aab481d1ecd6 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelati import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.storage.StreamBlockId -class KinesisSourceTest extends StreamTest with SharedSQLContext { +abstract class KinesisSourceTest extends StreamTest with SharedSQLContext { case class AddKinesisData( testUtils: KPLBasedKinesisTestUtils, From f85608f180ca3182eb44b0958d0ef440fa257437 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 11:54:11 -0800 Subject: [PATCH 13/16] Downgrade KCL to 1.4.0 --- extras/kinesis-asl-assembly/pom.xml | 6 ++++++ pom.xml | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index d1c38c7ca5d69..911b00e2b579f 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -47,6 +47,12 @@ ${project.version} provided + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + provided + diff --git a/pom.xml b/pom.xml index 244355b080221..88b9ab2a37ce3 100644 --- a/pom.xml +++ b/pom.xml @@ -154,9 +154,9 @@ 1.7.7 hadoop2 0.7.1 - 1.6.1 + 1.4.0 - 0.10.2 + 0.10.1 4.3.2 From 2ae6285def32d824944f870e8fa357612065ca18 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 12:30:30 -0800 Subject: [PATCH 14/16] Address comments for tests --- .../streaming/kinesis/KinesisTestUtils.scala | 4 +- .../kinesis/KinesisSourceSuite.scala | 174 +++++++++--------- 2 files changed, 87 insertions(+), 91 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 0c5ab3293335c..8d32c5c5aee64 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -42,7 +42,7 @@ import org.apache.spark.Logging private[kinesis] class KinesisTestUtils extends Logging { val endpointUrl = KinesisTestUtils.endpointUrl - val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() + val regionName = KinesisTestUtils.regionName val streamShardCount = 2 private val createStreamTimeoutSeconds = 300 @@ -216,6 +216,8 @@ private[kinesis] object KinesisTestUtils { url } + lazy val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() + def isAWSCredentialsPresent: Boolean = { Try { 
new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index 9aab481d1ecd6..a3cf02b3d96f3 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -99,105 +99,99 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { } } - testIfEnabled("DataFrameReader") { - val testUtils = new KPLBasedKinesisTestUtils - testUtils.createStream() - try { - val df = sqlContext.read - .option("region", testUtils.regionName) - .option("endpoint", testUtils.endpointUrl) - .option("stream", testUtils.streamName) - .option("position", InitialPositionInStream.TRIM_HORIZON.name()) - .kinesis().stream() - - val sources = df.queryExecution.analyzed.collect { - case StreamingRelation(s: KinesisSource, _) => s - } - assert(sources.size === 1) - - // stream - assertExceptionAndMessage[IllegalArgumentException]( - "Option 'stream' must be specified. Examples: " + - """option("stream", "stream1"), option("stream", "stream1,stream2")""") { - sqlContext.read.kinesis().stream() - } - assertExceptionAndMessage[IllegalArgumentException]( - "Option 'stream' is invalid, as stream names cannot be empty.") { - sqlContext.read.option("stream", "").kinesis().stream() - } - assertExceptionAndMessage[IllegalArgumentException]( - "Option 'stream' is invalid, as stream names cannot be empty.") { - sqlContext.read.option("stream", "a,").kinesis().stream() - } - assertExceptionAndMessage[IllegalArgumentException]( - "Option 'stream' is invalid, as stream names cannot be empty.") { - sqlContext.read.option("stream", ",a").kinesis().stream() + test("DataFrameReader") { + val df = sqlContext.read + .option("endpoint", KinesisTestUtils.endpointUrl) + .option("stream", "stream1") + .option("accessKey", "accessKey") + .option("secretKey", "secretKey") + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) + .kinesis().stream() + + val sources = df.queryExecution.analyzed.collect { + case StreamingRelation(s: KinesisSource, _) => s } + assert(sources.size === 1) - // region and endpoint - // Setting either endpoint or region is fine + // stream + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'stream' must be specified.") { + sqlContext.read.kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", "").kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", "a,").kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "Option 'stream' is invalid, as stream names cannot be empty.") { + sqlContext.read.option("stream", ",a").kinesis().stream() + } + + // region and endpoint + // Setting either endpoint or region is fine + sqlContext.read + .option("stream", "stream1") + .option("endpoint", KinesisTestUtils.endpointUrl) + .option("accessKey", "accessKey") + .option("secretKey", "secretKey") + .kinesis().stream() + sqlContext.read + .option("stream", "stream1") + .option("region", KinesisTestUtils.regionName) + .option("accessKey", "accessKey") + .option("secretKey", 
"secretKey") + .kinesis().stream() + + assertExceptionAndMessage[IllegalArgumentException]( + "Either option 'region' or option 'endpoint' must be specified.") { + sqlContext.read.option("stream", "stream1").kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + s"'region'(invalid-region) doesn't match to 'endpoint'(${KinesisTestUtils.endpointUrl})") { sqlContext.read - .option("stream", testUtils.streamName) - .option("endpoint", testUtils.endpointUrl) + .option("stream", "stream1") + .option("region", "invalid-region") + .option("endpoint", KinesisTestUtils.endpointUrl) .kinesis().stream() + } + + // position + assertExceptionAndMessage[IllegalArgumentException]( + "Unknown value of option 'position': invalid") { sqlContext.read - .option("stream", testUtils.streamName) - .option("region", testUtils.regionName) + .option("stream", "stream1") + .option("endpoint", KinesisTestUtils.endpointUrl) + .option("position", "invalid") .kinesis().stream() + } - assertExceptionAndMessage[IllegalArgumentException]( - "Either option 'region' or option 'endpoint' must be specified. Examples: " + - """option("region", "us-west-2"), """ + - """option("endpoint", "https://kinesis.us-west-2.amazonaws.com")""") { - sqlContext.read.option("stream", testUtils.streamName).kinesis().stream() - } - assertExceptionAndMessage[IllegalArgumentException]( - s"'region'(invalid-region) doesn't match to 'endpoint'(${testUtils.endpointUrl})") { - sqlContext.read - .option("stream", testUtils.streamName) - .option("region", "invalid-region") - .option("endpoint", testUtils.endpointUrl) - .kinesis().stream() - } - - // position - assertExceptionAndMessage[IllegalArgumentException]( - "Unknown value of option 'position': invalid") { - sqlContext.read - .option("stream", testUtils.streamName) - .option("endpoint", testUtils.endpointUrl) - .option("position", "invalid") - .kinesis().stream() - } - - // accessKey and secretKey - assertExceptionAndMessage[IllegalArgumentException]( - "'accessKey' is set but 'secretKey' is not found") { - sqlContext.read - .option("stream", testUtils.streamName) - .option("endpoint", testUtils.endpointUrl) - .option("position", InitialPositionInStream.TRIM_HORIZON.name()) - .option("accessKey", "test") - .kinesis().stream() - } - assertExceptionAndMessage[IllegalArgumentException]( - "'secretKey' is set but 'accessKey' is not found") { - sqlContext.read - .option("stream", testUtils.streamName) - .option("endpoint", testUtils.endpointUrl) - .option("position", InitialPositionInStream.TRIM_HORIZON.name()) - .option("secretKey", "test") - .kinesis().stream() - } - } finally { - testUtils.deleteStream() + // accessKey and secretKey + assertExceptionAndMessage[IllegalArgumentException]( + "'accessKey' is set but 'secretKey' is not found") { + sqlContext.read + .option("stream", "stream1") + .option("endpoint", KinesisTestUtils.endpointUrl) + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) + .option("accessKey", "test") + .kinesis().stream() + } + assertExceptionAndMessage[IllegalArgumentException]( + "'secretKey' is set but 'accessKey' is not found") { + sqlContext.read + .option("stream", "stream1") + .option("endpoint", KinesisTestUtils.endpointUrl) + .option("position", InitialPositionInStream.TRIM_HORIZON.name()) + .option("secretKey", "test") + .kinesis().stream() } } - testIfEnabled("call kinesis when not using stream") { - val expectedMessage = "org.apache.spark.streaming.kinesis.DefaultSource is " + - "neither a RelationProvider nor a 
FSBasedRelationProvider.;" - assertExceptionAndMessage[AnalysisException](expectedMessage) { + test("call kinesis when not using stream") { + intercept[AnalysisException] { sqlContext.read.kinesis().load() } } @@ -207,7 +201,7 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { val e = intercept[T] { body } - assert(e.getMessage === expectedMessage) + assert(e.getMessage.contains(expectedMessage)) } } From b55260ec378cdc532f15d600b46d8c4c5e22539e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 12:51:28 -0800 Subject: [PATCH 15/16] Fix indent --- .../apache/spark/streaming/kinesis/KinesisSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala index a3cf02b3d96f3..29336051d089d 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisSourceSuite.scala @@ -109,8 +109,8 @@ class KinesisSourceSuite extends KinesisSourceTest with KinesisFunSuite { .kinesis().stream() val sources = df.queryExecution.analyzed.collect { - case StreamingRelation(s: KinesisSource, _) => s - } + case StreamingRelation(s: KinesisSource, _) => s + } assert(sources.size === 1) // stream From 8ac76ca29429d01563908b3a6612e88774842993 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 24 Feb 2016 12:53:03 -0800 Subject: [PATCH 16/16] Fix indent --- .../apache/spark/streaming/kinesis/KinesisDataFetcher.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala index f138cfe566634..90d2e2bd5471a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDataFetcher.scala @@ -37,8 +37,7 @@ private[kinesis] class KinesisDataFetcher( endpointUrl: String, fromSeqNums: Seq[(Shard, Option[String], BlockId)], initialPositionInStream: InitialPositionInStream, - readTimeoutMs: Long = 2000L -) extends Serializable with Logging { + readTimeoutMs: Long = 2000L) extends Serializable with Logging { /** * Use lazy because the client needs to be created in executors