Skip to content

Conversation

@zsxwing
Copy link
Member

@zsxwing zsxwing commented Apr 10, 2017

What changes were proposed in this pull request?

This PR fixes the following failure:

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
Assert on query failed: 

== Progress ==
   AssertOnQuery(<condition>, )
   StopStream
   AddData to MemoryStream[value#30891]: 1,2
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@35cdc93a,Map())
   CheckAnswer: [6],[3]
   StopStream
=> AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@cdb247d,Map())
   CheckAnswer: [6],[3]
   StopStream
   AddData to MemoryStream[value#30891]: 3
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@55394e4d,Map())
   CheckLastBatch: [2]
   StopStream
   AddData to MemoryStream[value#30891]: 0
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@749aa997,Map())
   ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, incorrect start offset or end offset on exception)

== Stream ==
Output Mode: Append
Stream state: not started
Thread state: dead


== Sink ==
0: [6] [3]


== Plan ==

         
         
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347)
	at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The failure is because CheckAnswer will run once committedOffsets is updated. Then writing the commit log may be interrupted by the following StopStream.

This PR just change the order to write the commit log first.

How was this patch tested?

Jenkins

@zsxwing zsxwing changed the title Write the log first to fix a race contion in tests [SPARK-20282][SS][Tests]Write the commit log first to fix a race contion in tests Apr 10, 2017
if (dataAvailable) {
// Update committed offsets.
committedOffsets ++= availableOffsets
batchCommitLog.add(currentBatchId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing, but do we not write to the commit log if there is no new data in the next batch?

Copy link
Member Author

@zsxwing zsxwing Apr 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because we only update currentBatchId when data is available. cc @tdas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, when there is a batch with data, it finishes executing and then increments the batch id for the next batch. But if next batch has no data, then the batchid not further increment, and gets eventually used in a future batch with data.

So i think it is correct to update the commit log as soon this batch is done, and before the batch id is incremented for next batch. This change maintains that invariant as far as i think.

@marmbrus
Copy link
Contributor

LGTM, for fixing the issue with the test. We should separately decide if this is really the behavior we want for the commit log.

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75670 has finished for PR 17594 at commit 6b63179.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member Author

zsxwing commented Apr 10, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in a35b9d9 Apr 10, 2017
@zsxwing zsxwing deleted the SPARK-20282 branch April 10, 2017 21:14
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…tion in tests

## What changes were proposed in this pull request?

This PR fixes the following failure:
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException:
Assert on query failed:

== Progress ==
   AssertOnQuery(<condition>, )
   StopStream
   AddData to MemoryStream[value#30891]: 1,2
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock35cdc93a,Map())
   CheckAnswer: [6],[3]
   StopStream
=> AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClockcdb247d,Map())
   CheckAnswer: [6],[3]
   StopStream
   AddData to MemoryStream[value#30891]: 3
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock55394e4d,Map())
   CheckLastBatch: [2]
   StopStream
   AddData to MemoryStream[value#30891]: 0
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock749aa997,Map())
   ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, incorrect start offset or end offset on exception)

== Stream ==
Output Mode: Append
Stream state: not started
Thread state: dead

== Sink ==
0: [6] [3]

== Plan ==

	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347)
	at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
	at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41)
	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
	at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

The failure is because `CheckAnswer` will run once `committedOffsets` is updated. Then writing the commit log may be interrupted by the following `StopStream`.

This PR just change the order to write the commit log first.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#17594 from zsxwing/SPARK-20282.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants