Skip to content

Commit 500b456

Browse files
committed
Do not replicate streaming block when WAL is enabled
1 parent 5e7a6dc commit 500b456

File tree

1 file changed

+19
-1
lines changed

1 file changed

+19
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
121121
private val maxFailures = conf.getInt(
122122
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)
123123

124+
private val effectiveStorageLevel = {
125+
if (storageLevel.deserialized) {
126+
logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
127+
s" write ahead log is enabled, change to serialization false")
128+
}
129+
if (storageLevel.replication > 1) {
130+
logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
131+
s"write ahead log is enabled, change to replication 1")
132+
}
133+
134+
StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
135+
}
136+
137+
if (storageLevel != effectiveStorageLevel) {
138+
logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
139+
s"$effectiveStorageLevel when write ahead log is enabled")
140+
}
141+
124142
// Manages rolling log files
125143
private val logManager = new WriteAheadLogManager(
126144
checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
156174
// Store the block in block manager
157175
val storeInBlockManagerFuture = Future {
158176
val putResult =
159-
blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
177+
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
160178
if (!putResult.map { _._1 }.contains(blockId)) {
161179
throw new SparkException(
162180
s"Could not store $blockId to block manager with storage level $storageLevel")

0 commit comments

Comments
 (0)