
Commit ec36a0e (1 parent: 6585479)

SPARK-18560: Receiver data can not be dataSerialized properly.

File tree: 15 files changed, +191 −119 lines

examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
Lines changed: 2 additions & 1 deletion

```diff
@@ -20,6 +20,7 @@
 import com.google.common.io.Closeables;

 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
@@ -103,7 +104,7 @@ public Integer call(Integer i1, Integer i2) {
   int port = -1;

   public JavaCustomReceiver(String host_ , int port_) {
-    super(StorageLevel.MEMORY_AND_DISK_2());
+    super(String.class, StorageLevel.MEMORY_AND_DISK_2());
     host = host_;
     port = port_;
   }
```
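This is the user-visible edge of the change: `Receiver[T]` now wants a `ClassTag[T]`, which Java code cannot supply implicitly, hence the new `Class[T]`-taking super-constructor (defined in the Receiver.scala diff below). Scala receivers are unaffected; a minimal sketch under the patched API:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// The ClassTag[String] is materialized implicitly at this constructor call,
// so Scala receivers need no Class[_] argument.
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = { /* open a socket and call store(line) for each record */ }
  def onStop(): Unit = { /* nothing to do; the receiving thread checks isStopped() */ }
}
```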

external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
Lines changed: 7 additions & 4 deletions

```diff
@@ -21,6 +21,7 @@ import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

 import scala.collection.{mutable, Map}
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}

 import kafka.common.TopicAndPartition
@@ -82,7 +83,7 @@ class ReliableKafkaReceiver[
   * Manage the BlockGenerator in receiver itself for better managing block store and offset
   * commit.
   */
-  private var blockGenerator: BlockGenerator = null
+  private var blockGenerator: BlockGenerator[(K, V)] = null

   /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
   private var messageHandlerThreadPool: ThreadPoolExecutor = null
@@ -275,9 +276,9 @@ class ReliableKafkaReceiver[
   }

   /** Class to handle blocks generated by the block generator. */
-  private final class GeneratedBlockHandler extends BlockGeneratorListener {
+  private final class GeneratedBlockHandler extends BlockGeneratorListener[(K, V)] {

-    def onAddData(data: Any, metadata: Any): Unit = {
+    def onAddData(data: (K, V), metadata: Any): Unit = {
       // Update the offset of the data that was added to the generator
       if (metadata != null) {
         val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
@@ -290,13 +291,15 @@ class ReliableKafkaReceiver[
       rememberBlockOffsets(blockId)
     }

-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[(K, V)]): Unit = {
       // Store block and commit the blocks offset
       storeBlockAndCommitOffset(blockId, arrayBuffer)
     }

     def onError(message: String, throwable: Throwable): Unit = {
       reportError(message, throwable)
     }
+
+    override def onAddData(data: ArrayBuffer[(K, V)], metadata: Any): Unit = {}
   }
 }
```

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
Lines changed: 16 additions & 5 deletions

```diff
@@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 import scala.util.control.NonFatal

 import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
@@ -81,7 +83,7 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
  * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
  *                             the credentials
  */
-private[kinesis] class KinesisReceiver[T](
+private[kinesis] class KinesisReceiver[T: ClassTag](
     val streamName: String,
     endpointUrl: String,
     regionName: String,
@@ -116,7 +118,7 @@ private[kinesis] class KinesisReceiver[T](
   @volatile private var workerThread: Thread = null

   /** BlockGenerator used to generates blocks out of Kinesis data */
-  @volatile private var blockGenerator: BlockGenerator = null
+  @volatile private var blockGenerator: BlockGenerator[T] = null

   /**
    * Sequence number ranges added to the current block being generated.
@@ -327,14 +329,23 @@ private[kinesis] class KinesisReceiver[T](
   * - When the currently active block is ready to sealed (not more records), this handler
   *   keep track of the list of ranges added into this block in another H
   */
-  private class GeneratedBlockHandler extends BlockGeneratorListener {
+  private class GeneratedBlockHandler extends BlockGeneratorListener[T] {

     /**
      * Callback method called after a data item is added into the BlockGenerator.
      * The data addition, block generation, and calls to onAddData and onGenerateBlock
      * are all synchronized through the same lock.
      */
-    def onAddData(data: Any, metadata: Any): Unit = {
+    def onAddData(data: T, metadata: Any): Unit = {
+      rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
+    }
+
+    /**
+     * Callback method called after multiple data items are added into the BlockGenerator.
+     * The data addition, block generation, and calls to onAddData and onGenerateBlock
+     * are all synchronized through the same lock.
+     */
+    override def onAddData(data: ArrayBuffer[T], metadata: Any): Unit = {
       rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
     }

@@ -348,7 +359,7 @@ private[kinesis] class KinesisReceiver[T](
     }

     /** Callback method called when a block is ready to be pushed / stored. */
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
       storeBlockWithRanges(blockId,
         arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
     }
```
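Kinesis is the path that actually exercises the new batch overload: `addMultipleDataWithCallback` buffers a whole list of records atomically and then (presumably, from the generator's internal `ArrayBuffer[T]`) resolves to `onAddData(ArrayBuffer[T], metadata)`, which is why both overloads here remember the same `SequenceNumberRange`. A paraphrased sketch of the call site elsewhere in this file (argument and helper names assumed from the surrounding source, not shown in the diff):

```scala
// All records fetched in one Kinesis batch share a single sequence-number
// range as metadata and are guaranteed to land in a single block.
val dataIterator = records.iterator().asScala.map(messageHandler)
val metadata = SequenceNumberRange(streamName, shardId,
  records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
```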

streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
Lines changed: 22 additions & 12 deletions

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
 import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag

 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
@@ -28,7 +29,7 @@ import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, SystemClock}

 /** Listener object for BlockGenerator events */
-private[streaming] trait BlockGeneratorListener {
+private[streaming] abstract class BlockGeneratorListener[T: ClassTag] {
   /**
    * Called after a data item is added into the BlockGenerator. The data addition and this
    * callback are synchronized with the block generation and its associated callback,
@@ -37,8 +38,17 @@ private[streaming] trait BlockGeneratorListener {
    * that will be useful when a block is generated. Any long blocking operation in this callback
    * will hurt the throughput.
    */
-  def onAddData(data: Any, metadata: Any): Unit
+  def onAddData(data: T, metadata: Any): Unit

+  /**
+   * Called after multiple data items are added into the BlockGenerator. The data addition and
+   * this callback are synchronized with the block generation and its associated callback,
+   * so block generation waits for the active data addition+callback to complete. This is useful
+   * for updating metadata on successful buffering of multiple data items, specifically that
+   * metadata that will be useful when a block is generated. Any long blocking operation in this
+   * callback will hurt the throughput.
+   */
+  def onAddData(data: ArrayBuffer[T], metadata: Any): Unit
   /**
    * Called when a new block of data is generated by the block generator. The block generation
    * and this callback are synchronized with the data addition and its associated callback, so
@@ -55,7 +65,7 @@ private[streaming] trait BlockGeneratorListener {
    * thread, that is not synchronized with any other callbacks. Hence it is okay to do long
    * blocking operation in this callback.
    */
-  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
+  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T]): Unit

   /**
    * Called when an error has occurred in the BlockGenerator. Can be called from many places
@@ -74,14 +84,14 @@ private[streaming] trait BlockGeneratorListener {
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
-private[streaming] class BlockGenerator(
-    listener: BlockGeneratorListener,
+private[streaming] class BlockGenerator[T: ClassTag](
+    listener: BlockGeneratorListener[T],
     receiverId: Int,
     conf: SparkConf,
     clock: Clock = new SystemClock()
   ) extends RateLimiter(conf) with Logging {

-  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
+  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[T])

   /**
    * The BlockGenerator can be in 5 possible states, in the order as follows.
@@ -109,7 +119,7 @@ private[streaming] class BlockGenerator(
   private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
   private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

-  @volatile private var currentBuffer = new ArrayBuffer[Any]
+  @volatile private var currentBuffer = new ArrayBuffer[T]
   @volatile private var state = Initialized

   /** Start block generating and pushing threads. */
@@ -158,7 +168,7 @@ private[streaming] class BlockGenerator(
   /**
    * Push a single data item into the buffer.
    */
-  def addData(data: Any): Unit = {
+  def addData(data: T): Unit = {
     if (state == Active) {
       waitToPush()
       synchronized {
@@ -179,7 +189,7 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. After buffering the data, the
    * `BlockGeneratorListener.onAddData` callback will be called.
    */
-  def addDataWithCallback(data: Any, metadata: Any): Unit = {
+  def addDataWithCallback(data: T, metadata: Any): Unit = {
     if (state == Active) {
       waitToPush()
       synchronized {
@@ -202,10 +212,10 @@ private[streaming] class BlockGenerator(
    * `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
    * are atomically added to the buffer, and are hence guaranteed to be present in a single block.
    */
-  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = {
+  def addMultipleDataWithCallback(dataIterator: Iterator[T], metadata: Any): Unit = {
     if (state == Active) {
       // Unroll iterator into a temp buffer, and wait for pushing in the process
-      val tempBuffer = new ArrayBuffer[Any]
+      val tempBuffer = new ArrayBuffer[T]
       dataIterator.foreach { data =>
         waitToPush()
         tempBuffer += data
@@ -236,7 +246,7 @@ private[streaming] class BlockGenerator(
     synchronized {
       if (currentBuffer.nonEmpty) {
         val newBlockBuffer = currentBuffer
-        currentBuffer = new ArrayBuffer[Any]
+        currentBuffer = new ArrayBuffer[T]
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         listener.onGenerateBlock(blockId)
         newBlock = new Block(blockId, newBlockBuffer)
```
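`BlockGeneratorListener` turns from a trait into an abstract class because the `T: ClassTag` context bound desugars to an implicit constructor parameter, which a Scala 2 trait cannot declare. The second `onAddData` overload exists so that `addMultipleDataWithCallback` can hand its internal `ArrayBuffer[T]` to the listener without the old erased `Any` signature. A minimal sketch of a typed listener under the patched API (both classes are `private[streaming]`, so a real implementation lives inside the streaming package):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener}

// Both overloads must be implemented: overload resolution picks the batch one
// for addMultipleDataWithCallback's internal buffer, and the single-item one
// for addData / addDataWithCallback.
class CountingListener extends BlockGeneratorListener[String] {
  private var count = 0L
  def onAddData(data: String, metadata: Any): Unit = count += 1
  def onAddData(data: ArrayBuffer[String], metadata: Any): Unit = count += data.size
  def onGenerateBlock(blockId: StreamBlockId): Unit = {}
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[String]): Unit =
    println(s"block $blockId carries ${arrayBuffer.size} of $count records") // typed, no cast
  def onError(message: String, throwable: Throwable): Unit = {}
}

// val generator = new BlockGenerator[String](new CountingListener, 0, new SparkConf())
// generator.addData("event")   // compiles
// generator.addData(42)        // now a compile-time error instead of a runtime surprise
```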

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
Lines changed: 8 additions & 4 deletions

```diff
@@ -21,15 +21,19 @@ import java.nio.ByteBuffer

 import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
+import scala.reflect.ClassTag

 /** Trait representing a received block */
-private[streaming] sealed trait ReceivedBlock
+private[streaming] abstract class ReceivedBlock[T: ClassTag]

 /** class representing a block received as an ArrayBuffer */
-private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
+private[streaming] case class ArrayBufferBlock[T: ClassTag](arrayBuffer: ArrayBuffer[T])
+  extends ReceivedBlock[T]

 /** class representing a block received as an Iterator */
-private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
+private[streaming] case class IteratorBlock[T: ClassTag](iterator: Iterator[T])
+  extends ReceivedBlock[T]

 /** class representing a block received as a ByteBuffer */
-private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
+private[streaming] case class ByteBufferBlock[T: ClassTag](byteBuffer: ByteBuffer)
+  extends ReceivedBlock[T]
```
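With the block variants typed, downstream code can recover the element type by pattern matching instead of casting. A small hypothetical helper (these classes are `private[streaming]`); note that the diff also drops `sealed`, so such a match is no longer exhaustiveness-checked by the compiler:

```scala
import scala.collection.mutable.ArrayBuffer

def numRecords[T](block: ReceivedBlock[T]): Option[Long] = block match {
  case ArrayBufferBlock(buf) => Some(buf.size.toLong) // buf: ArrayBuffer[T], no cast
  case IteratorBlock(_)      => None // unknown until the iterator is consumed
  case ByteBufferBlock(_)    => None // already-serialized payload
}
```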

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
Lines changed: 9 additions & 8 deletions

```diff
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.{existentials, postfixOps}
+import scala.reflect.ClassTag

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -42,10 +43,10 @@ private[streaming] trait ReceivedBlockStoreResult {
 }

 /** Trait that represents a class that handles the storage of blocks received by receiver */
-private[streaming] trait ReceivedBlockHandler {
+private[streaming] abstract class ReceivedBlockHandler[T: ClassTag] {

   /** Store a received block with the given block id and return related metadata */
-  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock[T]): ReceivedBlockStoreResult

   /** Cleanup old blocks older than the given threshold time */
   def cleanupOldBlocks(threshTime: Long): Unit
@@ -66,11 +67,11 @@ private[streaming] case class BlockManagerBasedStoreResult(
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
-private[streaming] class BlockManagerBasedBlockHandler(
+private[streaming] class BlockManagerBasedBlockHandler[T: ClassTag](
     blockManager: BlockManager, storageLevel: StorageLevel)
-  extends ReceivedBlockHandler with Logging {
+  extends ReceivedBlockHandler[T] with Logging {

-  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = {

     var numRecords: Option[Long] = None

@@ -122,7 +123,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
-private[streaming] class WriteAheadLogBasedBlockHandler(
+private[streaming] class WriteAheadLogBasedBlockHandler[T: ClassTag](
     blockManager: BlockManager,
     serializerManager: SerializerManager,
     streamId: Int,
@@ -131,7 +132,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     hadoopConf: Configuration,
     checkpointDir: String,
     clock: Clock = new SystemClock
-  ) extends ReceivedBlockHandler with Logging {
+  ) extends ReceivedBlockHandler[T] with Logging {

   private val blockStoreTimeout = conf.getInt(
     "spark.streaming.receiver.blockStoreTimeout", 30).seconds
@@ -168,7 +169,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   * It does this in parallel, using Scala Futures, and returns only after the block has
   * been stored in both places.
   */
-  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock[T]): ReceivedBlockStoreResult = {

     var numRecords = Option.empty[Long]
     // Serialize the block so that it can be inserted into both
```
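Threading `T: ClassTag` down to the handlers is the payoff of the whole change: the write-ahead-log handler serializes blocks before persisting them, and with a concrete element tag the serializer can round-trip the records instead of working against erased `Any` (which is how I read the commit title's reference to `dataSerialize`). A hypothetical instantiation under the patched API (these classes are `private[streaming]`; a `blockManager` instance is assumed in scope):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.{StorageLevel, StreamBlockId}

// The element type String flows from the handler's type parameter into
// storeBlock's ReceivedBlock[String] argument.
val handler = new BlockManagerBasedBlockHandler[String](blockManager, StorageLevel.MEMORY_AND_DISK_2)
val result = handler.storeBlock(
  StreamBlockId(0, 1L),
  ArrayBufferBlock(ArrayBuffer("a", "b", "c"))) // ArrayBufferBlock[String] is inferred
```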

streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
Lines changed: 12 additions & 4 deletions

```diff
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer

 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
+import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.StorageLevel
@@ -83,7 +84,12 @@ import org.apache.spark.storage.StorageLevel
 * }}}
 */
 @DeveloperApi
-abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
+abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T])
+  extends Serializable {
+
+  def this(clz: Class[T], storageLevel: StorageLevel) {
+    this(storageLevel)(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+  }

   /**
    * This method is called by the system when the receiver is started. This function
@@ -257,25 +263,27 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   private var id: Int = -1

   /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
-  @transient private var _supervisor: ReceiverSupervisor = null
+  @transient private var _supervisor: ReceiverSupervisor[T] = null

   /** Set the ID of the DStream that this receiver is associated with. */
   private[streaming] def setReceiverId(_id: Int) {
     id = _id
   }

   /** Attach Network Receiver executor to this receiver. */
-  private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
+  private[streaming] def attachSupervisor(exec: ReceiverSupervisor[T]) {
     assert(_supervisor == null)
     _supervisor = exec
   }

   /** Get the attached supervisor. */
-  private[streaming] def supervisor: ReceiverSupervisor = {
+  private[streaming] def supervisor: ReceiverSupervisor[T] = {
     assert(_supervisor != null,
       "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
       "some computation in the receiver before the Receiver.onStart() has been called.")
     _supervisor
   }
+
+  def getClassTag: ClassTag[T] = t
 }
```
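The new `getClassTag` accessor is what exposes the element type to the supervisor and handler plumbing above. One thing worth noting: the `Class[T]` overload discards `clz` and substitutes `ClassTag.AnyRef`, so Java-defined receivers still carry an erased tag; `ClassTag(clz)` would preserve the element type (an observation, not part of this commit). A minimal sketch of both construction paths under the patched API:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Scala path: ClassTag[Int] is supplied implicitly here, and getClassTag
// later hands it to the typed supervisor/handler plumbing.
class IntReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {}
  def onStop(): Unit = {}
}

// Java path (cf. the JavaCustomReceiver diff above):
//   super(String.class, StorageLevel.MEMORY_AND_DISK_2());
```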
