
Commit ccf010f

SaintBacchus authored and srowen committed
[SPARK-8367] [STREAMING] Add a limit for `spark.streaming.blockInterval` to fix a data loss bug.
The bug was reported in JIRA [SPARK-8367](https://issues.apache.org/jira/browse/SPARK-8367). The resolution is limiting the configuration `spark.streaming.blockInterval` to a positive number.

Author: huangzhaowei <[email protected]>
Author: huangzhaowei <[email protected]>

Closes apache#6818 from SaintBacchus/SPARK-8367 and squashes the following commits:

c9d1927 [huangzhaowei] Update BlockGenerator.scala
bd3f71a [huangzhaowei] Use require instead of if
3d17796 [huangzhaowei] [SPARK-8367][Streaming] Add a limit for 'spark.streaming.blockInterval' since a data loss bug.
1 parent bc76a0f commit ccf010f
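For context, the fix described above is a plain `require` guard: a non-positive block interval now fails fast with an `IllegalArgumentException` instead of being passed along silently. Below is a minimal standalone sketch of that check; the object and method names are illustrative and not part of Spark, while the config key, default, and `require` line mirror the actual change in BlockGenerator.scala:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch of the validation this commit adds. Only the
// getTimeAsMs call and the require line mirror the real BlockGenerator
// code; the surrounding object is hypothetical.
object BlockIntervalCheck {
  def validatedBlockIntervalMs(conf: SparkConf): Long = {
    // Same config key and default as BlockGenerator.
    val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
    // The new guard: require throws IllegalArgumentException on 0 or negatives.
    require(blockIntervalMs > 0,
      s"'spark.streaming.blockInterval' should be a positive value")
    blockIntervalMs
  }

  def main(args: Array[String]): Unit = {
    val bad = new SparkConf().set("spark.streaming.blockInterval", "0ms")
    // Fails fast with IllegalArgumentException rather than misbehaving later.
    validatedBlockIntervalMs(bad)
  }
}
```

With this guard in place, a misconfigured interval surfaces as an immediate exception rather than as the silent data loss reported in the JIRA.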

File tree

1 file changed (+3, -1 lines)

streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.util.SystemClock
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
@@ -80,6 +80,8 @@ private[streaming] class BlockGenerator(
 
   private val clock = new SystemClock()
   private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+
   private val blockIntervalTimer =
     new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
   private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
```
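From the user's side, nothing changes for valid configurations: any positive duration string is accepted exactly as before, and only zero or negative values are rejected. A usage sketch follows; the app name and master are illustrative, not prescribed by this commit:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlockIntervalUsage {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlockIntervalUsage") // illustrative name
      .setMaster("local[2]")
      // Any positive duration is accepted; "0ms" or a negative value
      // would now trip the require check when a receiver's
      // BlockGenerator is created.
      .set("spark.streaming.blockInterval", "50ms")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.stop()
  }
}
```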
