From 403d8740dc66c1cf626c813e40fcc38de60625ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 9 Dec 2015 18:33:44 +0100 Subject: [PATCH 1/3] SPARK-11193 - Use a Java ConcurrentHashMap instead of the SynchronizedMap trait in order to avoid Kryo serializer issue --- .../streaming/kinesis/KinesisReceiver.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 05080835fc4ad..02999d2addfdc 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable @@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T]( private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange] /** Sequence number ranges of data added to each generated block */ - private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges] - with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges] + private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges] /** * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval. @@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T]( /** * Latest sequence number ranges that have been stored successfully. * This is used for checkpointing through KCL */ - private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String] - with mutable.SynchronizedMap[String, String] + private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String] + /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). @@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T]( /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { - shardIdToLatestStoredSeqNum.get(shardId) + return Option[String]{ shardIdToLatestStoredSeqNum.get(shardId) } } /** @@ -257,7 +257,7 @@ private[kinesis] class KinesisReceiver[T]( * for next block. Internally, this is synchronized with `rememberAddedRange()`. */ private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = { - blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray) + blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)) seqNumRangesInCurrentBlock.clear() logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges") } @@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T]( /** Store the block along with its associated ranges */ private def storeBlockWithRanges( blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { - val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId) + val rangesToReportOption = Option[SequenceNumberRanges]{ blockIdToSeqNumRanges.remove(blockId) } if (rangesToReportOption.isEmpty) { stop("Error while storing block into Spark, could not find sequence number ranges " + s"for block $blockId") @@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T]( // Note that we are doing this sequentially because the array of sequence number ranges // is assumed to be rangesToReport.ranges.foreach { range => - shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber + shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber) } } From 61221af56a9629761af6152808a8ea1f4772919d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 9 Dec 2015 18:48:51 +0100 Subject: [PATCH 2/3] SPARK-11193 - Improve Option usage --- .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 02999d2addfdc..59fc5cc3fb7a4 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T]( /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { - return Option[String]{ shardIdToLatestStoredSeqNum.get(shardId) } + return Option(shardIdToLatestStoredSeqNum.get(shardId)) } /** @@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T]( /** Store the block along with its associated ranges */ private def storeBlockWithRanges( blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { - val rangesToReportOption = Option[SequenceNumberRanges]{ blockIdToSeqNumRanges.remove(blockId) } + val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId)) if (rangesToReportOption.isEmpty) { stop("Error while storing block into Spark, could not find sequence number ranges " + s"for block $blockId") From 5cec00777800882d0c7d1a5dd6fb8e7bf302a116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 9 Dec 2015 19:10:07 +0100 Subject: [PATCH 3/3] SPARK-11193 - Cleanup return keyword in getLatestSeqNumToCheckpoint() function --- .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 59fc5cc3fb7a4..80edda59e1719 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T]( /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { - return Option(shardIdToLatestStoredSeqNum.get(shardId)) + Option(shardIdToLatestStoredSeqNum.get(shardId)) } /**