
Commit 1ee8eb4

brkyvz authored and tdas committed
[SPARK-7745] Change asserts to requires for user input checks in Spark Streaming
Assertions can be turned off. `require` throws an `IllegalArgumentException`, which makes more sense when the check is on a user-set variable.

Author: Burak Yavuz <[email protected]>

Closes #6271 from brkyvz/streaming-require and squashes the following commits:

d249484 [Burak Yavuz] fix merge conflict
264adb8 [Burak Yavuz] addressed comments v1.0
6161350 [Burak Yavuz] fix tests
16aa766 [Burak Yavuz] changed more assertions to more meaningful errors
afd923d [Burak Yavuz] changed some assertions to require
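
For background on the motivation stated above, here is a minimal sketch (not part of the commit) of how Scala's `assert` and `require` differ. `assert` throws `AssertionError` and is elided when code is compiled with `scalac -Xdisable-assertions`, so a user-input check written with it can silently vanish; `require` always runs and throws `IllegalArgumentException`, the conventional exception for invalid arguments. The object name and the `batchDuration` variable below are illustrative stand-ins, not Spark code.

object RequireVsAssert {
  def main(args: Array[String]): Unit = {
    val batchDuration: String = null // stand-in for a user-supplied setting

    // assert: throws java.lang.AssertionError ("assertion failed: ...") and is
    // removed entirely when compiled with -Xdisable-assertions
    try assert(batchDuration != null, "Batch duration has not been set")
    catch { case e: AssertionError => println(s"assert  -> ${e.getClass.getName}") }

    // require: always evaluated, throws java.lang.IllegalArgumentException
    // ("requirement failed: ..."), the idiomatic signal for bad caller input
    try require(batchDuration != null, "Batch duration has not been set")
    catch { case e: IllegalArgumentException => println(s"require -> ${e.getClass.getName}") }
  }
}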
1 parent 947ea1c commit 1ee8eb4

File tree: 7 files changed, +38 −38 lines

streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

Lines changed: 2 additions & 2 deletions
@@ -157,10 +157,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   def validate() {
     this.synchronized {
-      assert(batchDuration != null, "Batch duration has not been set")
+      require(batchDuration != null, "Batch duration has not been set")
       // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
       // " is very low")
-      assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+      require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
     }
   }
 
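
As a hedged illustration of the user-visible effect of the `DStreamGraph.validate()` change (this example is not part of the commit; the local master, app name, and socket source are arbitrary): starting a context that has an input stream but no registered output operation should now fail with an `IllegalArgumentException` carrying the clearer "No output operations registered" message, instead of an `AssertionError` that disappears when assertions are disabled.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NoOutputOpsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("no-output-ops")
    val ssc = new StreamingContext(conf, Seconds(1))

    ssc.socketTextStream("localhost", 9999) // input registered, but no output operation

    try {
      ssc.start() // DStreamGraph.validate() rejects the graph
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage) // "requirement failed: No output operations registered, so nothing to execute"
    } finally {
      ssc.stop()
    }
  }
}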

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 6 additions & 5 deletions
@@ -156,7 +156,7 @@ class StreamingContext private[streaming] (
       cp_.graph.restoreCheckpointData()
       cp_.graph
     } else {
-      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
       val newGraph = new DStreamGraph()
       newGraph.setBatchDuration(batchDur_)
       newGraph
@@ -462,7 +462,8 @@
       directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
-      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      require(bytes.length == recordLength, "Byte array does not have correct length. " +
+        s"${bytes.length} did not equal recordLength: $recordLength")
       bytes
     }
     data
@@ -568,7 +569,7 @@
   /**
    * Start the execution of the streams.
    *
-   * @throws SparkException if the StreamingContext is already stopped.
+   * @throws IllegalStateException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
     state match {
@@ -587,7 +588,7 @@
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
       case STOPPED =>
-        throw new SparkException("StreamingContext has already been stopped")
+        throw new IllegalStateException("StreamingContext has already been stopped")
     }
   }
 
@@ -689,7 +690,7 @@ object StreamingContext extends Logging {
   private def assertNoOtherContextIsActive(): Unit = {
     ACTIVATION_LOCK.synchronized {
       if (activeContext.get() != null) {
-        throw new SparkException(
+        throw new IllegalStateException(
           "Only one StreamingContext may be started in this JVM. " +
             "Currently running StreamingContext was started at" +
             activeContext.get.startSite.get.longForm)
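
The lifecycle errors in `StreamingContext` now surface as `IllegalStateException`, so callers that previously matched on `SparkException` for these situations need to catch the new type. A minimal, hedged sketch of the start-after-stop case (not part of the commit; local master and app name are arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StartAfterStopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("start-after-stop")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.stop()

    try {
      ssc.start() // the context is already STOPPED
    } catch {
      // before this commit: org.apache.spark.SparkException
      // after this commit:  java.lang.IllegalStateException
      case e: IllegalStateException => println(s"caught: ${e.getMessage}")
    }
  }
}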

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 2 additions & 2 deletions
@@ -109,7 +109,7 @@ private[python] object PythonTransformFunctionSerializer {
   }
 
   def serialize(func: PythonTransformFunction): Array[Byte] = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     // get the id of PythonTransformFunction in py4j
     val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
     val f = h.getClass().getDeclaredField("id")
@@ -119,7 +119,7 @@
   }
 
   def deserialize(bytes: Array[Byte]): PythonTransformFunction = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     serializer.loads(bytes)
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 22 additions & 23 deletions
@@ -217,53 +217,52 @@ abstract class DStream[T: ClassTag] (
       case StreamingContextState.INITIALIZED =>
         // good to go
       case StreamingContextState.ACTIVE =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "starting a context is not supported")
       case StreamingContextState.STOPPED =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "stopping a context is not supported")
     }
   }
 
   private[streaming] def validateAtStart() {
-    assert(rememberDuration != null, "Remember duration is set to null")
+    require(rememberDuration != null, "Remember duration is set to null")
 
-    assert(
+    require(
       !mustCheckpoint || checkpointDuration != null,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
         " Please use DStream.checkpoint() to set the interval."
     )
 
-    assert(
+    require(
       checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
-      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
-        " or SparkContext.checkpoint() to set the checkpoint directory."
+      "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration >= slideDuration,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
         "Please set it to at least " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
-        "Please set it to a multiple " + slideDuration + "."
+        "Please set it to a multiple of " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || storageLevel != StorageLevel.NONE,
       "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
         "level has not been set to enable persisting. Please use DStream.persist() to set the " +
         "storage level to use memory for better checkpointing performance."
     )
 
-    assert(
+    require(
       checkpointDuration == null || rememberDuration > checkpointDuration,
       "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
         rememberDuration + " which is not more than the checkpoint interval (" +
@@ -272,7 +271,7 @@
 
     val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
     logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    assert(
+    require(
       metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
       "It seems you are doing some DStream window operation or setting a checkpoint interval " +
         "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
@@ -633,8 +632,8 @@
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
   }
@@ -644,8 +643,8 @@
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     transform((r: RDD[T], t: Time) => cleanedF(r))
@@ -656,8 +655,8 @@
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
@@ -674,8 +673,8 @@
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
@@ -688,8 +687,8 @@
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
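
All of the `validateAtStart()` checks above now fail with `IllegalArgumentException` rather than `AssertionError`. As a hedged illustration of one of them, the multiple-of-slide-duration rule, the sketch below (not part of the commit; durations, paths, and the socket source are arbitrary) sets a 10-second checkpoint interval on a stream whose slide duration is 4 seconds, which should be rejected when the context starts:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointIntervalExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-interval")
    val ssc = new StreamingContext(conf, Seconds(4))  // batch/slide duration: 4s
    ssc.checkpoint("/tmp/streaming-checkpoint")       // arbitrary checkpoint directory

    val counts = ssc.socketTextStream("localhost", 9999).map(_ -> 1L)
    counts.checkpoint(Seconds(10))                    // 10s is not a multiple of 4s
    counts.print()

    try {
      ssc.start() // validateAtStart() rejects the checkpoint interval
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    } finally {
      ssc.stop()
    }
  }
}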

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala

Lines changed: 2 additions & 2 deletions
@@ -40,12 +40,12 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     partitioner: Partitioner
   ) extends DStream[(K,V)](parent.ssc) {
 
-  assert(_windowDuration.isMultipleOf(parent.slideDuration),
+  require(_windowDuration.isMultipleOf(parent.slideDuration),
     "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
-  assert(_slideDuration.isMultipleOf(parent.slideDuration),
+  require(_slideDuration.isMultipleOf(parent.slideDuration),
     "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
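
These constructor checks are typically reached through `reduceByKeyAndWindow` with an inverse reduce function, which builds a `ReducedWindowedDStream` eagerly. A hedged sketch of the failure mode (not part of the commit; durations, reduce functions, and the socket source are arbitrary): with a 2-second batch interval, a 5-second window is not a multiple of the slide duration, so the transformation itself now raises `IllegalArgumentException` instead of `AssertionError`.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BadWindowExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("bad-window")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/streaming-checkpoint") // the inverse-reduce path requires checkpointing

    val pairs = ssc.socketTextStream("localhost", 9999).map(_ -> 1L)

    try {
      // ReducedWindowedDStream is constructed here; 5s is not a multiple of the 2s slide duration
      pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(2))
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    } finally {
      ssc.stop()
    }
  }
}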

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ private[streaming] class ReceivedBlockTracker(
    * returns only after the files are cleaned up.
    */
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
-    assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
+    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -182,7 +182,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop()
-    intercept[SparkException] {
+    intercept[IllegalStateException] {
       ssc.start() // start after stop should throw exception
     }
     assert(ssc.getState() === StreamingContextState.STOPPED)
@@ -600,7 +600,7 @@
     val anotherInput = addInputStream(anotherSsc)
     anotherInput.foreachRDD { rdd => rdd.count }
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[IllegalStateException] {
       anotherSsc.start()
     }
     assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
@@ -623,7 +623,7 @@
 
   def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
     withClue(clue) {
-      val ex = intercept[SparkException] {
+      val ex = intercept[IllegalStateException] {
         body
       }
       assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
