@@ -207,80 +207,91 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
207207
208208 /** Custom MemoryStream that waits for manual clock to reach a time */
209209 val inputData = new MemoryStream [Int ](0 , sqlContext) {
210- // Wait for manual clock to be 100 first time there is data
210+ // getOffset should take 50 ms the first time it is called
211211 override def getOffset : Option [Offset ] = {
212212 val offset = super .getOffset
213213 if (offset.nonEmpty) {
214- clock.waitTillTime(300 )
214+ clock.waitTillTime(1050 )
215215 }
216216 offset
217217 }
218218
219- // Wait for manual clock to be 300 first time there is data
219+ // getBatch should take 100 ms the first time it is called
220220 override def getBatch (start : Option [Offset ], end : Offset ): DataFrame = {
221- clock.waitTillTime(600 )
221+ if (start.isEmpty) clock.waitTillTime(1150 )
222222 super .getBatch(start, end)
223223 }
224224 }
225225
226- // This is to make sure thatquery waits for manual clock to be 600 first time there is data
227- val mapped = inputData.toDS( ).as[Long ].map { x =>
228- clock.waitTillTime(1100 )
226+ // query execution should take 350 ms the first time it is called
227+ val mapped = inputData.toDS.coalesce( 1 ).as[Long ].map { x =>
228+ clock.waitTillTime(1500 ) // this will only wait the first time when clock < 1500
229229 10 / x
230230 }.agg(count(" *" )).as[Long ]
231231
232- case class AssertStreamExecThreadToWaitForClock ( )
232+ case class AssertStreamExecThreadIsWaitingForTime ( targetTime : Long )
233233 extends AssertOnQuery (q => {
234234 eventually(Timeout (streamingTimeout)) {
235235 if (q.exception.isEmpty) {
236- assert(clock.asInstanceOf [ StreamManualClock ].isStreamWaitingAt(clock.getTimeMillis ))
236+ assert(clock.isStreamWaitingFor(targetTime ))
237237 }
238238 }
239239 if (q.exception.isDefined) {
240240 throw q.exception.get
241241 }
242242 true
243- }, " " )
243+ }, " " ) {
244+ override def toString : String = s " AssertStreamExecThreadIsWaitingForTime( $targetTime) "
245+ }
246+
247+ case class AssertClockTime (time : Long )
248+ extends AssertOnQuery (q => clock.getTimeMillis() === time, " " ) {
249+ override def toString : String = s " AssertClockTime( $time) "
250+ }
244251
245252 var lastProgressBeforeStop : StreamingQueryProgress = null
246253
247254 testStream(mapped, OutputMode .Complete )(
248- StartStream (ProcessingTime (100 ), triggerClock = clock),
249- AssertStreamExecThreadToWaitForClock ( ),
255+ StartStream (ProcessingTime (1000 ), triggerClock = clock),
256+ AssertStreamExecThreadIsWaitingForTime ( 1000 ),
250257 AssertOnQuery (_.status.isDataAvailable === false ),
251258 AssertOnQuery (_.status.isTriggerActive === false ),
252259 AssertOnQuery (_.status.message === " Waiting for next trigger" ),
253260 AssertOnQuery (_.recentProgress.count(_.numInputRows > 0 ) === 0 ),
254261
255262 // Test status and progress while offset is being fetched
256263 AddData (inputData, 1 , 2 ),
257- AdvanceManualClock (100 ), // time = 100 to start new trigger, will block on getOffset
258- AssertStreamExecThreadToWaitForClock ( ),
264+ AdvanceManualClock (1000 ), // time = 1000 to start new trigger, will block on getOffset
265+ AssertStreamExecThreadIsWaitingForTime ( 1050 ),
259266 AssertOnQuery (_.status.isDataAvailable === false ),
260267 AssertOnQuery (_.status.isTriggerActive === true ),
261268 AssertOnQuery (_.status.message.startsWith(" Getting offsets from" )),
262269 AssertOnQuery (_.recentProgress.count(_.numInputRows > 0 ) === 0 ),
263270
264271 // Test status and progress while batch is being fetched
265- AdvanceManualClock (200 ), // time = 300 to unblock getOffset, will block on getBatch
266- AssertStreamExecThreadToWaitForClock (),
272+ AdvanceManualClock (50 ), // time = 1050 to unblock getOffset
273+ AssertClockTime (1050 ),
274+ AssertStreamExecThreadIsWaitingForTime (1150 ), // will block on getBatch that needs 1150
267275 AssertOnQuery (_.status.isDataAvailable === true ),
268276 AssertOnQuery (_.status.isTriggerActive === true ),
269277 AssertOnQuery (_.status.message === " Processing new data" ),
270278 AssertOnQuery (_.recentProgress.count(_.numInputRows > 0 ) === 0 ),
271279
272280 // Test status and progress while batch is being processed
273- AdvanceManualClock (300 ), // time = 600 to unblock getBatch, will block in Spark job
281+ AdvanceManualClock (100 ), // time = 1150 to unblock getBatch
282+ AssertClockTime (1150 ),
283+ AssertStreamExecThreadIsWaitingForTime (1500 ), // will block in Spark job that needs 1500
274284 AssertOnQuery (_.status.isDataAvailable === true ),
275285 AssertOnQuery (_.status.isTriggerActive === true ),
276286 AssertOnQuery (_.status.message === " Processing new data" ),
277287 AssertOnQuery (_.recentProgress.count(_.numInputRows > 0 ) === 0 ),
278288
279289 // Test status and progress while batch processing has completed
280- AdvanceManualClock (500 ), // time = 1100 to unblock job
281- AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
290+ AssertOnQuery { _ => clock.getTimeMillis() === 1150 },
291+ AdvanceManualClock (350 ), // time = 1500 to unblock job
292+ AssertClockTime (1500 ),
282293 CheckAnswer (2 ),
283- AssertStreamExecThreadToWaitForClock ( ),
294+ AssertStreamExecThreadIsWaitingForTime ( 2000 ),
284295 AssertOnQuery (_.status.isDataAvailable === true ),
285296 AssertOnQuery (_.status.isTriggerActive === false ),
286297 AssertOnQuery (_.status.message === " Waiting for next trigger" ),
@@ -293,21 +304,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
293304 assert(progress.id === query.id)
294305 assert(progress.name === query.name)
295306 assert(progress.batchId === 0 )
296- assert(progress.timestamp === " 1970-01-01T00:00:00.100Z " ) // 100 ms in UTC
307+ assert(progress.timestamp === " 1970-01-01T00:00:01.000Z " ) // 100 ms in UTC
297308 assert(progress.numInputRows === 2 )
298- assert(progress.processedRowsPerSecond === 2 .0 )
309+ assert(progress.processedRowsPerSecond === 4 .0 )
299310
300- assert(progress.durationMs.get(" getOffset" ) === 200 )
301- assert(progress.durationMs.get(" getBatch" ) === 300 )
311+ assert(progress.durationMs.get(" getOffset" ) === 50 )
312+ assert(progress.durationMs.get(" getBatch" ) === 100 )
302313 assert(progress.durationMs.get(" queryPlanning" ) === 0 )
303314 assert(progress.durationMs.get(" walCommit" ) === 0 )
304- assert(progress.durationMs.get(" triggerExecution" ) === 1000 )
315+ assert(progress.durationMs.get(" triggerExecution" ) === 500 )
305316
306317 assert(progress.sources.length === 1 )
307318 assert(progress.sources(0 ).description contains " MemoryStream" )
308319 assert(progress.sources(0 ).startOffset === null )
309320 assert(progress.sources(0 ).endOffset !== null )
310- assert(progress.sources(0 ).processedRowsPerSecond === 2 .0 )
321+ assert(progress.sources(0 ).processedRowsPerSecond === 4 .0 ) // 2 rows processed in 500 ms
311322
312323 assert(progress.stateOperators.length === 1 )
313324 assert(progress.stateOperators(0 ).numRowsUpdated === 1 )
@@ -317,23 +328,27 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
317328 true
318329 },
319330
331+ // Test whether input rate is updated after two batches
332+ AssertStreamExecThreadIsWaitingForTime (2000 ), // blocked waiting for next trigger time
320333 AddData (inputData, 1 , 2 ),
321- AdvanceManualClock (100 ), // allow another trigger
322- AssertStreamExecThreadToWaitForClock (),
334+ AdvanceManualClock (500 ), // allow another trigger
335+ AssertClockTime (2000 ),
336+ AssertStreamExecThreadIsWaitingForTime (3000 ), // will block waiting for next trigger time
323337 CheckAnswer (4 ),
324338 AssertOnQuery (_.status.isDataAvailable === true ),
325339 AssertOnQuery (_.status.isTriggerActive === false ),
326340 AssertOnQuery (_.status.message === " Waiting for next trigger" ),
327341 AssertOnQuery { query =>
328342 assert(query.recentProgress.last.eq(query.lastProgress))
329343 assert(query.lastProgress.batchId === 1 )
330- assert(query.lastProgress.sources(0 ).inputRowsPerSecond === 1.818 )
344+ assert(query.lastProgress.inputRowsPerSecond === 2.0 )
345+ assert(query.lastProgress.sources(0 ).inputRowsPerSecond === 2.0 )
331346 true
332347 },
333348
334349 // Test status and progress after data is not available for a trigger
335- AdvanceManualClock (100 ), // allow another trigger
336- AssertStreamExecThreadToWaitForClock ( ),
350+ AdvanceManualClock (1000 ), // allow another trigger
351+ AssertStreamExecThreadIsWaitingForTime ( 4000 ),
337352 AssertOnQuery (_.status.isDataAvailable === false ),
338353 AssertOnQuery (_.status.isTriggerActive === false ),
339354 AssertOnQuery (_.status.message === " Waiting for next trigger" ),
@@ -350,10 +365,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
350365 AssertOnQuery (_.status.message === " Stopped" ),
351366
352367 // Test status and progress after query terminated with error
353- StartStream (ProcessingTime (100 ), triggerClock = clock),
354- AdvanceManualClock (100 ), // ensure initial trigger completes before AddData
368+ StartStream (ProcessingTime (1000 ), triggerClock = clock),
369+ AdvanceManualClock (1000 ), // ensure initial trigger completes before AddData
355370 AddData (inputData, 0 ),
356- AdvanceManualClock (100 ), // allow another trigger
371+ AdvanceManualClock (1000 ), // allow another trigger
357372 ExpectFailure [SparkException ](),
358373 AssertOnQuery (_.status.isDataAvailable === false ),
359374 AssertOnQuery (_.status.isTriggerActive === false ),
@@ -678,5 +693,5 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
678693
679694object StreamingQuerySuite {
680695 // Singleton reference to clock that does not get serialized in task closures
681- var clock : ManualClock = null
696+ var clock : StreamManualClock = null
682697}
0 commit comments