
Conversation

@huanliwang-db
Contributor

  • When we close the HDFS state store, we should only remove the entry from `loadedMaps` rather than doing the active data cleanup; the JVM GC should be able to reclaim those objects for us.
  • We should wait for the maintenance thread to stop before unloading the providers. (A minimal sketch of both changes follows this list.)
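
A minimal, self-contained Scala sketch of the shape of both changes (illustrative only, not the actual Spark code; `ProviderSketch`, `UnloadSketch`, and the 30-second timeout are made-up stand-ins for `HDFSBackedStateStoreProvider` and the maintenance shutdown path):

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit}

// Stand-in for HDFSBackedStateStoreProvider: `loadedMaps` maps a version to that
// version's key/value data (a stand-in for HDFSBackedStateStoreMap).
final class ProviderSketch {
  private val loadedMaps = new ConcurrentHashMap[Long, ConcurrentHashMap[String, String]]()

  def load(version: Long, data: ConcurrentHashMap[String, String]): Unit =
    loadedMaps.put(version, data)

  // Change 1: on close, drop only this provider's entries; do NOT clear each
  // map's contents. A maintenance thread that already holds one of the maps can
  // still snapshot it, and the JVM GC reclaims the maps once nothing references them.
  def close(): Unit = loadedMaps.clear()
}

object UnloadSketch {
  // Change 2: stop the maintenance pool and wait for it to terminate before
  // unloading providers, so no snapshot can be in flight during the unload.
  def stopMaintenanceThenUnload(
      maintenancePool: ExecutorService,
      providers: Iterable[ProviderSketch]): Unit = {
    maintenancePool.shutdown()
    if (!maintenancePool.awaitTermination(30, TimeUnit.SECONDS)) {
      maintenancePool.shutdownNow() // interrupt stragglers rather than racing them
    }
    providers.foreach(_.close())
  }
}
```

Waiting for termination closes the window described in Case 1 below, while the GC-based cleanup in `close()` is what avoids Case 2.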

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:

  1. The maintenance thread pool hits an issue and calls [stopMaintenanceTask](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774), which in turn calls [threadPool.stop](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587). However, the stop call does not wait for the stop operation to complete, and execution moves on to the state store [unload and clear](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778).
  2. The provider unload [closes the state store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721), which [clears the values of loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) for the HDFS-backed state store.
  3. The not-yet-stopped maintenance thread may still be running and trying to take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` has already been removed. If this snapshot process still completes successfully, we write corrupted data and the following batches consume that corrupted data. (A small reproduction of this window follows the list.)
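
A tiny, illustrative reproduction of that window in plain Scala (not Spark code; `Case1Sketch` and the sleep/timeout values are made up): `shutdown()` returns while the submitted "snapshot" task is still running, so skipping the `awaitTermination` call lets the subsequent clear race with the snapshot.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object Case1Sketch extends App {
  val data = new ConcurrentHashMap[String, String]()
  (1 to 20).foreach(i => data.put(s"a$i", s"v$i"))

  val maintenancePool = Executors.newSingleThreadExecutor()
  maintenancePool.submit(new Runnable {
    override def run(): Unit = {
      Thread.sleep(100)                               // "snapshot" still in flight...
      println(s"snapshot saw ${data.size()} entries") // 0 if the unload won the race
    }
  })

  maintenancePool.shutdown() // returns immediately; the submitted task keeps running
  // Skipping this wait is the Case 1 bug: the clear below would then race with
  // the snapshot, and an empty or partial view could be written out.
  maintenancePool.awaitTermination(10, TimeUnit.SECONDS)
  data.clear()               // provider unload clears the store's data
}
```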

Case 2:

  1. In executor_1, the maintenance thread is about to take the snapshot for state_store_1: it retrieves the `HDFSBackedStateStoreMap` object from `loadedMaps`, after which it [releases the lock on loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
  2. state_store_1 is loaded on another executor, e.g. executor_2.
  3. Another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871) is sent to the driver.
  4. executor_1 does the [unload](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) for the state stores that are no longer active, which clears the data entries in the `HDFSBackedStateStoreMap`.
  5. The snapshotting thread finishes early and uploads an incomplete snapshot to the cloud, because the [iterator has no next element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) after the clear.
  6. Future batches then consume the corrupted data. (See the sketch after this list.)
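
An illustrative-only sketch of why step 5 yields a truncated snapshot and how the fix (removing only the `loadedMaps` entry and letting the GC reclaim the map) avoids it; `Case2Sketch` and `VersionMap` are made-up names, not Spark identifiers:

```scala
import java.util.concurrent.ConcurrentHashMap

object Case2Sketch extends App {
  type VersionMap = ConcurrentHashMap[String, String] // stand-in for HDFSBackedStateStoreMap

  val loadedMaps = new ConcurrentHashMap[Long, VersionMap]()
  val v1 = new VersionMap()
  (1 to 20).foreach(i => v1.put(s"a$i", s"v$i"))
  loadedMaps.put(1L, v1)

  // Step 1: the maintenance thread grabs the map for version 1, then releases the lock.
  val heldBySnapshotter: VersionMap = loadedMaps.get(1L)

  // Buggy unload (steps 4-5): clearing the maps' contents empties the iterator the
  // snapshotter is about to consume, so an incomplete snapshot would be uploaded:
  //   loadedMaps.values().forEach(m => m.clear()); loadedMaps.clear()

  // Fixed unload: only the loadedMaps entries are dropped; the reference the
  // snapshotter already holds still sees all 20 entries, and the GC reclaims the
  // map once that reference goes away.
  loadedMaps.clear()

  println(s"entries visible to the snapshotter: ${heldBySnapshotter.size()}") // prints 20
}
```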

No

```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```

before this change

```
[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$39(StateStoreSuite.scala:414)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuiteBase.tryWithProviderResource(StateStoreSuite.scala:1663)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$38(StateStoreSuite.scala:394)
18:32:09.694 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$run(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.run(StateStoreSuite.scala:90)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 2 seconds, 4 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
```

No

Closes apache#46351 from huanliwang-db/race.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
@huanliwang-db
Contributor Author

huanliwang-db commented May 8, 2024

@HeartSaVioR the test fails consistently in [Run / Linters, licenses, dependencies and documentation generation](https://github.com/huanliwang-db/spark/actions/runs/8978966866/job/24740187884#logs):

```
_w/spark/spark/python/docs/source/reference/api/pyspark.BarrierTaskContext.cpus.rst, ..., /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.status.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.stop.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryListener.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.active.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.addListener.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.awaitAnyTermination.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.get.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.removeListener.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.resetTerminated.rst, /__w/spark/spark/python/docs/source/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.rst
building [mo]: targets for 0 po files that are out of date
building [html]: targets for 73 source files that are out of date
updating environment: [new config] 2226 added, 0 changed, 0 removed
reading sources... [  0%] development/contributing

Exception occurred:
  File "/usr/local/lib/python3.9/dist-packages/nbsphinx/__init__.py", line 1316, in apply
    for section in self.document.findall(docutils.nodes.section):
AttributeError: 'document' object has no attribute 'findall'
The full traceback has been saved in /tmp/sphinx-err-0gpl3bwv.log, if you want to report the issue to the developers.
Please also report this if it was a user error, so that a better error message can be provided next time.
A bug report can be filed in the tracker at <https://github.com/sphinx-doc/sphinx/issues>. Thanks!
make: *** [Makefile:35: html] Error 2
                    ------------------------------------------------
      Jekyll 4.2.1   Please append `--trace` to the `build` command 
                     for any additional information or backtrace. 
                    ------------------------------------------------
/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:130:in `<top (required)>': Python doc generation failed (RuntimeError)
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:60:in `require'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:60:in `block in require_with_graceful_fail'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:57:in `each'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:57:in `require_with_graceful_fail'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:89:in `block in require_plugin_files'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:87:in `each'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:87:in `require_plugin_files'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:21:in `conscientious_require'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/site.rb:131:in `setup'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/site.rb:36:in `initialize'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:30:in `new'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:30:in `process'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `block in process_with_graceful_fail'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `each'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `process_with_graceful_fail'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:18:in `block (2 levels) in init_with_program'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `block in execute'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `each'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `execute'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/program.rb:44:in `go'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary.rb:21:in `program'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/exe/jekyll:15:in `<top (required)>'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/bin/jekyll:23:in `load'
	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/bin/jekyll:23:in `<main>'
Error: Process completed with exit code 1.
```

Do you have any insights into what could be the issue?

@HeartSaVioR
Contributor

The linter failure does not seem to be an issue with this PR. Let me merge this and revert if it fails after the merge.

@HeartSaVioR (Contributor) left a comment


+1

@HeartSaVioR
Contributor

Thanks! Merging to 3.5.

HeartSaVioR pushed a commit that referenced this pull request May 15, 2024
…oading and snapshotting

Closes #46351 from huanliwang-db/race.

Authored-by: Huanli Wang <huanli.wangdatabricks.com>

Closes #46415 from huanliwang-db/race-3.5.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
@huanliwang-db
Contributor Author

@HeartSaVioR let me know if there is a conflict for Spark 3.4. I can create a new change for 3.4 if needed.

HeartSaVioR pushed a commit that referenced this pull request May 18, 2024
…oading and snapshotting

Closes #46351 from huanliwang-db/race.

Authored-by: Huanli Wang <huanli.wangdatabricks.com>

Closes #46415 from huanliwang-db/race-3.5.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
@HeartSaVioR
Contributor

Also merged into 3.4.
