Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.io.Closeables;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
Expand Down Expand Up @@ -103,7 +104,7 @@ public Integer call(Integer i1, Integer i2) {
int port = -1;

public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
super(String.class, StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
Expand Down Expand Up @@ -82,7 +83,7 @@ class ReliableKafkaReceiver[
* Manage the BlockGenerator in receiver itself for better managing block store and offset
* commit.
*/
private var blockGenerator: BlockGenerator = null
private var blockGenerator: BlockGenerator[(K, V)] = null

/** Thread pool running the handlers for receiving message from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null
Expand Down Expand Up @@ -275,9 +276,9 @@ class ReliableKafkaReceiver[
}

/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {
private final class GeneratedBlockHandler extends BlockGeneratorListener[(K, V)] {

def onAddData(data: Any, metadata: Any): Unit = {
def onAddData(data: (K, V), metadata: Any): Unit = {
// Update the offset of the data that was added to the generator
if (metadata != null) {
val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
Expand All @@ -290,13 +291,15 @@ class ReliableKafkaReceiver[
rememberBlockOffsets(blockId)
}

def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[(K, V)]): Unit = {
// Store block and commit the blocks offset
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}

override def onAddData(data: ArrayBuffer[(K, V)], metadata: Any): Unit = {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
Expand Down Expand Up @@ -81,7 +83,7 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
* the credentials
*/
private[kinesis] class KinesisReceiver[T](
private[kinesis] class KinesisReceiver[T: ClassTag](
val streamName: String,
endpointUrl: String,
regionName: String,
Expand Down Expand Up @@ -116,7 +118,7 @@ private[kinesis] class KinesisReceiver[T](
@volatile private var workerThread: Thread = null

/** BlockGenerator used to generates blocks out of Kinesis data */
@volatile private var blockGenerator: BlockGenerator = null
@volatile private var blockGenerator: BlockGenerator[T] = null

/**
* Sequence number ranges added to the current block being generated.
Expand Down Expand Up @@ -327,14 +329,23 @@ private[kinesis] class KinesisReceiver[T](
* - When the currently active block is ready to sealed (not more records), this handler
* keep track of the list of ranges added into this block in another H
*/
private class GeneratedBlockHandler extends BlockGeneratorListener {
private class GeneratedBlockHandler extends BlockGeneratorListener[T] {

/**
* Callback method called after a data item is added into the BlockGenerator.
* The data addition, block generation, and calls to onAddData and onGenerateBlock
* are all synchronized through the same lock.
*/
def onAddData(data: Any, metadata: Any): Unit = {
def onAddData(data: T, metadata: Any): Unit = {
rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
}

/**
* Callback method called after a data item is added into the BlockGenerator.
* The data addition, block generation, and calls to onAddData and onGenerateBlock
* are all synchronized through the same lock.
*/
override def onAddData(data: ArrayBuffer[T], metadata: Any): Unit = {
rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
}

Expand All @@ -348,7 +359,7 @@ private[kinesis] class KinesisReceiver[T](
}

/** Callback method called when a block is ready to be pushed / stored. */
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
storeBlockWithRanges(blockId,
arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
}
Expand Down
8 changes: 7 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ object MimaExcludes {
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),

// [SPARK-18560] Receiver data can not be dataSerialized properly.
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.BlockGeneratorListener"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.receiver.Receiver.this"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlockHandler"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.streaming.receiver.ReceivedBlock")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
Expand All @@ -28,7 +29,7 @@ import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, SystemClock}

/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
private[streaming] abstract class BlockGeneratorListener[T: ClassTag] {
/**
* Called after a data item is added into the BlockGenerator. The data addition and this
* callback are synchronized with the block generation and its associated callback,
Expand All @@ -37,8 +38,17 @@ private[streaming] trait BlockGeneratorListener {
* that will be useful when a block is generated. Any long blocking operation in this callback
* will hurt the throughput.
*/
def onAddData(data: Any, metadata: Any): Unit
def onAddData(data: T, metadata: Any): Unit

/**
* Called after multi data items are added into the BlockGenerator. The data addition and this
* callback are synchronized with the block generation and its associated callback,
* so block generation waits for the active data addition+callback to complete. This is useful
* for updating metadata on successful buffering of multi data items, specifically that metadata
* that will be useful when a block is generated. Any long blocking operation in this callback
* will hurt the throughput.
*/
def onAddData(data: ArrayBuffer[T], metadata: Any): Unit
/**
* Called when a new block of data is generated by the block generator. The block generation
* and this callback are synchronized with the data addition and its associated callback, so
Expand All @@ -55,7 +65,7 @@ private[streaming] trait BlockGeneratorListener {
* thread, that is not synchronized with any other callbacks. Hence it is okay to do long
* blocking operation in this callback.
*/
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T]): Unit

/**
* Called when an error has occurred in the BlockGenerator. Can be called form many places
Expand All @@ -74,14 +84,14 @@ private[streaming] trait BlockGeneratorListener {
* Note: Do not create BlockGenerator instances directly inside receivers. Use
* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
*/
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
private[streaming] class BlockGenerator[T: ClassTag](
listener: BlockGeneratorListener[T],
receiverId: Int,
conf: SparkConf,
clock: Clock = new SystemClock()
) extends RateLimiter(conf) with Logging {

private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[T])

/**
* The BlockGenerator can be in 5 possible states, in the order as follows.
Expand Down Expand Up @@ -109,7 +119,7 @@ private[streaming] class BlockGenerator(
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

@volatile private var currentBuffer = new ArrayBuffer[Any]
@volatile private var currentBuffer = new ArrayBuffer[T]
@volatile private var state = Initialized

/** Start block generating and pushing threads. */
Expand Down Expand Up @@ -158,7 +168,7 @@ private[streaming] class BlockGenerator(
/**
* Push a single data item into the buffer.
*/
def addData(data: Any): Unit = {
def addData(data: T): Unit = {
if (state == Active) {
waitToPush()
synchronized {
Expand All @@ -179,7 +189,7 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. After buffering the data, the
* `BlockGeneratorListener.onAddData` callback will be called.
*/
def addDataWithCallback(data: Any, metadata: Any): Unit = {
def addDataWithCallback(data: T, metadata: Any): Unit = {
if (state == Active) {
waitToPush()
synchronized {
Expand All @@ -202,10 +212,10 @@ private[streaming] class BlockGenerator(
* `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
* are atomically added to the buffer, and are hence guaranteed to be present in a single block.
*/
def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = {
def addMultipleDataWithCallback(dataIterator: Iterator[T], metadata: Any): Unit = {
if (state == Active) {
// Unroll iterator into a temp buffer, and wait for pushing in the process
val tempBuffer = new ArrayBuffer[Any]
val tempBuffer = new ArrayBuffer[T]
dataIterator.foreach { data =>
waitToPush()
tempBuffer += data
Expand Down Expand Up @@ -236,7 +246,7 @@ private[streaming] class BlockGenerator(
synchronized {
if (currentBuffer.nonEmpty) {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
currentBuffer = new ArrayBuffer[T]
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
listener.onGenerateBlock(blockId)
newBlock = new Block(blockId, newBlockBuffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import scala.reflect.ClassTag

/** Trait representing a received block */
private[streaming] sealed trait ReceivedBlock
private[streaming] abstract class ReceivedBlock[T: ClassTag]

/** class representing a block received as an ArrayBuffer */
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
private[streaming] case class ArrayBufferBlock[T: ClassTag](arrayBuffer: ArrayBuffer[T])
extends ReceivedBlock[T]

/** class representing a block received as an Iterator */
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
private[streaming] case class IteratorBlock[T: ClassTag](iterator: Iterator[T])
extends ReceivedBlock[T]

/** class representing a block received as a ByteBuffer */
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
private[streaming] case class ByteBufferBlock[T: ClassTag](byteBuffer: ByteBuffer)
extends ReceivedBlock[T]
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand All @@ -42,10 +43,10 @@ private[streaming] trait ReceivedBlockStoreResult {
}

/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {
private[streaming] abstract class ReceivedBlockHandler[T: ClassTag] {

/** Store a received block with the given block id and return related metadata */
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock[T]): ReceivedBlockStoreResult

/** Cleanup old blocks older than the given threshold time */
def cleanupOldBlocks(threshTime: Long): Unit
Expand All @@ -66,11 +67,11 @@ private[streaming] case class BlockManagerBasedStoreResult(
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
private[streaming] class BlockManagerBasedBlockHandler[T: ClassTag](
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {
extends ReceivedBlockHandler[T] with Logging {

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = {

var numRecords: Option[Long] = None

Expand Down Expand Up @@ -122,7 +123,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both, a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
private[streaming] class WriteAheadLogBasedBlockHandler[T: ClassTag](
blockManager: BlockManager,
serializerManager: SerializerManager,
streamId: Int,
Expand All @@ -131,7 +132,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
hadoopConf: Configuration,
checkpointDir: String,
clock: Clock = new SystemClock
) extends ReceivedBlockHandler with Logging {
) extends ReceivedBlockHandler[T] with Logging {

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
Expand Down Expand Up @@ -168,7 +169,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
* It does this in parallel, using Scala Futures, and returns only after the block has
* been stored in both places.
*/
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = {

var numRecords = Option.empty[Long]
// Serialize the block so that it can be inserted into both
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -83,7 +84,12 @@ import org.apache.spark.storage.StorageLevel
* }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering will this change the signature of Receiver and break the existing code by user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know, this change will indeed broke custom receiver implementation in Java, just like the update in JavaCustomReceiver.java. Besides, in my humble opinion, I did not encounter any problem in my local test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so in that case we should carefully think about this change, at least keep compatible with existing code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is really trickier to add ClassTag in many place to support automatically-pick-best-serializer feature.

extends Serializable {

def this(clz: Class[T], storageLevel: StorageLevel) {
this(storageLevel)(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
}

/**
* This method is called by the system when the receiver is started. This function
Expand Down Expand Up @@ -257,25 +263,27 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
private var id: Int = -1

/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
@transient private var _supervisor: ReceiverSupervisor = null
@transient private var _supervisor: ReceiverSupervisor[T] = null

/** Set the ID of the DStream that this receiver is associated with. */
private[streaming] def setReceiverId(_id: Int) {
id = _id
}

/** Attach Network Receiver executor to this receiver. */
private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
private[streaming] def attachSupervisor(exec: ReceiverSupervisor[T]) {
assert(_supervisor == null)
_supervisor = exec
}

/** Get the attached supervisor. */
private[streaming] def supervisor: ReceiverSupervisor = {
private[streaming] def supervisor: ReceiverSupervisor[T] = {
assert(_supervisor != null,
"A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
"some computation in the receiver before the Receiver.onStart() has been called.")
_supervisor
}

def getClassTag: ClassTag[T] = t
}

Loading