From c060e6b1b811f1e55d4ac0becf38683cfc1fe536 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 11 Apr 2017 19:48:39 -0700
Subject: [PATCH 1/3] ready for jenkins

---
 .../execution/streaming/StreamExecution.scala | 56 +++++------
 .../spark/sql/streaming/StreamTest.scala      |  6 ++
 .../streaming/StreamingAggregationSuite.scala | 96 ++++++++++---------
 3 files changed, 81 insertions(+), 77 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8857966676ae..bcf0d970f7ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -284,42 +284,38 @@ class StreamExecution(
       triggerExecutor.execute(() => {
         startTrigger()
 
-        val continueToRun =
-          if (isActive) {
-            reportTimeTaken("triggerExecution") {
-              if (currentBatchId < 0) {
-                // We'll do this initialization only once
-                populateStartOffsets(sparkSessionToRunBatches)
-                logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-              } else {
-                constructNextBatch()
-              }
-              if (dataAvailable) {
-                currentStatus = currentStatus.copy(isDataAvailable = true)
-                updateStatusMessage("Processing new data")
-                runBatch(sparkSessionToRunBatches)
-              }
+        if (isActive) {
+          reportTimeTaken("triggerExecution") {
+            if (currentBatchId < 0) {
+              // We'll do this initialization only once
+              populateStartOffsets(sparkSessionToRunBatches)
+              logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+            } else {
+              constructNextBatch()
             }
-            // Report trigger as finished and construct progress object.
-            finishTrigger(dataAvailable)
             if (dataAvailable) {
-              // Update committed offsets.
-              batchCommitLog.add(currentBatchId)
-              committedOffsets ++= availableOffsets
-              logDebug(s"batch ${currentBatchId} committed")
-              // We'll increase currentBatchId after we complete processing current batch's data
-              currentBatchId += 1
-            } else {
-              currentStatus = currentStatus.copy(isDataAvailable = false)
-              updateStatusMessage("Waiting for data to arrive")
-              Thread.sleep(pollingDelayMs)
+              currentStatus = currentStatus.copy(isDataAvailable = true)
+              updateStatusMessage("Processing new data")
+              runBatch(sparkSessionToRunBatches)
             }
-            true
+          }
+          // Report trigger as finished and construct progress object.
+          finishTrigger(dataAvailable)
+          if (dataAvailable) {
+            // Update committed offsets.
+            batchCommitLog.add(currentBatchId)
+            committedOffsets ++= availableOffsets
+            logDebug(s"batch ${currentBatchId} committed")
+            // We'll increase currentBatchId after we complete processing current batch's data
+            currentBatchId += 1
           } else {
-            false
+            currentStatus = currentStatus.copy(isDataAvailable = false)
+            updateStatusMessage("Waiting for data to arrive")
+            Thread.sleep(pollingDelayMs)
           }
+        }
         updateStatusMessage("Waiting for next trigger")
-        continueToRun
+        isActive
       })
       updateStatusMessage("Stopped")
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 03aa45b61688..5bc36dd30f6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     def threadState =
       if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+    } else {
+      ""
+    }
 
     def testState =
       s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
          |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
+         |$threadStackTrace
          |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
          |
          |== Sink ==
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index f796a4cb4a39..8b90eefa3ea2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -292,52 +292,54 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
     )
   }
 
-  test("prune results by current_date, complete mode") {
-    import testImplicits._
-    val clock = new StreamManualClock
-    val tz = TimeZone.getDefault.getID
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
-        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-        .toDF("value")
-        .groupBy($"value")
-        .agg(count("*"))
-        .where($"value".cast("date") >= date_sub(current_date(), 10))
-        .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-    testStream(aggregated, Complete)(
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
-      // advance clock to 10 days, should retain all keys
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-      // advance clock to 20 days, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-      // advance clock to 30 days, should retain keys >= 20
-      AddData(inputData, 85L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 days.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.batchCommitLog.purge(3)
-        // advance by 60 days i.e., 90 days total
-        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-        true
-      },
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
-      // Commit log blown, causing a re-run of the last batch
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // advance clock to 100 days, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+  for (i <- 0 to 1000) {
+    test(s"prune results by current_date, complete mode - $i") {
+      import testImplicits._
+      val clock = new StreamManualClock
+      val tz = TimeZone.getDefault.getID
+      val inputData = MemoryStream[Long]
+      val aggregated =
+        inputData.toDF()
+          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+          .toDF("value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .where($"value".cast("date") >= date_sub(current_date(), 10))
+          .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+      testStream(aggregated, Complete)(
+        StartStream(ProcessingTime("10 day"), triggerClock = clock),
+        // advance clock to 10 days, should retain all keys
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+        // advance clock to 20 days, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+        // advance clock to 30 days, should retain keys >= 20
+        AddData(inputData, 85L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 days.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.batchCommitLog.purge(3)
+          // advance by 60 days i.e., 90 days total
+          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+          true
+        },
+        StartStream(ProcessingTime("10 day"), triggerClock = clock),
+        // Commit log blown, causing a re-run of the last batch
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // advance clock to 100 days, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 }

From 7064bf46e66cabab9e303b359936b870a23982f7 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 11 Apr 2017 22:36:51 -0700
Subject: [PATCH 2/3] passed tests, remove test retry

---
 .../streaming/StreamingAggregationSuite.scala | 96 +++++++++----------
 1 file changed, 47 insertions(+), 49 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 8b90eefa3ea2..63849e096b05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -292,54 +292,52 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
     )
   }
 
-  for (i <- 0 to 1000) {
-    test(s"prune results by current_date, complete mode - $i") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.batchCommitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+  test(s"prune results by current_date, complete mode") {
+    import testImplicits._
+    val clock = new StreamManualClock
+    val tz = TimeZone.getDefault.getID
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+        .toDF("value")
+        .groupBy($"value")
+        .agg(count("*"))
+        .where($"value".cast("date") >= date_sub(current_date(), 10))
+        .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+    testStream(aggregated, Complete)(
+      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      // advance clock to 10 days, should retain all keys
+      AddData(inputData, 0L, 5L, 5L, 10L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+      // advance clock to 20 days, should retain keys >= 10
+      AddData(inputData, 15L, 15L, 20L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+      // advance clock to 30 days, should retain keys >= 20
+      AddData(inputData, 85L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // bounce stream and ensure correct batch timestamp is used
+      // i.e., we don't take it from the clock, which is at 90 days.
+      StopStream,
+      AssertOnQuery { q => // clear the sink
+        q.sink.asInstanceOf[MemorySink].clear()
+        q.batchCommitLog.purge(3)
+        // advance by 60 days i.e., 90 days total
+        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+        true
+      },
+      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      // Commit log blown, causing a re-run of the last batch
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // advance clock to 100 days, should retain keys >= 90
+      AddData(inputData, 85L, 90L, 100L, 105L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+    )
   }
 }

From 4d6e3cb957e5c08a0ba2b62d7a4445cc218f5e83 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 11 Apr 2017 22:40:56 -0700
Subject: [PATCH 3/3] minor

---
 .../apache/spark/sql/streaming/StreamingAggregationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 63849e096b05..f796a4cb4a39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -292,7 +292,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
     )
   }
 
-  test(s"prune results by current_date, complete mode") {
+  test("prune results by current_date, complete mode") {
     import testImplicits._
     val clock = new StreamManualClock
     val tz = TimeZone.getDefault.getID