1717package org .apache .spark .streaming .kinesis
1818
1919import java .util .UUID
20+ import java .util .concurrent .ConcurrentHashMap
2021
2122import scala .collection .JavaConverters ._
2223import scala .collection .mutable
@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T](
124125 private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer [SequenceNumberRange ]
125126
126127 /** Sequence number ranges of data added to each generated block */
127- private val blockIdToSeqNumRanges = new mutable.HashMap [StreamBlockId , SequenceNumberRanges ]
128- with mutable.SynchronizedMap [StreamBlockId , SequenceNumberRanges ]
128+ private val blockIdToSeqNumRanges = new ConcurrentHashMap [StreamBlockId , SequenceNumberRanges ]
129129
130130 /**
131131 * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
@@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T](
135135 /**
136136 * Latest sequence number ranges that have been stored successfully.
137137 * This is used for checkpointing through KCL */
138- private val shardIdToLatestStoredSeqNum = new mutable. HashMap [String , String ]
139- with mutable. SynchronizedMap [ String , String ]
138+ private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap [String , String ]
139+
140140 /**
141141 * This is called when the KinesisReceiver starts and must be non-blocking.
142142 * The KCL creates and manages the receiving/processing thread pool through Worker.run().
@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](
222222
223223 /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
224224 private [kinesis] def getLatestSeqNumToCheckpoint (shardId : String ): Option [String ] = {
225- shardIdToLatestStoredSeqNum.get(shardId)
225+ Option ( shardIdToLatestStoredSeqNum.get(shardId) )
226226 }
227227
228228 /**
@@ -257,15 +257,15 @@ private[kinesis] class KinesisReceiver[T](
257257 * for next block. Internally, this is synchronized with `rememberAddedRange()`.
258258 */
259259 private def finalizeRangesForCurrentBlock (blockId : StreamBlockId ): Unit = {
260- blockIdToSeqNumRanges(blockId) = SequenceNumberRanges (seqNumRangesInCurrentBlock.toArray)
260+ blockIdToSeqNumRanges.put (blockId, SequenceNumberRanges (seqNumRangesInCurrentBlock.toArray) )
261261 seqNumRangesInCurrentBlock.clear()
262262 logDebug(s " Generated block $blockId has $blockIdToSeqNumRanges" )
263263 }
264264
265265 /** Store the block along with its associated ranges */
266266 private def storeBlockWithRanges (
267267 blockId : StreamBlockId , arrayBuffer : mutable.ArrayBuffer [T ]): Unit = {
268- val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
268+ val rangesToReportOption = Option ( blockIdToSeqNumRanges.remove(blockId) )
269269 if (rangesToReportOption.isEmpty) {
270270 stop(" Error while storing block into Spark, could not find sequence number ranges " +
271271 s " for block $blockId" )
@@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T](
294294 // Note that we are doing this sequentially because the array of sequence number ranges
295295 // is assumed to be
296296 rangesToReport.ranges.foreach { range =>
297- shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
297+ shardIdToLatestStoredSeqNum.put (range.shardId, range.toSeqNumber)
298298 }
299299 }
300300
0 commit comments