From ec36a0ee4c129c60c6d4a511ee807dc21f292b4a Mon Sep 17 00:00:00 2001 From: uncleGen Date: Tue, 22 Nov 2016 22:36:23 +0800 Subject: [PATCH 1/3] SPARK-18560: Receiver data can not be dataSerialized properly. --- .../streaming/JavaCustomReceiver.java | 3 +- .../kafka/ReliableKafkaReceiver.scala | 11 +-- .../streaming/kinesis/KinesisReceiver.scala | 21 ++++-- .../streaming/receiver/BlockGenerator.scala | 34 +++++---- .../streaming/receiver/ReceivedBlock.scala | 12 ++-- .../receiver/ReceivedBlockHandler.scala | 17 ++--- .../spark/streaming/receiver/Receiver.scala | 16 +++-- .../receiver/ReceiverSupervisor.scala | 13 ++-- .../receiver/ReceiverSupervisorImpl.scala | 27 +++---- .../streaming/scheduler/ReceiverTracker.scala | 18 +++-- .../spark/streaming/JavaReceiverAPISuite.java | 2 +- .../streaming/ReceivedBlockHandlerSuite.scala | 72 ++++++++++--------- .../spark/streaming/ReceiverSuite.scala | 24 ++++--- .../receiver/BlockGeneratorSuite.scala | 33 +++++---- .../scheduler/ReceiverTrackerSuite.scala | 7 +- 15 files changed, 191 insertions(+), 119 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index e20b94d5b03f2..046f644edc566 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -20,6 +20,7 @@ import com.google.common.io.Closeables; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -103,7 +104,7 @@ public Integer call(Integer i1, Integer i2) { int port = -1; public JavaCustomReceiver(String host_ , int port_) { - super(StorageLevel.MEMORY_AND_DISK_2()); + super(String.class, StorageLevel.MEMORY_AND_DISK_2()); host = host_; port = port_; } diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index 39abe3c3e29d0..c821d66c85dfc 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -21,6 +21,7 @@ import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor} import scala.collection.{mutable, Map} +import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} import kafka.common.TopicAndPartition @@ -82,7 +83,7 @@ class ReliableKafkaReceiver[ * Manage the BlockGenerator in receiver itself for better managing block store and offset * commit. */ - private var blockGenerator: BlockGenerator = null + private var blockGenerator: BlockGenerator[(K, V)] = null /** Thread pool running the handlers for receiving message from multiple topics and partitions. */ private var messageHandlerThreadPool: ThreadPoolExecutor = null @@ -275,9 +276,9 @@ class ReliableKafkaReceiver[ } /** Class to handle blocks generated by the block generator. */ - private final class GeneratedBlockHandler extends BlockGeneratorListener { + private final class GeneratedBlockHandler extends BlockGeneratorListener[(K, V)] { - def onAddData(data: Any, metadata: Any): Unit = { + def onAddData(data: (K, V), metadata: Any): Unit = { // Update the offset of the data that was added to the generator if (metadata != null) { val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] @@ -290,7 +291,7 @@ class ReliableKafkaReceiver[ rememberBlockOffsets(blockId) } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[(K, V)]): Unit = { // Store block and commit the blocks offset storeBlockAndCommitOffset(blockId, arrayBuffer) } @@ -298,5 +299,7 @@ class ReliableKafkaReceiver[ def onError(message: String, throwable: Throwable): Unit = { reportError(message, throwable) } + + override def onAddData(data: ArrayBuffer[(K, V)], metadata: Any): Unit = {} } } diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 858368d135b6a..3d01a0e657033 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} @@ -81,7 +83,7 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies * the credentials */ -private[kinesis] class KinesisReceiver[T]( +private[kinesis] class KinesisReceiver[T: ClassTag]( val streamName: String, endpointUrl: String, regionName: String, @@ -116,7 +118,7 @@ private[kinesis] class KinesisReceiver[T]( @volatile private var workerThread: Thread = null /** BlockGenerator used to generates blocks out of Kinesis data */ - @volatile private var blockGenerator: BlockGenerator = null + @volatile private var blockGenerator: BlockGenerator[T] = null /** * Sequence number ranges added to the current block being generated. @@ -327,14 +329,23 @@ private[kinesis] class KinesisReceiver[T]( * - When the currently active block is ready to sealed (not more records), this handler * keep track of the list of ranges added into this block in another H */ - private class GeneratedBlockHandler extends BlockGeneratorListener { + private class GeneratedBlockHandler extends BlockGeneratorListener[T] { /** * Callback method called after a data item is added into the BlockGenerator. * The data addition, block generation, and calls to onAddData and onGenerateBlock * are all synchronized through the same lock. */ - def onAddData(data: Any, metadata: Any): Unit = { + def onAddData(data: T, metadata: Any): Unit = { + rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) + } + + /** + * Callback method called after a data item is added into the BlockGenerator. + * The data addition, block generation, and calls to onAddData and onGenerateBlock + * are all synchronized through the same lock. + */ + override def onAddData(data: ArrayBuffer[T], metadata: Any): Unit = { rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) } @@ -348,7 +359,7 @@ private[kinesis] class KinesisReceiver[T]( } /** Callback method called when a block is ready to be pushed / stored. */ - def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { storeBlockWithRanges(blockId, arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]]) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 90309c0145ae1..2177ad8209bc0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -28,7 +29,7 @@ import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, SystemClock} /** Listener object for BlockGenerator events */ -private[streaming] trait BlockGeneratorListener { +private[streaming] abstract class BlockGeneratorListener[T: ClassTag] { /** * Called after a data item is added into the BlockGenerator. The data addition and this * callback are synchronized with the block generation and its associated callback, @@ -37,8 +38,17 @@ private[streaming] trait BlockGeneratorListener { * that will be useful when a block is generated. Any long blocking operation in this callback * will hurt the throughput. */ - def onAddData(data: Any, metadata: Any): Unit + def onAddData(data: T, metadata: Any): Unit + /** + * Called after multi data items are added into the BlockGenerator. The data addition and this + * callback are synchronized with the block generation and its associated callback, + * so block generation waits for the active data addition+callback to complete. This is useful + * for updating metadata on successful buffering of multi data items, specifically that metadata + * that will be useful when a block is generated. Any long blocking operation in this callback + * will hurt the throughput. + */ + def onAddData(data: ArrayBuffer[T], metadata: Any): Unit /** * Called when a new block of data is generated by the block generator. The block generation * and this callback are synchronized with the data addition and its associated callback, so @@ -55,7 +65,7 @@ private[streaming] trait BlockGeneratorListener { * thread, that is not synchronized with any other callbacks. Hence it is okay to do long * blocking operation in this callback. */ - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T]): Unit /** * Called when an error has occurred in the BlockGenerator. Can be called form many places @@ -74,14 +84,14 @@ private[streaming] trait BlockGeneratorListener { * Note: Do not create BlockGenerator instances directly inside receivers. Use * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it. */ -private[streaming] class BlockGenerator( - listener: BlockGeneratorListener, +private[streaming] class BlockGenerator[T: ClassTag]( + listener: BlockGeneratorListener[T], receiverId: Int, conf: SparkConf, clock: Clock = new SystemClock() ) extends RateLimiter(conf) with Logging { - private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any]) + private case class Block(id: StreamBlockId, buffer: ArrayBuffer[T]) /** * The BlockGenerator can be in 5 possible states, in the order as follows. @@ -109,7 +119,7 @@ private[streaming] class BlockGenerator( private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize) private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - @volatile private var currentBuffer = new ArrayBuffer[Any] + @volatile private var currentBuffer = new ArrayBuffer[T] @volatile private var state = Initialized /** Start block generating and pushing threads. */ @@ -158,7 +168,7 @@ private[streaming] class BlockGenerator( /** * Push a single data item into the buffer. */ - def addData(data: Any): Unit = { + def addData(data: T): Unit = { if (state == Active) { waitToPush() synchronized { @@ -179,7 +189,7 @@ private[streaming] class BlockGenerator( * Push a single data item into the buffer. After buffering the data, the * `BlockGeneratorListener.onAddData` callback will be called. */ - def addDataWithCallback(data: Any, metadata: Any): Unit = { + def addDataWithCallback(data: T, metadata: Any): Unit = { if (state == Active) { waitToPush() synchronized { @@ -202,10 +212,10 @@ private[streaming] class BlockGenerator( * `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items * are atomically added to the buffer, and are hence guaranteed to be present in a single block. */ - def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = { + def addMultipleDataWithCallback(dataIterator: Iterator[T], metadata: Any): Unit = { if (state == Active) { // Unroll iterator into a temp buffer, and wait for pushing in the process - val tempBuffer = new ArrayBuffer[Any] + val tempBuffer = new ArrayBuffer[T] dataIterator.foreach { data => waitToPush() tempBuffer += data @@ -236,7 +246,7 @@ private[streaming] class BlockGenerator( synchronized { if (currentBuffer.nonEmpty) { val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[Any] + currentBuffer = new ArrayBuffer[T] val blockId = StreamBlockId(receiverId, time - blockIntervalMs) listener.onGenerateBlock(blockId) newBlock = new Block(blockId, newBlockBuffer) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala index 8c3a7977beae3..3228bb5f0178f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala @@ -21,15 +21,19 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.language.existentials +import scala.reflect.ClassTag /** Trait representing a received block */ -private[streaming] sealed trait ReceivedBlock +private[streaming] abstract class ReceivedBlock[T: ClassTag] /** class representing a block received as an ArrayBuffer */ -private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock +private[streaming] case class ArrayBufferBlock[T: ClassTag](arrayBuffer: ArrayBuffer[T]) + extends ReceivedBlock[T] /** class representing a block received as an Iterator */ -private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock +private[streaming] case class IteratorBlock[T: ClassTag](iterator: Iterator[T]) + extends ReceivedBlock[T] /** class representing a block received as a ByteBuffer */ -private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock +private[streaming] case class ByteBufferBlock[T: ClassTag](byteBuffer: ByteBuffer) + extends ReceivedBlock[T] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 80c07958b41f2..22e7313bc34d1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.language.{existentials, postfixOps} +import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -42,10 +43,10 @@ private[streaming] trait ReceivedBlockStoreResult { } /** Trait that represents a class that handles the storage of blocks received by receiver */ -private[streaming] trait ReceivedBlockHandler { +private[streaming] abstract class ReceivedBlockHandler[T: ClassTag] { /** Store a received block with the given block id and return related metadata */ - def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult + def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock[T]): ReceivedBlockStoreResult /** Cleanup old blocks older than the given threshold time */ def cleanupOldBlocks(threshTime: Long): Unit @@ -66,11 +67,11 @@ private[streaming] case class BlockManagerBasedStoreResult( * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which * stores the received blocks into a block manager with the specified storage level. */ -private[streaming] class BlockManagerBasedBlockHandler( +private[streaming] class BlockManagerBasedBlockHandler[T: ClassTag]( blockManager: BlockManager, storageLevel: StorageLevel) - extends ReceivedBlockHandler with Logging { + extends ReceivedBlockHandler[T] with Logging { - def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = { var numRecords: Option[Long] = None @@ -122,7 +123,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult( * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which * stores the received blocks in both, a write ahead log and a block manager. */ -private[streaming] class WriteAheadLogBasedBlockHandler( +private[streaming] class WriteAheadLogBasedBlockHandler[T: ClassTag]( blockManager: BlockManager, serializerManager: SerializerManager, streamId: Int, @@ -131,7 +132,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( hadoopConf: Configuration, checkpointDir: String, clock: Clock = new SystemClock - ) extends ReceivedBlockHandler with Logging { + ) extends ReceivedBlockHandler[T] with Logging { private val blockStoreTimeout = conf.getInt( "spark.streaming.receiver.blockStoreTimeout", 30).seconds @@ -168,7 +169,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( * It does this in parallel, using Scala Futures, and returns only after the block has * been stored in both places. */ - def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = { var numRecords = Option.empty[Long] // Serialize the block so that it can be inserted into both diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index d91a64df321a6..52a6f2d11269a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel @@ -83,7 +84,12 @@ import org.apache.spark.storage.StorageLevel * }}} */ @DeveloperApi -abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable { +abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T]) + extends Serializable { + + def this(clz: Class[T], storageLevel: StorageLevel) { + this(storageLevel)(ClassTag.AnyRef.asInstanceOf[ClassTag[T]]) + } /** * This method is called by the system when the receiver is started. This function @@ -257,7 +263,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable private var id: Int = -1 /** Handler object that runs the receiver. This is instantiated lazily in the worker. */ - @transient private var _supervisor: ReceiverSupervisor = null + @transient private var _supervisor: ReceiverSupervisor[T] = null /** Set the ID of the DStream that this receiver is associated with. */ private[streaming] def setReceiverId(_id: Int) { @@ -265,17 +271,19 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Attach Network Receiver executor to this receiver. */ - private[streaming] def attachSupervisor(exec: ReceiverSupervisor) { + private[streaming] def attachSupervisor(exec: ReceiverSupervisor[T]) { assert(_supervisor == null) _supervisor = exec } /** Get the attached supervisor. */ - private[streaming] def supervisor: ReceiverSupervisor = { + private[streaming] def supervisor: ReceiverSupervisor[T] = { assert(_supervisor != null, "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " + "some computation in the receiver before the Receiver.onStart() has been called.") _supervisor } + + def getClassTag: ClassTag[T] = t } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index faf6db82d5b18..fd7d9a0ca28b1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch import scala.collection.mutable.ArrayBuffer import scala.concurrent._ +import scala.reflect.ClassTag import scala.util.control.NonFatal import org.apache.spark.SparkConf @@ -33,8 +34,8 @@ import org.apache.spark.util.{ThreadUtils, Utils} * Abstract class that is responsible for supervising a Receiver in the worker. * It provides all the necessary interfaces for handling the data received by the receiver. */ -private[streaming] abstract class ReceiverSupervisor( - receiver: Receiver[_], +private[streaming] abstract class ReceiverSupervisor[T: ClassTag]( + receiver: Receiver[T], conf: SparkConf ) extends Logging { @@ -70,7 +71,7 @@ private[streaming] abstract class ReceiverSupervisor( @volatile private[streaming] var receiverState = Initialized /** Push a single data item to backend data store. */ - def pushSingle(data: Any): Unit + def pushSingle(data: T): Unit /** Store the bytes of received data as a data block into Spark's memory. */ def pushBytes( @@ -81,14 +82,14 @@ private[streaming] abstract class ReceiverSupervisor( /** Store an iterator of received data as a data block into Spark's memory. */ def pushIterator( - iterator: Iterator[_], + iterator: Iterator[T], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId] ): Unit /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ def pushArrayBuffer( - arrayBuffer: ArrayBuffer[_], + arrayBuffer: ArrayBuffer[T], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId] ): Unit @@ -100,7 +101,7 @@ private[streaming] abstract class ReceiverSupervisor( * Note: Do not explicitly start or stop the `BlockGenerator`, the `ReceiverSupervisorImpl` * will take care of it. */ - def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator + def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener[T]): BlockGenerator[T] /** Report errors. */ def reportError(message: String, throwable: Throwable): Unit diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 5ba09a54af18d..0e87b53eb2d2a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration @@ -42,8 +43,8 @@ import org.apache.spark.util.RpcUtils * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]] * object that is used to divide the received data stream into blocks of data. */ -private[streaming] class ReceiverSupervisorImpl( - receiver: Receiver[_], +private[streaming] class ReceiverSupervisorImpl[T: ClassTag]( + receiver: Receiver[T], env: SparkEnv, hadoopConf: Configuration, checkpointDirOption: Option[String] @@ -52,7 +53,7 @@ private[streaming] class ReceiverSupervisorImpl( private val host = SparkEnv.get.blockManager.blockManagerId.host private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId - private val receivedBlockHandler: ReceivedBlockHandler = { + private val receivedBlockHandler: ReceivedBlockHandler[T] = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { if (checkpointDirOption.isEmpty) { throw new SparkException( @@ -94,11 +95,13 @@ private[streaming] class ReceiverSupervisorImpl( /** Unique block ids if one wants to add blocks directly */ private val newBlockId = new AtomicLong(System.currentTimeMillis()) - private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]() + private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator[T]]() /** Divides received data records into data blocks for pushing in BlockManager. */ - private val defaultBlockGeneratorListener = new BlockGeneratorListener { - def onAddData(data: Any, metadata: Any): Unit = { } + private val defaultBlockGeneratorListener = new BlockGeneratorListener[T] { + def onAddData(data: T, metadata: Any): Unit = { } + + def onAddData(data: ArrayBuffer[T], metadata: Any): Unit = {} def onGenerateBlock(blockId: StreamBlockId): Unit = { } @@ -106,7 +109,7 @@ private[streaming] class ReceiverSupervisorImpl( reportError(message, throwable) } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T]) { pushArrayBuffer(arrayBuffer, None, Some(blockId)) } } @@ -116,13 +119,13 @@ private[streaming] class ReceiverSupervisorImpl( override private[streaming] def getCurrentRateLimit: Long = defaultBlockGenerator.getCurrentLimit /** Push a single record of received data into block generator. */ - def pushSingle(data: Any) { + def pushSingle(data: T) { defaultBlockGenerator.addData(data) } /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ def pushArrayBuffer( - arrayBuffer: ArrayBuffer[_], + arrayBuffer: ArrayBuffer[T], metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { @@ -131,7 +134,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Store an iterator of received data as a data block into Spark's memory. */ def pushIterator( - iterator: Iterator[_], + iterator: Iterator[T], metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { @@ -149,7 +152,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Store block and report it to driver */ def pushAndReportBlock( - receivedBlock: ReceivedBlock, + receivedBlock: ReceivedBlock[T], metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { @@ -193,7 +196,7 @@ private[streaming] class ReceiverSupervisorImpl( } override def createBlockGenerator( - blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { + blockGeneratorListener: BlockGeneratorListener[T]): BlockGenerator[T] = { // Cleanup BlockGenerators that have already been stopped val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() } stoppedGenerators.foreach(registeredBlockGenerators.remove(_)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 8f55d982a904c..d1676e548fe79 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -22,8 +22,11 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.mutable.HashMap import scala.concurrent.ExecutionContext import scala.language.existentials +import scala.reflect.ClassTag import scala.util.{Failure, Success} +import org.apache.hadoop.conf.Configuration + import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -34,7 +37,6 @@ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} - /** Enumeration to identify current state of a Receiver */ private[streaming] object ReceiverState extends Enumeration { type ReceiverState = Value @@ -595,6 +597,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Function to start the receiver on the worker node val startReceiverFunc: Iterator[Receiver[_]] => Unit = (iterator: Iterator[Receiver[_]]) => { + def getReceiverSupervisor[T: ClassTag]( + receiver: Receiver[_], + env: SparkEnv, + hadoopConf: Configuration, + checkpointDirOption: Option[String]): ReceiverSupervisorImpl[T] = { + val r = receiver.asInstanceOf[Receiver[T]] + new ReceiverSupervisorImpl[T](r, env, hadoopConf, checkpointDirOption) + } + if (!iterator.hasNext) { throw new SparkException( "Could not start receiver as object not found.") @@ -602,8 +613,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next() assert(iterator.hasNext == false) - val supervisor = new ReceiverSupervisorImpl( - receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) + val supervisor = getReceiverSupervisor(receiver, SparkEnv.get, + serializableHadoopConf.value, checkpointDirOption)(receiver.getClassTag) supervisor.start() supervisor.awaitTermination() } else { @@ -669,5 +680,4 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers") } } - } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 091ccbfd85cad..b68d5f516f983 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -103,7 +103,7 @@ private static class JavaSocketReceiver extends Receiver { private int port = -1; JavaSocketReceiver(String host_ , int port_) { - super(StorageLevel.MEMORY_AND_DISK()); + super(String.class, StorageLevel.MEMORY_AND_DISK()); host = host_; port = port_; } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index f2241936000a0..b6e9de985da21 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import java.io.File import java.nio.ByteBuffer +import scala.collection.immutable.Range import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps @@ -111,8 +112,9 @@ class ReceivedBlockHandlerSuite } test("BlockManagerBasedBlockHandler - store blocks") { - withBlockManagerBasedBlockHandler { handler => - testBlockStoring(handler) { case (data, blockIds, storeResults) => + withBlockManagerBasedBlockHandler[String] { handler => + val data = Seq.tabulate(100) { _.toString } + testBlockStoring(handler, data) { case (data, blockIds, storeResults) => // Verify the data in block manager is correct val storedData = blockIds.flatMap { blockId => blockManager @@ -132,14 +134,16 @@ class ReceivedBlockHandlerSuite } test("BlockManagerBasedBlockHandler - handle errors in storing block") { - withBlockManagerBasedBlockHandler { handler => - testErrorHandling(handler) + withBlockManagerBasedBlockHandler[Int] { handler => + val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } + testErrorHandling(handler, iterator) } } test("WriteAheadLogBasedBlockHandler - store blocks") { - withWriteAheadLogBasedBlockHandler { handler => - testBlockStoring(handler) { case (data, blockIds, storeResults) => + withWriteAheadLogBasedBlockHandler[String] { handler => + val data = Seq.tabulate(100) { _.toString } + testBlockStoring(handler, data) { case (data, blockIds, storeResults) => // Verify the data in block manager is correct val storedData = blockIds.flatMap { blockId => blockManager @@ -172,13 +176,14 @@ class ReceivedBlockHandlerSuite } test("WriteAheadLogBasedBlockHandler - handle errors in storing block") { - withWriteAheadLogBasedBlockHandler { handler => - testErrorHandling(handler) + withWriteAheadLogBasedBlockHandler[Int] { handler => + val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } + testErrorHandling(handler, iterator) } } test("WriteAheadLogBasedBlockHandler - clean old blocks") { - withWriteAheadLogBasedBlockHandler { handler => + withWriteAheadLogBasedBlockHandler[Range.Inclusive] { handler => val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) } storeBlocks(handler, blocks) @@ -227,7 +232,7 @@ class ReceivedBlockHandlerSuite // BlockManager will not be able to unroll this block // and hence it will not tryToPut this block, resulting the SparkException storageLevel = StorageLevel.MEMORY_ONLY - withBlockManagerBasedBlockHandler { handler => + withBlockManagerBasedBlockHandler[Array[Byte]] { handler => val thrown = intercept[SparkException] { storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator)) } @@ -285,9 +290,9 @@ class ReceivedBlockHandlerSuite * Test storing of data using different types of Handler, StorageLevel and ReceivedBlocks * and verify the correct record count */ - private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean, + private def testRecordcount[T: ClassTag](isBlockManagedBasedBlockHandler: Boolean, sLevel: StorageLevel, - receivedBlock: ReceivedBlock, + receivedBlock: ReceivedBlock[T], bManager: BlockManager, expectedNumRecords: Option[Long] ) { @@ -297,7 +302,7 @@ class ReceivedBlockHandlerSuite try { if (isBlockManagedBasedBlockHandler) { // test received block with BlockManager based handler - withBlockManagerBasedBlockHandler { handler => + withBlockManagerBasedBlockHandler[T] { handler => val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) bId = blockId assert(blockStoreResult.numRecords === expectedNumRecords, @@ -307,7 +312,7 @@ class ReceivedBlockHandlerSuite } } else { // test received block with WAL based handler - withWriteAheadLogBasedBlockHandler { handler => + withWriteAheadLogBasedBlockHandler[T] { handler => val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) bId = blockId assert(blockStoreResult.numRecords === expectedNumRecords, @@ -326,11 +331,12 @@ class ReceivedBlockHandlerSuite * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded * using the given verification function */ - private def testBlockStoring(receivedBlockHandler: ReceivedBlockHandler) - (verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) { - val data = Seq.tabulate(100) { _.toString } + private def testBlockStoring[T: ClassTag]( + receivedBlockHandler: ReceivedBlockHandler[T], + data: Seq[T]) + (verifyFunc: (Seq[T], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) { - def storeAndVerify(blocks: Seq[ReceivedBlock]) { + def storeAndVerify(blocks: Seq[ReceivedBlock[T]]) { blocks should not be empty val (blockIds, storeResults) = storeBlocks(receivedBlockHandler, blocks) withClue(s"Testing with ${blocks.head.getClass.getSimpleName}s:") { @@ -342,21 +348,21 @@ class ReceivedBlockHandlerSuite } } - def dataToByteBuffer(b: Seq[String]) = + def dataToByteBuffer(b: Seq[T]) = serializerManager.dataSerialize(generateBlockId, b.iterator) val blocks = data.grouped(10).toSeq storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) }) storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) }) - storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) }) + storeAndVerify(blocks.map { b => ByteBufferBlock[T](dataToByteBuffer(b).toByteBuffer) }) } /** Test error handling when blocks that cannot be stored */ - private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) { + private def testErrorHandling[T: ClassTag](receivedBlockHandler: ReceivedBlockHandler[T], + iterator: Iterator[T]) { // Handle error in iterator (e.g. divide-by-zero error) intercept[Exception] { - val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator)) } @@ -368,15 +374,17 @@ class ReceivedBlockHandlerSuite } /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */ - private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) { + private def withBlockManagerBasedBlockHandler[T: ClassTag] + (body: BlockManagerBasedBlockHandler[T] => Unit) { body(new BlockManagerBasedBlockHandler(blockManager, storageLevel)) } /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */ - private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) { + private def withWriteAheadLogBasedBlockHandler[T: ClassTag] + (body: WriteAheadLogBasedBlockHandler[T] => Unit) { require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1) - val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, serializerManager, - 1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) + val receivedBlockHandler = new WriteAheadLogBasedBlockHandler[T](blockManager, + serializerManager, 1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) try { body(receivedBlockHandler) } finally { @@ -385,9 +393,9 @@ class ReceivedBlockHandlerSuite } /** Store blocks using a handler */ - private def storeBlocks( - receivedBlockHandler: ReceivedBlockHandler, - blocks: Seq[ReceivedBlock] + private def storeBlocks[T: ClassTag]( + receivedBlockHandler: ReceivedBlockHandler[T], + blocks: Seq[ReceivedBlock[T]] ): (Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) = { val blockIds = Seq.fill(blocks.size)(generateBlockId()) val storeResults = blocks.zip(blockIds).map { @@ -401,9 +409,9 @@ class ReceivedBlockHandlerSuite } /** Store single block using a handler */ - private def storeSingleBlock( - handler: ReceivedBlockHandler, - block: ReceivedBlock + private def storeSingleBlock[T: ClassTag]( + handler: ReceivedBlockHandler[T], + block: ReceivedBlock[T] ): (StreamBlockId, ReceivedBlockStoreResult) = { val blockId = generateBlockId val blockStoreResult = handler.storeBlock(blockId, block) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 1b1e21f6e5bab..3aea520d5c1c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.Semaphore import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts @@ -130,7 +131,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { } ignore("block generator throttling") { - val blockGeneratorListener = new FakeBlockGeneratorListener + val blockGeneratorListener = new FakeBlockGeneratorListener[Int] val blockIntervalMs = 100 val maxRate = 1001 val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). @@ -278,8 +279,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { * Instead of storing the data in the BlockManager, it stores all the data in a local buffer * that can used for verifying that the data has been forwarded correctly. */ - class FakeReceiverSupervisor(receiver: FakeReceiver) - extends ReceiverSupervisor(receiver, new SparkConf()) { + class FakeReceiverSupervisor[T: ClassTag](receiver: FakeReceiver) + extends ReceiverSupervisor[T](receiver.asInstanceOf[Receiver[T]], new SparkConf()) { val singles = new ArrayBuffer[Any] val byteBuffers = new ArrayBuffer[ByteBuffer] val iterators = new ArrayBuffer[Iterator[_]] @@ -292,7 +293,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { arrayBuffers.isEmpty && errors.isEmpty } - def pushSingle(data: Any) { + def pushSingle(data: T) { singles += data } @@ -304,14 +305,14 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { } def pushIterator( - iterator: Iterator[_], + iterator: Iterator[T], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]) { iterators += iterator } def pushArrayBuffer( - arrayBuffer: ArrayBuffer[_], + arrayBuffer: ArrayBuffer[T], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]) { arrayBuffers += arrayBuffer @@ -324,7 +325,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { override protected def onReceiverStart(): Boolean = true override def createBlockGenerator( - blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { + blockGeneratorListener: BlockGeneratorListener[T]): BlockGenerator[T] = { null } } @@ -332,16 +333,19 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { /** * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. */ - class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + class FakeBlockGeneratorListener[T: ClassTag](pushDelay: Long = 0) + extends BlockGeneratorListener[T] { // buffer of data received as ArrayBuffers val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] val errors = new ArrayBuffer[Throwable] - def onAddData(data: Any, metadata: Any) { } + def onAddData(data: T, metadata: Any) { } + + def onAddData(data: ArrayBuffer[T], metadata: Any) { } def onGenerateBlock(blockId: StreamBlockId) { } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T]) { val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) arrayBuffers += bufferOfInts Thread.sleep(0) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index a1d0561bf308a..627b84cf8e037 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -21,13 +21,15 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.language.reflectiveCalls +import scala.reflect.ClassTag -import org.scalatest.BeforeAndAfter -import org.scalatest.Matchers._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ import org.scalatest.time.SpanSugar._ +import org.scalatest.BeforeAndAfter +import org.scalatest.Matchers._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.storage.StreamBlockId @@ -37,7 +39,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { private val blockIntervalMs = 10 private val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms") - @volatile private var blockGenerator: BlockGenerator = null + @volatile private var blockGenerator: BlockGenerator[Int] = null after { if (blockGenerator != null) { @@ -46,7 +48,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { } test("block generation and data callbacks") { - val listener = new TestBlockGeneratorListener + val listener = new TestBlockGeneratorListener[Int] val clock = new ManualClock() require(blockIntervalMs > 5) @@ -141,7 +143,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { } test("stop ensures correct shutdown") { - val listener = new TestBlockGeneratorListener + val listener = new TestBlockGeneratorListener[Int] val clock = new ManualClock() blockGenerator = new BlockGenerator(listener, 0, conf, clock) require(listener.onGenerateBlockCalled === false) @@ -201,10 +203,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { } test("block push errors are reported") { - val listener = new TestBlockGeneratorListener { + val listener = new TestBlockGeneratorListener[Int] { @volatile var errorReported = false override def onPushBlock( - blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[Int]): Unit = { throw new SparkException("test") } override def onError(message: String, throwable: Throwable): Unit = { @@ -225,7 +227,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { * Helper method to stop the block generator with manual clock in a different thread, * so that the main thread can advance the clock that allows the stopping to proceed. */ - private def stopBlockGenerator(blockGenerator: BlockGenerator): Thread = { + private def stopBlockGenerator[T: ClassTag](blockGenerator: BlockGenerator[T]): Thread = { val thread = new Thread() { override def run(): Unit = { blockGenerator.stop() @@ -236,15 +238,15 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { } /** A listener for BlockGenerator that records the data in the callbacks */ - private class TestBlockGeneratorListener extends BlockGeneratorListener { - val pushedData = new ConcurrentLinkedQueue[Any] - val addedData = new ConcurrentLinkedQueue[Any] + private class TestBlockGeneratorListener[T: ClassTag] extends BlockGeneratorListener[T] { + val pushedData = new ConcurrentLinkedQueue[T] + val addedData = new ConcurrentLinkedQueue[T] val addedMetadata = new ConcurrentLinkedQueue[Any] @volatile var onGenerateBlockCalled = false @volatile var onAddDataCalled = false @volatile var onPushBlockCalled = false - override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { pushedData.addAll(arrayBuffer.asJava) onPushBlockCalled = true } @@ -252,10 +254,15 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { override def onGenerateBlock(blockId: StreamBlockId): Unit = { onGenerateBlockCalled = true } - override def onAddData(data: Any, metadata: Any): Unit = { + override def onAddData(data: T, metadata: Any): Unit = { addedData.add(data) addedMetadata.add(metadata) onAddDataCalled = true } + def onAddData(data: ArrayBuffer[T], metadata: Any): Unit = { + data.foreach(data => addedData.add(data)) + addedMetadata.add(metadata) + onAddDataCalled = true + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index df122ac090c3e..76f808306d594 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -148,11 +148,12 @@ private[streaming] class RateTestReceiver(receiverId: Int, host: Option[String] extends Receiver[Int](StorageLevel.MEMORY_ONLY) { private lazy val customBlockGenerator = supervisor.createBlockGenerator( - new BlockGeneratorListener { - override def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {} + new BlockGeneratorListener[Int] { + override def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[Int]): Unit = {} + override def onAddData(data: ArrayBuffer[Int], metadata: Any): Unit = {} override def onError(message: String, throwable: Throwable): Unit = {} override def onGenerateBlock(blockId: StreamBlockId): Unit = {} - override def onAddData(data: Any, metadata: Any): Unit = {} + override def onAddData(data: Int, metadata: Any): Unit = {} } ) From 6f64afdfbf3dc091a9624b5d26cd5a1f1f086659 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 23 Nov 2016 20:22:52 +0800 Subject: [PATCH 2/3] fix mima issue --- project/MimaExcludes.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 12f7ed202b9db..7877bd03f07d6 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -90,6 +90,12 @@ object MimaExcludes { // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") + + // [SPARK-18560] Receiver data can not be dataSerialized properly. + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.BlockGeneratorListener") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.receiver.Receiver.this") + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlockHandler") + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlock") ) } From 037f10dc5c76473242ce77a4bb5bcb9387e67b4b Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 23 Nov 2016 20:34:10 +0800 Subject: [PATCH 3/3] update --- project/MimaExcludes.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 7877bd03f07d6..a7d8b4eb212a0 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -89,12 +89,12 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="), // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"), // [SPARK-18560] Receiver data can not be dataSerialized properly. - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.BlockGeneratorListener") - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.receiver.Receiver.this") - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlockHandler") + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.BlockGeneratorListener"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.receiver.Receiver.this"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlockHandler"), ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlock") ) }