@@ -77,8 +77,11 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}

def getSerializer(ct: ClassTag[_]): Serializer = {
if (canUseKryo(ct)) {
// SPARK-18617: The feature introduced in SPARK-13990 cannot be applied to Spark Streaming yet.
// In the worst case, streaming jobs that use `Receiver` mode cannot run properly on Spark 2.x.
// As a first step, it is reasonable to disable the `kryo auto pick` feature for streaming.
def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
Contributor: document what this means

if (autoPick && canUseKryo(ct)) {
kryoSerializer
} else {
defaultSerializer
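
The call sites below derive autoPick from the block's id, so receiver-generated StreamBlockIds always fall back to the default serializer while ordinary blocks keep the kryo auto-pick behavior. A minimal standalone sketch of that rule, using simplified hypothetical stand-ins rather than the real Spark classes:

import scala.reflect.ClassTag

// Hypothetical stand-ins for the Spark block id types; the names mirror the real
// ones, but this is not the actual Spark code.
sealed trait BlockId
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId

object AutoPickSketch {
  // Simplified eligibility check: only Array[Byte] and String count as kryo-eligible here.
  private def canUseKryo(ct: ClassTag[_]): Boolean =
    ct == implicitly[ClassTag[Array[Byte]]] || ct == implicitly[ClassTag[String]]

  // Mirrors getSerializer(ct, autoPick): stream blocks never auto-pick kryo.
  def pickSerializer(ct: ClassTag[_], blockId: BlockId): String = {
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    if (autoPick && canUseKryo(ct)) "KryoSerializer" else "JavaSerializer (default)"
  }

  def main(args: Array[String]): Unit = {
    val byteTag = implicitly[ClassTag[Array[Byte]]]
    // Receiver-generated block: forced onto the default serializer.
    println(pickSerializer(byteTag, StreamBlockId(0, 1L)))  // JavaSerializer (default)
    // Ordinary RDD block: kryo auto-pick still applies.
    println(pickSerializer(byteTag, RDDBlockId(0, 0)))      // KryoSerializer
  }
}
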
@@ -155,7 +158,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
outputStream: OutputStream,
values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
}

@@ -171,7 +175,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
classTag: ClassTag[_]): ChunkedByteBuffer = {
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
val byteStream = new BufferedOutputStream(bbos)
val ser = getSerializer(classTag).newInstance()
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
bbos.toChunkedByteBuffer
}
@@ -185,7 +190,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
inputStream: InputStream)
(classTag: ClassTag[T]): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
getSerializer(classTag)
val autoPick = !blockId.isInstanceOf[StreamBlockId]
getSerializer(classTag, autoPick)
.newInstance()
.deserializeStream(wrapStream(blockId, stream))
.asIterator.asInstanceOf[Iterator[T]]
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
@@ -334,7 +334,8 @@ private[spark] class MemoryStore(
val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val ser = serializerManager.getSerializer(classTag).newInstance()
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
}

@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
spy
}

val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
val serializer = serializerManager
.getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
redirectableOutputStream.setOutputStream(bbos)
val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
Mockito.verifyNoMoreInteractions(memoryStore)
Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()

val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
val serializer = serializerManager
.getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val deserialized =
serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
assert(deserialized === items)
@@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc.stop()
}

test("SPARK-18560 Receiver data should be deserialized properly.") {
// Start a two-node cluster, so the receiver will use one node and Spark jobs will use the
// other. Spark jobs then need to fetch remote blocks, which triggers SPARK-18560.
val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
ssc = new StreamingContext(conf, Milliseconds(100))
val input = ssc.receiverStream(new FakeByteArrayReceiver)
input.count().foreachRDD { rdd =>
// Make sure we can read from BlockRDD
if (rdd.collect().headOption.getOrElse(0L) > 0) {
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
new Thread() {
setDaemon(true)
override def run(): Unit = {
ssc.stop(stopSparkContext = true, stopGracefully = false)
}
}.start()
}
}
ssc.start()
ssc.awaitTerminationOrTimeout(60000)
}

def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)
@@ -869,6 +891,31 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}

class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
Member: nit: Why create a new class? Is there any concern with just using TestReceiver?

Contributor Author (@uncleGen, Nov 30, 2016): @zsxwing yes, the failure occurs when the receiver stores Array[Byte] data and the automatic serializer selection picks JavaSerializer. However, after the block is fetched from a remote executor, the input-stream data is deserialized with KryoSerializer, leading to com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994.
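
To make the failure mode concrete, here is a minimal sketch (an assumed reproduction written for illustration, not code from this PR) of the write/read mismatch described above: the bytes are written with JavaSerializer but read back with KryoSerializer.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

object SerializerMismatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val data: Array[Byte] = "test".getBytes

    // Writer side: the receiver-stored block ends up serialized with the default JavaSerializer.
    val bytes = new JavaSerializer(conf).newInstance().serialize(data)

    // Reader side: Array[Byte] is kryo-eligible, so KryoSerializer gets auto-picked.
    // Reading Java-serialized bytes with Kryo is expected to fail with a
    // com.esotericsoftware.kryo.KryoException (unregistered class ID), as reported above.
    new KryoSerializer(conf).newInstance().deserialize[Array[Byte]](bytes)
  }
}
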

Contributor Author: BTW, the existing unit tests can cover the other cases besides the Array[Byte] type.

Member: Any class in primitiveAndPrimitiveArrayClassTags can trigger this issue. That's why you could also use the existing TestReceiver; it's a Receiver[Int].
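
As a small illustration of that point, a sketch with an assumed subset of the primitive and primitive-array class tags (not the actual primitiveAndPrimitiveArrayClassTags definition): Int is kryo-eligible just like Array[Byte], so a Receiver[Int] such as TestReceiver could hit the same mismatch.

import scala.reflect.ClassTag

object KryoEligibilitySketch {
  // Assumed subset of the class tags Spark auto-picks kryo for.
  private val kryoEligible: Set[ClassTag[_]] = Set(
    ClassTag.Int, ClassTag.Long, ClassTag.Double,
    implicitly[ClassTag[Array[Byte]]], implicitly[ClassTag[Array[Int]]])

  def main(args: Array[String]): Unit = {
    println(kryoEligible.contains(ClassTag.Int))                      // true
    println(kryoEligible.contains(implicitly[ClassTag[Array[Byte]]])) // true
  }
}
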


val data: Array[Byte] = "test".getBytes
var receivingThreadOption: Option[Thread] = None

override def onStart(): Unit = {
val thread = new Thread() {
override def run() {
logInfo("Receiving started")
while (!isStopped) {
store(data)
}
logInfo("Receiving stopped")
}
}
receivingThreadOption = Some(thread)
thread.start()
}

override def onStop(): Unit = {
// No cleanup to be done; the receiving thread should stop on its own, so just wait for it.
receivingThreadOption.foreach(_.join())
}
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {